From 1a09a5360a3cabd76da030c1984ed6060f9c6598 Mon Sep 17 00:00:00 2001 From: Test Date: Sun, 14 Jun 2026 18:38:02 +0200 Subject: [PATCH] Add multi-core serving via SO_REUSEPORT fork-based workers Implement App.serve-with-workers that forks n child processes, each running its own event loop bound to the same host:port. The kernel distributes incoming connections across workers via SO_REUSEPORT (already set on TcpListener). Falls back to single-process serve when n <= 1. Add (workers N) form to defserver for declarative configuration. Addresses issue #7. --- CHANGELOG.md | 6 ++++ web.carp | 79 +++++++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 75 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 05608f5..7709430 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ ### Added +- **Multi-core serving via SO_REUSEPORT.** `App.serve-with-workers` + forks `n` worker processes, each running its own event loop bound to + the same port. The kernel distributes incoming connections across + workers. Falls back to single-process `serve` when `n` is 1 or less. + `defserver` gains a `(workers N)` form for declarative configuration. + - **WebSocket fragment timeout and size limits.** Fragment accumulation now tracks per-connection timestamps via `ConnState.ws-frag-start`. `sweep-idle` closes connections where fragments have been accumulating diff --git a/web.carp b/web.carp index 684908e..9c7487f 100644 --- a/web.carp +++ b/web.carp @@ -2136,6 +2136,9 @@ fallback.") (hidden wsp-form?) (defndynamic wsp-form? [f] (and (list? f) (= 'WSP (car f)))) + (hidden workers-form?) + (defndynamic workers-form? [f] (and (list? f) (= 'workers (car f)))) + (hidden route-form?) (defndynamic route-form? [f] (and (list? f) @@ -2173,7 +2176,7 @@ Sockets are non-blocking, reads accumulate into a per-connection buffer, and writes drain across writable events. The loop is single-threaded, so peak throughput is bounded by one core. -For multi-core scaling, run several copies behind a TCP load balancer.") +For multi-core scaling, see [`serve-with-workers`](#serve-with-workers).") (defn serve [app before-hooks after-hooks host port] (match (TcpListener.bind host port) (Result.Error e) (IO.errorln &(fmt "Failed to bind: %s" &e)) @@ -2232,7 +2235,53 @@ For multi-core scaling, run several copies behind a TCP load balancer.") (sweep-idle &cs &poll) (flush-closed &cs &poll)))) (Poll.close poll) - (TcpListener.close listener)))))) + (TcpListener.close listener))))) + + ; ----------------------------------------------------------------------- + ; serve-with-workers (fork-based multi-core) + ; ----------------------------------------------------------------------- + + (doc serve-with-workers "starts `n` worker processes, each running its own +event loop bound to `host`:`port`. The kernel distributes incoming +connections across workers via SO_REUSEPORT (Linux 3.9+, most BSDs). + +Falls back to single-process [`serve`](#serve) when `n` is 1 or less. + +``` +(App.serve-with-workers &app &bh &ah \"0.0.0.0\" 3000 4) +``` + +Or via [`defserver`](#defserver): + +``` +(defserver \"0.0.0.0\" 3000 + (workers 4) + (GET \"/\" handler)) +```") + (defn serve-with-workers [app before-hooks after-hooks host port n] + (if (<= n 1) + (serve app before-hooks after-hooks host port) + (let-do [spawned 0] + (for [i 0 n] + (let [pid (System.fork)] + (cond + (< pid 0) (IO.errorln "fork() failed") + (= pid 0) + (do + (serve app before-hooks after-hooks host port) + (System.exit 0)) + (do + (IO.println &(fmt "Spawned worker %d (pid %d)" (+ i 1) pid)) + (set! spawned (+ spawned 1)))))) + (if (= spawned 0) + (IO.errorln "No workers spawned; exiting") + (do + (System.signal System.signal-int (fn [_] ())) + (System.signal System.signal-term (fn [_] ())) + (let [status 0] + (for [_ 0 spawned] + (ignore (System.wait (Pointer.address &status))))) + (IO.println &(fmt "All %d workers exited" spawned)))))))) ; end defmodule App ; --------------------------------------------------------------------------- @@ -2343,15 +2392,18 @@ Each route is `(METHOD \"/path\" handler)` where METHOD is one of `GET`, `POST`, `PUT`, `DELETE`, or `PATCH`. Any other form in the body is evaluated as a setup expression before the server starts, in source order. +Use `(workers N)` to fork N worker processes for multi-core scaling via +SO_REUSEPORT. Each worker runs its own event loop bound to the same port. + ``` (defserver \"0.0.0.0\" 3000 - (Item.create-table &db) + (workers 4) (GET \"/api/todos\" api-list) (POST \"/api/todos\" api-add) (DELETE \"/api/todos/:id\" api-delete)) ``` -is equivalent to: +Without `(workers N)`, is equivalent to: ``` (defn main [] @@ -2361,16 +2413,20 @@ is equivalent to: (App.serve &app &bh &ah \"0.0.0.0\" 3000))) ```") (defmacro defserver [host port :rest body] - (let [; Separate routes, hooks, and setup expressions + (let [; Extract worker count if present, then filter it out + worker-forms (filter App.workers-form? body) + worker-count (if (= 0 (length worker-forms)) 0 (cadr (car worker-forms))) + forms (filter (fn [f] (not (App.workers-form? f))) body) + ; Separate routes, hooks, and setup expressions routes (filter (fn [f] (and (App.route-form? f) (not (App.before-form? f)) (not (App.after-form? f)))) - body) - befores (filter App.before-form? body) - afters (filter App.after-form? body) - setup (filter (fn [f] (not (App.route-form? f))) body) + forms) + befores (filter App.before-form? forms) + afters (filter App.after-form? forms) + setup (filter (fn [f] (not (App.route-form? f))) forms) ; Build the app with routes app-form (cons '-> (cons '(App.create) (map App.route-call routes))) ; Build hook arrays as lambdas wrapping the user's functions @@ -2385,9 +2441,12 @@ is equivalent to: (list 'fn ['req 'params 'resp] (list (cadr f) 'req 'params 'resp))) afters) array) + serve-call (if (> worker-count 0) + (list 'App.serve-with-workers '&app '&bh '&ah host port worker-count) + (list 'App.serve '&app '&bh '&ah host port)) body-forms (append setup (list (list 'let ['app app-form 'bh before-arr 'ah after-arr] - (list 'App.serve '&app '&bh '&ah host port))))] + serve-call)))] (eval (list 'defn 'main (array) (cons 'do body-forms)))))