Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@

### Added

- **Multi-core serving via SO_REUSEPORT.** `App.serve-with-workers`
forks `n` worker processes, each running its own event loop bound to
the same port. The kernel distributes incoming connections across
workers. Falls back to single-process `serve` when `n` is 1 or less.
`defserver` gains a `(workers N)` form for declarative configuration.

- **WebSocket fragment timeout and size limits.** Fragment accumulation
now tracks per-connection timestamps via `ConnState.ws-frag-start`.
`sweep-idle` closes connections where fragments have been accumulating
Expand Down
79 changes: 69 additions & 10 deletions web.carp
Original file line number Diff line number Diff line change
Expand Up @@ -2136,6 +2136,9 @@ fallback.")
(hidden wsp-form?)
(defndynamic wsp-form? [f] (and (list? f) (= 'WSP (car f))))

(hidden workers-form?)
(defndynamic workers-form? [f] (and (list? f) (= 'workers (car f))))

(hidden route-form?)
(defndynamic route-form? [f]
(and (list? f)
Expand Down Expand Up @@ -2173,7 +2176,7 @@ Sockets are non-blocking, reads accumulate into a per-connection buffer,
and writes drain across writable events.

The loop is single-threaded, so peak throughput is bounded by one core.
For multi-core scaling, run several copies behind a TCP load balancer.")
For multi-core scaling, see [`serve-with-workers`](#serve-with-workers).")
(defn serve [app before-hooks after-hooks host port]
(match (TcpListener.bind host port)
(Result.Error e) (IO.errorln &(fmt "Failed to bind: %s" &e))
Expand Down Expand Up @@ -2232,7 +2235,53 @@ For multi-core scaling, run several copies behind a TCP load balancer.")
(sweep-idle &cs &poll)
(flush-closed &cs &poll))))
(Poll.close poll)
(TcpListener.close listener))))))
(TcpListener.close listener)))))

; -----------------------------------------------------------------------
; serve-with-workers (fork-based multi-core)
; -----------------------------------------------------------------------

(doc serve-with-workers "starts `n` worker processes, each running its own
event loop bound to `host`:`port`. The kernel distributes incoming
connections across workers via SO_REUSEPORT (Linux 3.9+, most BSDs).

Falls back to single-process [`serve`](#serve) when `n` is 1 or less.

```
(App.serve-with-workers &app &bh &ah \"0.0.0.0\" 3000 4)
```

Or via [`defserver`](#defserver):

```
(defserver \"0.0.0.0\" 3000
(workers 4)
(GET \"/\" handler))
```")
(defn serve-with-workers [app before-hooks after-hooks host port n]
(if (<= n 1)
(serve app before-hooks after-hooks host port)
(let-do [spawned 0]
(for [i 0 n]
(let [pid (System.fork)]
(cond
(< pid 0) (IO.errorln "fork() failed")
(= pid 0)
(do
(serve app before-hooks after-hooks host port)
(System.exit 0))
(do
(IO.println &(fmt "Spawned worker %d (pid %d)" (+ i 1) pid))
(set! spawned (+ spawned 1))))))
(if (= spawned 0)
(IO.errorln "No workers spawned; exiting")
(do
(System.signal System.signal-int (fn [_] ()))
(System.signal System.signal-term (fn [_] ()))
(let [status 0]
(for [_ 0 spawned]
(ignore (System.wait (Pointer.address &status)))))
(IO.println &(fmt "All %d workers exited" spawned))))))))
; end defmodule App

; ---------------------------------------------------------------------------
Expand Down Expand Up @@ -2343,15 +2392,18 @@ Each route is `(METHOD \"/path\" handler)` where METHOD is one of `GET`,
`POST`, `PUT`, `DELETE`, or `PATCH`. Any other form in the body is
evaluated as a setup expression before the server starts, in source order.

Use `(workers N)` to fork N worker processes for multi-core scaling via
SO_REUSEPORT. Each worker runs its own event loop bound to the same port.

```
(defserver \"0.0.0.0\" 3000
(Item.create-table &db)
(workers 4)
(GET \"/api/todos\" api-list)
(POST \"/api/todos\" api-add)
(DELETE \"/api/todos/:id\" api-delete))
```

is equivalent to:
Without `(workers N)`, is equivalent to:

```
(defn main []
Expand All @@ -2361,16 +2413,20 @@ is equivalent to:
(App.serve &app &bh &ah \"0.0.0.0\" 3000)))
```")
(defmacro defserver [host port :rest body]
(let [; Separate routes, hooks, and setup expressions
(let [; Extract worker count if present, then filter it out
worker-forms (filter App.workers-form? body)
worker-count (if (= 0 (length worker-forms)) 0 (cadr (car worker-forms)))
forms (filter (fn [f] (not (App.workers-form? f))) body)
; Separate routes, hooks, and setup expressions
routes (filter
(fn [f]
(and (App.route-form? f)
(not (App.before-form? f))
(not (App.after-form? f))))
body)
befores (filter App.before-form? body)
afters (filter App.after-form? body)
setup (filter (fn [f] (not (App.route-form? f))) body)
forms)
befores (filter App.before-form? forms)
afters (filter App.after-form? forms)
setup (filter (fn [f] (not (App.route-form? f))) forms)
; Build the app with routes
app-form (cons '-> (cons '(App.create) (map App.route-call routes)))
; Build hook arrays as lambdas wrapping the user's functions
Expand All @@ -2385,9 +2441,12 @@ is equivalent to:
(list 'fn ['req 'params 'resp] (list (cadr f) 'req 'params 'resp)))
afters)
array)
serve-call (if (> worker-count 0)
(list 'App.serve-with-workers '&app '&bh '&ah host port worker-count)
(list 'App.serve '&app '&bh '&ah host port))
body-forms (append setup
(list
(list 'let
['app app-form 'bh before-arr 'ah after-arr]
(list 'App.serve '&app '&bh '&ah host port))))]
serve-call)))]
(eval (list 'defn 'main (array) (cons 'do body-forms)))))