diff --git a/CHANGELOG.md b/CHANGELOG.md index 89b1534..f738959 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- **Server-Sent Events API (#3).** First-class `text/event-stream` helpers on + `HttpResponse` — `sseStart()`, `sseEvent($data, $event, $id, $retry)`, + `sseComment()` and `sseRetry()` — layered on the existing streaming pipeline, + so the same handler works over HTTP/1.1, HTTP/2 and HTTP/3. `sseStart()` sets + the canonical headers (`Content-Type: text/event-stream`, `Cache-Control: + no-cache, no-transform`, `X-Accel-Buffering: no`) and marks the response + non-compressible. Framing follows WHATWG §9.2: multiline `data` is split per + line, single-line fields reject CR/LF and `id` rejects NUL. phpt coverage for + H1/H2/H3 plus the validation surface. + - **hq-interop (HTTP/0.9-over-QUIC) for the interop matrix (#80).** A second QUIC ALPN, `hq-interop`, served straight off the transport (no nghttp3): a raw bidi stream `GET ` returns the file bytes + FIN from `setHttp3HqDocroot()`. diff --git a/CMakeLists.txt b/CMakeLists.txt index 39c0fc9..b64d4b1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -42,6 +42,7 @@ set(CORE_SOURCES src/http_server_exceptions.c src/http_request.c src/http_response.c + src/http_sse.c src/uploaded_file.c src/http_mime.c src/http_date.c diff --git a/README.md b/README.md index b41cefa..0cb2240 100644 --- a/README.md +++ b/README.md @@ -45,7 +45,7 @@ This means you can serve a REST API over HTTP/2, push real-time events over Serv | ✅ Ready | **HTTP/3 / QUIC** | UDP transport via ngtcp2 + nghttp3; OpenSSL 3.5 QUIC API | | ✅ Ready | **Compression** | gzip (zlib-ng / zlib), Brotli, zstd — response encoding + inbound decode across H1/H2/H3. Server-side preference `zstd > br > gzip`; per-codec level setters. See [docs/COMPRESSION.md](docs/COMPRESSION.md). | | 📋 Planned | **WebSocket** | RFC 6455, upgrade from HTTP/1.1 and HTTP/2, full duplex | -| 📋 Planned | **SSE (Server-Sent Events)** | RFC 8895, server-to-client event streaming | +| ✅ Ready | **SSE (Server-Sent Events)** | `text/event-stream` framing (WHATWG §9.2) over H1/H2/H3 via `HttpResponse::sseStart/sseEvent/sseComment/sseRetry` | | 📋 Planned | **gRPC** | Built on HTTP/2, unary and streaming RPC | ### Development Progress @@ -56,7 +56,7 @@ TLS ████████████████████ 100% HTTP/2 ████████████████████ 100% HTTP/3 ████████████████████ 100% WebSocket ░░░░░░░░░░░░░░░░░░░░ 0% -SSE ░░░░░░░░░░░░░░░░░░░░ 0% +SSE ████████████████████ 100% gRPC ░░░░░░░░░░░░░░░░░░░░ 0% ``` @@ -258,9 +258,37 @@ behaviour. See **[docs/USAGE.md](docs/USAGE.md)** for protocol-restricted listeners (`addHttp1Listener`/`addHttp2Listener`/`addHttp3Listener`), TLS, compression, timeouts, backpressure and caveats. +### Server-Sent Events + +`text/event-stream` is a first-class response mode — the same handler streams +over HTTP/1.1, HTTP/2 and HTTP/3 (the client picks the protocol): + +```php +$server->addHttpHandler(function ($request, $response) { + $response->sseStart(); // commits Content-Type: text/event-stream + $response->sseRetry(3000); // reconnect hint (ms) + + foreach (fetchUpdates() as $i => $update) { + $response->sseEvent( + data: json_encode($update), + event: 'tick', // addEventListener('tick', …) on the client + id: (string) $i, // echoed back as Last-Event-ID on reconnect + ); + if (!$response->sendable()) break; // peer gone — stop early + } + + $response->end(); +}); +``` + +`sseEvent()` formats WHATWG §9.2 records (multiline `data` is split, single-line +fields are CR/LF-validated); `sseComment('')` emits a `:` heartbeat to hold the +connection open through proxy idle timeouts. The stream is never compressed. + Working examples live under [`examples/`](examples/): [`minimal-server.php`](examples/minimal-server.php), [`demo-server.php`](examples/demo-server.php), +[`sse-server.php`](examples/sse-server.php), [`multi-worker.php`](examples/multi-worker.php), [`multi-worker-manual.php`](examples/multi-worker-manual.php). diff --git a/config.m4 b/config.m4 index 9d0a68b..dab8f40 100644 --- a/config.m4 +++ b/config.m4 @@ -513,6 +513,7 @@ if test "$PHP_HTTP_SERVER" != "no"; then src/formats/multipart_processor.c src/http_request.c src/http_response.c + src/http_sse.c src/http_response_server_api.c src/http_body_stream.c src/http_mime.c diff --git a/config.w32 b/config.w32 index 91e2219..8b37a6d 100644 --- a/config.w32 +++ b/config.w32 @@ -20,6 +20,7 @@ if (PHP_TRUE_ASYNC_SERVER == "yes") { "src\\http_server_class.c " + "src\\http_request.c " + "src\\http_response.c " + + "src\\http_sse.c " + "src\\http_response_server_api.c " + "src\\uploaded_file.c " + // HTTP utilities (mirrors config.m4 base source list) diff --git a/examples/sse-server.php b/examples/sse-server.php new file mode 100644 index 0000000..7b921d8 --- /dev/null +++ b/examples/sse-server.php @@ -0,0 +1,64 @@ + console.log('message', e.data); + * es.addEventListener('tick', e => console.log('tick', e.data, e.lastEventId)); + */ + +use TrueAsync\HttpServer; +use TrueAsync\HttpServerConfig; +use function Async\delay; + +$config = (new HttpServerConfig()) + ->addListener('0.0.0.0', (int)(getenv('PORT') ?: 8080)) + ->setWriteTimeout(0); // long-lived stream: no write deadline + +$server = new HttpServer($config); + +$server->addHttpHandler(function ($req, $res) { + // Commit the SSE headers (Content-Type, Cache-Control, X-Accel-Buffering). + // Optional — the first sseEvent()/sseComment() starts the stream too. + $res->sseStart(); + + // Hint the browser to wait 3s before reconnecting after a drop. + $res->sseRetry(3000); + + // Comment line = heartbeat that keeps proxies from idling the conn out. + $res->sseComment('stream open'); + + for ($i = 1; $i <= 10; $i++) { + // A named event with an id (echoed back as Last-Event-ID on reconnect). + $res->sseEvent( + data: json_encode(['n' => $i, 'at' => time()]), + event: 'tick', + id: (string) $i, + ); + + // sendable() is an advisory backpressure check — skip the sleep and + // bail early if the peer has gone away. + if (!$res->sendable()) { + break; + } + + delay(1000); // 1s between events (cooperative, non-blocking) + } + + // A default (unnamed) message event, then close the stream. + $res->sseEvent('bye'); + $res->end(); +}); + +fprintf(STDERR, "[sse-server] http://127.0.0.1:%d/events pid=%d\n", + (int)(getenv('PORT') ?: 8080), getmypid()); + +$server->start(); diff --git a/src/http_sse.c b/src/http_sse.c new file mode 100644 index 0000000..3d7719d --- /dev/null +++ b/src/http_sse.c @@ -0,0 +1,430 @@ +/* + +----------------------------------------------------------------------+ + | Copyright (c) TrueAsync | + +----------------------------------------------------------------------+ + | Licensed under the Apache License, Version 2.0 | + +----------------------------------------------------------------------+ +*/ + +/* Server-Sent Events (text/event-stream). + * + * SSE is not a separate protocol — it is a Content-Type convention plus + * the small line-oriented framing defined by WHATWG §9.2, layered on top + * of the existing HttpResponse::send() streaming pipeline (HTTP/1 chunked, + * HTTP/2 + HTTP/3 DATA frames). These helpers only (1) set the canonical + * headers so a handler can't ship a broken stream behind nginx/a CDN and + * (2) format event records correctly so handlers don't reinvent framing. + * + * Wire commit is lazy: the headers are set and the response is locked into + * streaming mode here, but the actual HEADERS frame / status line is + * emitted by the protocol stream_ops on the first append_chunk — exactly + * the same path the first send() drives. */ + +#ifdef HAVE_CONFIG_H +#include +#endif + +#include "php.h" +#include "zend_exceptions.h" +#include "zend_smart_str.h" +#include "php_http_server.h" +#include "http_response_internal.h" + +#ifdef HAVE_HTTP_COMPRESSION +#include "compression/http_compression_response.h" +#endif + +/* Events are delimited by a blank line, fields by a single LF. WHATWG + * accepts CR / CRLF on input but we always emit LF. CR / LF inside a + * single-line field (event / id) is a framing-injection bug — reject it. */ +static bool sse_has_newline(const zend_string *s) +{ + if (s == NULL) { + return false; + } + + const char *const p = ZSTR_VAL(s); + const size_t len = ZSTR_LEN(s); + + for (size_t i = 0; i < len; i++) { + if (p[i] == '\n' || p[i] == '\r') { + return true; + } + } + + return false; +} + +/* Set one header by literal lowercase name, replacing any prior value. + * Stores a flat IS_STRING — the canonical single-value shape every wire + * formatter already understands (see add_header_value in http_response.c). */ +static void sse_set_header(HashTable *headers, const char *name, size_t name_len, const char *value, + size_t value_len) +{ + zend_string *const key = zend_string_init(name, name_len, 0); + zval v; + ZVAL_STRINGL(&v, value, value_len); + zend_hash_update(headers, key, &v); + zend_string_release(key); +} + +/* Reject sseStart() over a response that already carries a non-SSE + * Content-Type — that is a programming bug, not something to paper over. + * A leading "text/event-stream" (with optional parameters) is allowed. */ +static bool sse_content_type_conflicts(const HashTable *headers) +{ + zval *const ct = zend_hash_str_find(headers, "content-type", sizeof("content-type") - 1); + if (ct == NULL) { + return false; + } + + const char *val = NULL; + size_t val_len = 0; + if (Z_TYPE_P(ct) == IS_STRING) { + val = Z_STRVAL_P(ct); + val_len = Z_STRLEN_P(ct); + } else if (Z_TYPE_P(ct) == IS_ARRAY) { + zval *const first = zend_hash_index_find(Z_ARRVAL_P(ct), 0); + if (first != NULL && Z_TYPE_P(first) == IS_STRING) { + val = Z_STRVAL_P(first); + val_len = Z_STRLEN_P(first); + } + } + + static const char expected[] = "text/event-stream"; + static const size_t expected_len = sizeof(expected) - 1; + + if (val != NULL && val_len >= expected_len && + zend_binary_strcasecmp(val, expected_len, expected, expected_len) == 0) { + return false; + } + + return val != NULL; +} + +/* Idempotent SSE init: validate Content-Type, set the three canonical + * headers, mark the response non-compressible, and switch it into + * streaming mode at the PHP boundary. Emits nothing on the wire — the + * first append_chunk drives the protocol header commit. Returns false on + * error with an exception already thrown. */ +static bool sse_ensure_started(http_response_object *response) +{ + if (response->streaming) { + return true; + } + + if (response->closed) { + zend_throw_exception(http_server_runtime_exception_ce, + "Cannot start SSE on a closed response", 0); + return false; + } + + if (response->send_file_req != NULL) { + zend_throw_exception(http_server_runtime_exception_ce, + "Response is sealed by sendFile() — cannot switch to SSE", 0); + return false; + } + + if (UNEXPECTED(response->stream_ops == NULL)) { + zend_throw_exception(http_server_runtime_exception_ce, + "SSE requires a streaming-capable response (no stream ops — " + "response is detached from a connection)", + 0); + return false; + } + + if (sse_content_type_conflicts(response->headers)) { + zend_throw_exception(http_server_invalid_argument_exception_ce, + "Response already has a non-SSE Content-Type — remove it before " + "switching to SSE", + 0); + return false; + } + + sse_set_header(response->headers, "content-type", sizeof("content-type") - 1, + "text/event-stream", sizeof("text/event-stream") - 1); + sse_set_header(response->headers, "cache-control", sizeof("cache-control") - 1, + "no-cache, no-transform", sizeof("no-cache, no-transform") - 1); + /* nginx-specific: disables proxy_buffering for this response. Harmless + * on protocols / proxies that don't recognise it. */ + sse_set_header(response->headers, "x-accel-buffering", sizeof("x-accel-buffering") - 1, "no", + sizeof("no") - 1); + +#ifdef HAVE_HTTP_COMPRESSION + /* A buffering gzip stream defeats real-time delivery — never compress + * an event stream. SSE dispatches through the raw stream_ops (not the + * send() wrapper), but mark it explicitly so intent is unambiguous. */ + http_compression_mark_no_compression(&response->std); +#endif + + response->streaming = true; + response->committed = true; + response->headers_sent = true; + return true; +} + +/* Append one "data: \n" per line of `value`, splitting on + * LF / CR / CRLF (WHATWG §9.2). A terminator at the very end yields a + * trailing empty data line so the consumer reconstructs the final LF. */ +static void sse_append_data(smart_str *out, const char *p, size_t len) +{ + size_t i = 0; + while (true) { + const size_t start = i; + while (i < len && p[i] != '\n' && p[i] != '\r') { + i++; + } + + smart_str_appendl(out, "data: ", sizeof("data: ") - 1); + smart_str_appendl(out, p + start, i - start); + smart_str_appendc(out, '\n'); + + if (i >= len) { + break; + } + + if (p[i] == '\r' && i + 1 < len && p[i + 1] == '\n') { + i += 2; + } else { + i++; + } + + if (i >= len) { + smart_str_appendl(out, "data: \n", sizeof("data: \n") - 1); + break; + } + } +} + +/* Append ": \n". Caller guarantees `value` has no CR / LF. */ +static void sse_append_field(smart_str *out, const char *field, size_t field_len, const char *value, + size_t value_len) +{ + smart_str_appendl(out, field, field_len); + smart_str_appendl(out, ": ", 2); + if (value_len > 0) { + smart_str_appendl(out, value, value_len); + } + smart_str_appendc(out, '\n'); +} + +/* Push a finalised event payload through the installed stream ops. + * append_chunk takes ownership of the payload ref (so we never release it) + * and suspends the handler under backpressure on H2/H3. Mirrors send(): + * a dead stream surfaces as a 499 the handler may catch. */ +static void sse_dispatch(http_response_object *response, zend_string *payload) +{ + const int rc = response->stream_ops->append_chunk(response->stream_ctx, payload); + + if (rc == HTTP_STREAM_APPEND_STREAM_DEAD) { + zend_throw_exception_ex(http_exception_ce, 499, "stream closed by peer"); + } +} + +/* {{{ proto HttpResponse::sseStart(): static */ +ZEND_METHOD(TrueAsync_HttpResponse, sseStart) +{ + ZEND_PARSE_PARAMETERS_NONE(); + + http_response_object *const response = Z_HTTP_RESPONSE_P(ZEND_THIS); + + if (response->streaming) { + zend_throw_exception(http_server_runtime_exception_ce, + "sseStart(): response is already in streaming mode", 0); + return; + } + + if (!sse_ensure_started(response)) { + return; + } + + RETURN_OBJ_COPY(Z_OBJ_P(ZEND_THIS)); +} +/* }}} */ + +/* {{{ proto HttpResponse::sseEvent(?string $data = null, ?string $event = null, + * ?string $id = null, ?int $retry = null): static */ +ZEND_METHOD(TrueAsync_HttpResponse, sseEvent) +{ + zend_string *data = NULL; + zend_string *event = NULL; + zend_string *id = NULL; + zend_long retry = 0; + bool retry_is_null = true; + + ZEND_PARSE_PARAMETERS_START(0, 4) + Z_PARAM_OPTIONAL + Z_PARAM_STR_OR_NULL(data) + Z_PARAM_STR_OR_NULL(event) + Z_PARAM_STR_OR_NULL(id) + Z_PARAM_LONG_OR_NULL(retry, retry_is_null) + ZEND_PARSE_PARAMETERS_END(); + + http_response_object *const response = Z_HTTP_RESPONSE_P(ZEND_THIS); + + if (response->closed) { + zend_throw_exception(http_server_runtime_exception_ce, + "Cannot sseEvent() on a closed response", 0); + return; + } + + /* CR / LF in a single-line field would let the value inject extra + * fields or terminate the record early — reject so the bug surfaces. */ + if (sse_has_newline(event)) { + zend_throw_exception(http_server_invalid_argument_exception_ce, + "sseEvent(): $event must not contain CR or LF", 0); + return; + } + + if (sse_has_newline(id)) { + zend_throw_exception(http_server_invalid_argument_exception_ce, + "sseEvent(): $id must not contain CR or LF", 0); + return; + } + + /* WHATWG §9.2: a U+0000 in `id` makes the parser ignore the field. */ + if (id != NULL && memchr(ZSTR_VAL(id), '\0', ZSTR_LEN(id)) != NULL) { + zend_throw_exception(http_server_invalid_argument_exception_ce, + "sseEvent(): $id must not contain NUL bytes", 0); + return; + } + + if (!retry_is_null && retry < 0) { + zend_throw_exception(http_server_invalid_argument_exception_ce, + "sseEvent(): $retry must be a non-negative integer", 0); + return; + } + + /* All arguments unset — nothing to dispatch, don't start the stream. */ + if (data == NULL && event == NULL && id == NULL && retry_is_null) { + RETURN_OBJ_COPY(Z_OBJ_P(ZEND_THIS)); + } + + if (!sse_ensure_started(response)) { + return; + } + + /* Conventional field order: id, event, retry, then data. A spec parser + * is order-agnostic; this is what every implementation emits. */ + smart_str payload = {0}; + if (id != NULL) { + sse_append_field(&payload, "id", 2, ZSTR_VAL(id), ZSTR_LEN(id)); + } + + if (event != NULL) { + sse_append_field(&payload, "event", 5, ZSTR_VAL(event), ZSTR_LEN(event)); + } + + if (!retry_is_null) { + char buf[32]; + const int n = snprintf(buf, sizeof(buf), ZEND_LONG_FMT, retry); + if (n > 0) { + sse_append_field(&payload, "retry", 5, buf, (size_t)n); + } + } + + if (data != NULL) { + sse_append_data(&payload, ZSTR_VAL(data), ZSTR_LEN(data)); + } + + smart_str_appendc(&payload, '\n'); + smart_str_0(&payload); + + sse_dispatch(response, payload.s); + if (EG(exception)) { + return; + } + + RETURN_OBJ_COPY(Z_OBJ_P(ZEND_THIS)); +} +/* }}} */ + +/* {{{ proto HttpResponse::sseComment(string $text = ""): static */ +ZEND_METHOD(TrueAsync_HttpResponse, sseComment) +{ + zend_string *text = NULL; + + ZEND_PARSE_PARAMETERS_START(0, 1) + Z_PARAM_OPTIONAL + Z_PARAM_STR(text) + ZEND_PARSE_PARAMETERS_END(); + + http_response_object *const response = Z_HTTP_RESPONSE_P(ZEND_THIS); + + if (response->closed) { + zend_throw_exception(http_server_runtime_exception_ce, + "Cannot sseComment() on a closed response", 0); + return; + } + + if (sse_has_newline(text)) { + zend_throw_exception(http_server_invalid_argument_exception_ce, + "sseComment(): $text must not contain CR or LF", 0); + return; + } + + if (!sse_ensure_started(response)) { + return; + } + + smart_str payload = {0}; + smart_str_appendc(&payload, ':'); + if (text != NULL && ZSTR_LEN(text) > 0) { + smart_str_appendc(&payload, ' '); + smart_str_append(&payload, text); + } + smart_str_appendl(&payload, "\n\n", 2); + smart_str_0(&payload); + + sse_dispatch(response, payload.s); + if (EG(exception)) { + return; + } + + RETURN_OBJ_COPY(Z_OBJ_P(ZEND_THIS)); +} +/* }}} */ + +/* {{{ proto HttpResponse::sseRetry(int $milliseconds): static */ +ZEND_METHOD(TrueAsync_HttpResponse, sseRetry) +{ + zend_long milliseconds; + + ZEND_PARSE_PARAMETERS_START(1, 1) + Z_PARAM_LONG(milliseconds) + ZEND_PARSE_PARAMETERS_END(); + + http_response_object *const response = Z_HTTP_RESPONSE_P(ZEND_THIS); + + if (response->closed) { + zend_throw_exception(http_server_runtime_exception_ce, + "Cannot sseRetry() on a closed response", 0); + return; + } + + if (milliseconds < 0) { + zend_throw_exception(http_server_invalid_argument_exception_ce, + "sseRetry(): $milliseconds must be a non-negative integer", 0); + return; + } + + if (!sse_ensure_started(response)) { + return; + } + + char buf[32]; + const int n = snprintf(buf, sizeof(buf), ZEND_LONG_FMT, milliseconds); + + smart_str payload = {0}; + sse_append_field(&payload, "retry", 5, buf, n > 0 ? (size_t)n : 0); + smart_str_appendc(&payload, '\n'); + smart_str_0(&payload); + + sse_dispatch(response, payload.s); + if (EG(exception)) { + return; + } + + RETURN_OBJ_COPY(Z_OBJ_P(ZEND_THIS)); +} +/* }}} */ diff --git a/stubs/HttpResponse.php b/stubs/HttpResponse.php index 9a86eee..9d7bf0d 100644 --- a/stubs/HttpResponse.php +++ b/stubs/HttpResponse.php @@ -314,6 +314,93 @@ public function end(?string $data = null): void {} */ public function sendFile(string $path, ?SendFileOptions $options = null): void {} + // === Server-Sent Events (text/event-stream) === + + /** + * Switch the response into Server-Sent Events mode and commit headers. + * + * Sets the three canonical SSE headers — `Content-Type: + * text/event-stream`, `Cache-Control: no-cache, no-transform` and + * `X-Accel-Buffering: no` (the last tells nginx not to buffer the + * response; without it events stall behind the proxy buffer until it + * fills) — and marks the response as not-compressible (a buffering + * gzip stream would defeat real-time delivery). The response then + * enters streaming mode exactly as the first {@see self::send()} would: + * status + headers are committed and may no longer change, but no event + * data is emitted until the first sseEvent()/sseComment(). + * + * Calling sseStart() is optional — the first sseEvent()/sseComment() + * starts the stream implicitly. Use it when you want headers on the + * wire immediately (e.g. to unblock the browser's `onopen`) before any + * event is ready. + * + * Throws {@see HttpServerInvalidArgumentException} if the handler has + * already set a Content-Type other than `text/event-stream`, and + * {@see HttpServerRuntimeException} if the response is already + * streaming, closed, or has no connection to stream over. + * + * @return static + */ + public function sseStart(): static {} + + /** + * Format and send one Server-Sent Event, starting the stream if needed. + * + * Multiline `$data` is split on `\n` / `\r\n` / `\r` and emitted as one + * `data:` field per line (WHATWG §9.2 event-stream framing). `$event`, + * `$id` and `$retry` are emitted only when non-null. The record is + * terminated by a blank line so the browser dispatches it immediately. + * + * `$event` and `$id` must not contain `\r` or `\n` (the parser would + * read them as field/record separators) and `$id` must not contain NUL + * (WHATWG: a NUL makes the parser ignore the whole id) — violations + * throw {@see HttpServerInvalidArgumentException}. `$retry` must be + * non-negative. + * + * Empty `$data === ""` is valid and dispatches an empty MessageEvent. + * All four arguments null is a no-op. Note the EventSource parser drops + * an event carrying neither `data` nor `retry`. + * + * @param string|null $data Message payload. Multiline strings are split. + * @param string|null $event Event name (matched by addEventListener()). + * @param string|null $id Event id — echoed as Last-Event-ID on reconnect. + * @param int|null $retry Reconnect delay hint in milliseconds. + * @return static + */ + public function sseEvent( + ?string $data = null, + ?string $event = null, + ?string $id = null, + ?int $retry = null + ): static {} + + /** + * Send an SSE comment line (a record beginning with `:`). + * + * Browsers ignore comments, but they keep the connection alive past + * intermediary idle timeouts (nginx `proxy_read_timeout`, default 60s). + * Call periodically as a heartbeat — the canonical payload is the empty + * string, which becomes `:\n\n` on the wire. Starts the stream if it is + * not already running. + * + * `$text` must not contain `\r` or `\n`. + * + * @param string $text Optional comment payload (informational only). + * @return static + */ + public function sseComment(string $text = ""): static {} + + /** + * Send a bare `retry:` directive telling the browser how long to wait + * before reconnecting after the stream drops, in milliseconds. Sugar + * for sseEvent(retry: $milliseconds) with no message payload. Starts + * the stream if it is not already running. + * + * @param int $milliseconds Non-negative reconnect delay hint. + * @return static + */ + public function sseRetry(int $milliseconds): static {} + // === State methods === /** diff --git a/stubs/HttpResponse.php_arginfo.h b/stubs/HttpResponse.php_arginfo.h index 3340f1a..55c5c65 100644 --- a/stubs/HttpResponse.php_arginfo.h +++ b/stubs/HttpResponse.php_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit HttpResponse.php.stub.php instead. - * Stub hash: de5a2f1533fa4bdbb587efea257bfe7cad2b0829 */ + * Stub hash: 8a423bc943bfc02703da487693f605779200d059 */ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_TrueAsync_HttpResponse___construct, 0, 0, 0) ZEND_END_ARG_INFO() @@ -110,6 +110,23 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_TrueAsync_HttpResponse_sen ZEND_ARG_OBJ_INFO_WITH_DEFAULT_VALUE(0, options, TrueAsync\\SendFileOptions, 1, "null") ZEND_END_ARG_INFO() +#define arginfo_class_TrueAsync_HttpResponse_sseStart arginfo_class_TrueAsync_HttpResponse_resetHeaders + +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_TrueAsync_HttpResponse_sseEvent, 0, 0, IS_STATIC, 0) + ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, data, IS_STRING, 1, "null") + ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, event, IS_STRING, 1, "null") + ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, id, IS_STRING, 1, "null") + ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, retry, IS_LONG, 1, "null") +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_TrueAsync_HttpResponse_sseComment, 0, 0, IS_STATIC, 0) + ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, text, IS_STRING, 0, "\"\"") +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_TrueAsync_HttpResponse_sseRetry, 0, 1, IS_STATIC, 0) + ZEND_ARG_TYPE_INFO(0, milliseconds, IS_LONG, 0) +ZEND_END_ARG_INFO() + #define arginfo_class_TrueAsync_HttpResponse_isHeadersSent arginfo_class_TrueAsync_HttpResponse_sendable #define arginfo_class_TrueAsync_HttpResponse_isClosed arginfo_class_TrueAsync_HttpResponse_sendable @@ -145,6 +162,10 @@ ZEND_METHOD(TrueAsync_HttpResponse, html); ZEND_METHOD(TrueAsync_HttpResponse, redirect); ZEND_METHOD(TrueAsync_HttpResponse, end); ZEND_METHOD(TrueAsync_HttpResponse, sendFile); +ZEND_METHOD(TrueAsync_HttpResponse, sseStart); +ZEND_METHOD(TrueAsync_HttpResponse, sseEvent); +ZEND_METHOD(TrueAsync_HttpResponse, sseComment); +ZEND_METHOD(TrueAsync_HttpResponse, sseRetry); ZEND_METHOD(TrueAsync_HttpResponse, isHeadersSent); ZEND_METHOD(TrueAsync_HttpResponse, isClosed); @@ -180,6 +201,10 @@ static const zend_function_entry class_TrueAsync_HttpResponse_methods[] = { ZEND_ME(TrueAsync_HttpResponse, redirect, arginfo_class_TrueAsync_HttpResponse_redirect, ZEND_ACC_PUBLIC) ZEND_ME(TrueAsync_HttpResponse, end, arginfo_class_TrueAsync_HttpResponse_end, ZEND_ACC_PUBLIC) ZEND_ME(TrueAsync_HttpResponse, sendFile, arginfo_class_TrueAsync_HttpResponse_sendFile, ZEND_ACC_PUBLIC) + ZEND_ME(TrueAsync_HttpResponse, sseStart, arginfo_class_TrueAsync_HttpResponse_sseStart, ZEND_ACC_PUBLIC) + ZEND_ME(TrueAsync_HttpResponse, sseEvent, arginfo_class_TrueAsync_HttpResponse_sseEvent, ZEND_ACC_PUBLIC) + ZEND_ME(TrueAsync_HttpResponse, sseComment, arginfo_class_TrueAsync_HttpResponse_sseComment, ZEND_ACC_PUBLIC) + ZEND_ME(TrueAsync_HttpResponse, sseRetry, arginfo_class_TrueAsync_HttpResponse_sseRetry, ZEND_ACC_PUBLIC) ZEND_ME(TrueAsync_HttpResponse, isHeadersSent, arginfo_class_TrueAsync_HttpResponse_isHeadersSent, ZEND_ACC_PUBLIC) ZEND_ME(TrueAsync_HttpResponse, isClosed, arginfo_class_TrueAsync_HttpResponse_isClosed, ZEND_ACC_PUBLIC) ZEND_FE_END diff --git a/tests/phpt/server/h1/022-h1-sse-api.phpt b/tests/phpt/server/h1/022-h1-sse-api.phpt new file mode 100644 index 0000000..c735100 --- /dev/null +++ b/tests/phpt/server/h1/022-h1-sse-api.phpt @@ -0,0 +1,109 @@ +--TEST-- +HttpResponse SSE API — HTTP/1.1 chunked framing, headers, streaming lock +--EXTENSIONS-- +true_async_server +true_async +--SKIPIF-- + +--FILE-- +addListener('127.0.0.1', $port) + ->setReadTimeout(10) + ->setWriteTimeout(10) +); + +$server->addHttpHandler(function ($req, $res) { + $res->sseStart(); + $res->sseComment(); // heartbeat -> ":\n\n" + $res->sseEvent("hello"); // data: hello + $res->sseEvent("line1\nline2"); // two data: lines + $res->sseEvent("tick", event: "ping", id: "42"); + $res->sseRetry(5000); // retry: 5000 + + $lock = []; + try { $res->setHeader('X-Late', '1'); $lock[] = 'setHeader:no-throw'; } + catch (Throwable $e) { $lock[] = 'setHeader:throw'; } + try { $res->sseStart(); $lock[] = 'start:no-throw'; } + catch (Throwable $e) { $lock[] = 'start:throw'; } + $res->sseEvent(implode(',', $lock)); + + $res->end(); +}); + +$client = spawn(function () use ($port, $server) { + usleep(30000); + $cmd = sprintf( + 'curl --http1.1 -i -s -N --max-time 3 http://127.0.0.1:%d/events', + $port + ); + $raw = shell_exec($cmd) ?? ''; + + $split = preg_split("/\r\n\r\n/", $raw, 2); + $head = $split[0] ?? ''; + $body = $split[1] ?? ''; + + $h = fn($needle) => stripos($head, $needle) !== false ? 'yes' : 'no'; + echo "ct=", $h('Content-Type: text/event-stream'), "\n"; + echo "cc=", $h('Cache-Control: no-cache, no-transform'), "\n"; + echo "xab=", $h('X-Accel-Buffering: no'), "\n"; + echo "te=", $h('Transfer-Encoding: chunked'), "\n"; + + echo "---BODY---\n"; + echo $body; + echo "---END---\n"; + + $t = $server->getTelemetry(); + echo "sends=", $t['stream_send_calls_total'], "\n"; + + $server->stop(); +}); + +$server->start(); +await($client); +echo "done\n"; +?> +--EXPECT-- +ct=yes +cc=yes +xab=yes +te=yes +---BODY--- +: + +data: hello + +data: line1 +data: line2 + +id: 42 +event: ping +data: tick + +retry: 5000 + +data: setHeader:throw,start:throw + +---END--- +sends=6 +done diff --git a/tests/phpt/server/h1/023-h1-sse-validation.phpt b/tests/phpt/server/h1/023-h1-sse-validation.phpt new file mode 100644 index 0000000..9c1e657 --- /dev/null +++ b/tests/phpt/server/h1/023-h1-sse-validation.phpt @@ -0,0 +1,93 @@ +--TEST-- +HttpResponse SSE API — input validation throws before the stream commits +--EXTENSIONS-- +true_async_server +true_async +--SKIPIF-- + +--FILE-- +addListener('127.0.0.1', $port) + ->setReadTimeout(10) + ->setWriteTimeout(10) +); + +$server->addHttpHandler(function ($req, $res) { + $mark = function (callable $fn): string { + try { $fn(); return 'no-throw'; } + catch (Throwable $e) { + $cls = $e::class; + $short = substr($cls, strrpos($cls, '\\') + 1); + $m = $e->getMessage(); + $kind = + str_contains($m, 'CR or LF') ? 'newline' : + (str_contains($m, 'NUL') ? 'nul' : + (str_contains($m, 'non-negative')? 'retry' : + (str_contains($m, 'non-SSE') ? 'ct' : 'other'))); + return "$short:$kind"; + } + }; + + $out = []; + $out['event-lf'] = $mark(fn () => $res->sseEvent('ok', event: "a\nb")); + $out['event-cr'] = $mark(fn () => $res->sseEvent('ok', event: "a\rb")); + $out['id-lf'] = $mark(fn () => $res->sseEvent('ok', id: "a\nb")); + $out['id-nul'] = $mark(fn () => $res->sseEvent('ok', id: "a\0b")); + $out['retry-neg'] = $mark(fn () => $res->sseEvent('ok', retry: -1)); + $out['comment-lf'] = $mark(fn () => $res->sseComment("a\nb")); + $out['sseRetry-neg'] = $mark(fn () => $res->sseRetry(-5)); + + /* Conflicting Content-Type set up-front makes sseStart() throw. */ + $res->setHeader('Content-Type', 'application/json'); + $out['conflict-ct'] = $mark(fn () => $res->sseStart()); + + /* None of the above committed the response — still buffered. */ + $out['committed'] = $res->isHeadersSent() ? 'yes' : 'no'; + + $body = ''; + foreach ($out as $k => $v) { $body .= "$k=$v\n"; } + + $res->setStatusCode(200) + ->setHeader('Content-Type', 'text/plain') + ->setBody($body); +}); + +$client = spawn(function () use ($port, $server) { + usleep(30000); + $cmd = sprintf('curl --http1.1 -s --max-time 3 http://127.0.0.1:%d/v', $port); + echo shell_exec($cmd) ?? ''; + $server->stop(); +}); + +$server->start(); +await($client); +echo "done\n"; +?> +--EXPECT-- +event-lf=HttpServerInvalidArgumentException:newline +event-cr=HttpServerInvalidArgumentException:newline +id-lf=HttpServerInvalidArgumentException:newline +id-nul=HttpServerInvalidArgumentException:nul +retry-neg=HttpServerInvalidArgumentException:retry +comment-lf=HttpServerInvalidArgumentException:newline +sseRetry-neg=HttpServerInvalidArgumentException:retry +conflict-ct=HttpServerInvalidArgumentException:ct +committed=no +done diff --git a/tests/phpt/server/h2/025-h2-sse-api.phpt b/tests/phpt/server/h2/025-h2-sse-api.phpt new file mode 100644 index 0000000..e26d196 --- /dev/null +++ b/tests/phpt/server/h2/025-h2-sse-api.phpt @@ -0,0 +1,85 @@ +--TEST-- +HttpResponse SSE API — HTTP/2 (h2c) event stream over DATA frames +--EXTENSIONS-- +true_async_server +true_async +--SKIPIF-- + true]); +?> +--FILE-- +addListener('127.0.0.1', $port) + ->setReadTimeout(10) + ->setWriteTimeout(10); + +$server = new HttpServer($config); +$server->addHttpHandler(function ($req, $res) { + $res->sseStart(); + $res->sseEvent("hello"); + $res->sseEvent("multi\nline"); + $res->sseEvent("named", event: "ping", id: "7"); + $res->sseComment(); + $res->end(); +}); + +$client = spawn(function () use ($port, $server) { + usleep(30000); + $cmd = sprintf( + 'curl --http2-prior-knowledge -i -s --max-time 3 http://127.0.0.1:%d/events', + $port + ); + $raw = shell_exec($cmd) ?? ''; + + $split = preg_split("/\r\n\r\n/", $raw, 2); + $head = $split[0] ?? ''; + $body = $split[1] ?? ''; + + $h = fn($needle) => stripos($head, $needle) !== false ? 'yes' : 'no'; + echo "ct=", $h('content-type: text/event-stream'), "\n"; + echo "cc=", $h('cache-control: no-cache, no-transform'), "\n"; + echo "xab=", $h('x-accel-buffering: no'), "\n"; + + echo "---BODY---\n"; + echo $body; + echo "---END---\n"; + + $server->stop(); +}); + +$server->start(); +await($client); +echo "done\n"; +?> +--EXPECT-- +ct=yes +cc=yes +xab=yes +---BODY--- +data: hello + +data: multi +data: line + +id: 7 +event: ping +data: named + +: + +---END--- +done diff --git a/tests/phpt/server/h3/041-h3-sse-api.phpt b/tests/phpt/server/h3/041-h3-sse-api.phpt new file mode 100644 index 0000000..e240d9d --- /dev/null +++ b/tests/phpt/server/h3/041-h3-sse-api.phpt @@ -0,0 +1,90 @@ +--TEST-- +HttpResponse SSE API — HTTP/3 event stream over QUIC DATA frames +--EXTENSIONS-- +true_async_server +true_async +--SKIPIF-- + true, 'h3client' => true]); +?> +--FILE-- +/dev/null', + escapeshellarg($key), escapeshellarg($cert)), $_, $rc); +if ($rc !== 0) { echo "cert gen failed\n"; exit(1); } + +$expected_body = + "data: hello\n\n" . + "data: multi\ndata: line\n\n" . + "id: 7\nevent: ping\ndata: named\n\n"; + +$port = 20560 + getmypid() % 40; + +$config = (new HttpServerConfig()) + ->addListener('127.0.0.1', $port + 1) + ->addHttp3Listener('127.0.0.1', $port) + ->enableTls(true)->setCertificate($cert)->setPrivateKey($key); +$server = new HttpServer($config); +$server->addHttpHandler(function ($req, $res) { + $res->sseStart(); + $res->sseEvent("hello"); + $res->sseEvent("multi\nline"); + $res->sseEvent("named", event: "ping", id: "7"); + $res->end(); +}); + +$client_bin = __DIR__ . '/../../../h3client/h3client'; + +$client = spawn(function () use ($server, $port, $client_bin, $tmp, $expected_body) { + usleep(80000); + $errf = $tmp . '/err.txt'; + $cmd = sprintf('H3CLIENT_VERBOSE_HEADERS=1 %s 127.0.0.1 %d /events GET 2>%s', + escapeshellarg($client_bin), $port, escapeshellarg($errf)); + $body = shell_exec($cmd) ?? ''; + $err = @file_get_contents($errf) ?: ''; + + $status = preg_match('/STATUS=(\d+)/', $err, $m) ? (int)$m[1] : -1; + $headers = preg_match('/HEADERS=(\d+)/', $err, $m) ? (int)$m[1] : -1; + + echo "status=", $status, "\n"; + echo "body=", $body === $expected_body ? "ok" : "bad", "\n"; + echo "headers_ge_3=", $headers >= 3 ? "ok" : "bad ($headers)", "\n"; + + $s = $server->getHttp3Stats()[0] ?? []; + echo "streams_opened=", (int)($s['h3_streams_opened'] ?? -1), "\n"; + echo "response_submitted=", (int)($s['h3_response_submitted'] ?? -1), "\n"; + + $server->stop(); +}); + +$server->start(); +await($client); + +@unlink($cert); @unlink($key); @unlink($tmp . '/err.txt'); @rmdir($tmp); +echo "done\n"; +?> +--EXPECT-- +status=200 +body=ok +headers_ge_3=ok +streams_opened=1 +response_submitted=1 +done