From 7c62ee97e313696251d4504dfea7030a6a85c45b Mon Sep 17 00:00:00 2001 From: root Date: Fri, 22 May 2026 21:31:00 +0000 Subject: [PATCH] Fix SSE keepalive to use wall-clock timer, not progress callback The SSE keepalive comment () was emitted from inside the server_prefill_progress callback, which only fires on chunk boundaries. When a single prefill chunk stalls internally for longer than the client's idle timeout, no progress callback fires, no keepalive is sent, and the client eventually drops the socket. Fix: add a background pthread () that sends keepalive comments on a 5-second wall-clock timer independent of progress events. The thread is started before ds4_session_sync() and joined after it completes (or fails), covering both the main prefill path (generate_job) and the tool-checkpoint rebuild path (canonicalize_tool_checkpoint). Fixes #222 --- ds4_server.c | 72 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/ds4_server.c b/ds4_server.c index 435491fe..5961cb90 100644 --- a/ds4_server.c +++ b/ds4_server.c @@ -9215,8 +9215,37 @@ typedef struct { bool headers_sent; bool stream_failed; double last_keepalive; + pthread_t keepalive_thread; + volatile int keepalive_running; } server_prefill_progress; +/* Background thread: sends SSE keepalive comments on a wall-clock timer + * so the connection survives prefill stalls that don't trigger the + * progress callback (which only fires on chunk boundaries). */ +static void *keepalive_worker(void *arg) { + server_prefill_progress *p = (server_prefill_progress *)arg; + struct timespec ts = {.tv_sec = 5, .tv_nsec = 0}; + while (p->keepalive_running) { + nanosleep(&ts, NULL); + if (!p->keepalive_running) break; + if (p->stream && p->fd >= 0 && !p->stream_failed) { + if (!p->headers_sent) { + if (sse_headers(p->fd, p->enable_cors)) { + p->headers_sent = true; + } else { + p->stream_failed = true; + } + } else { + static const char ka[] = ": prefill\n\n"; + if (!send_all(p->fd, ka, sizeof(ka) - 1)) { + p->stream_failed = true; + } + } + } + } + return NULL; +} + static void request_ctx_span(char *buf, size_t len, int cached, int prompt) { int suffix = prompt - cached; if (suffix < 0) suffix = 0; @@ -9640,7 +9669,20 @@ static void canonicalize_tool_checkpoint(server *s, const job *j, const char *ct }; snprintf(rebuild_progress.ctx, sizeof(rebuild_progress.ctx), "%s", rebuild_ctx); ds4_session_set_progress(s->session, server_progress_cb, &rebuild_progress); + /* Start keepalive thread for rebuild sync. */ + if (rebuild_progress.stream) { + rebuild_progress.keepalive_running = 1; + if (pthread_create(&rebuild_progress.keepalive_thread, NULL, + keepalive_worker, &rebuild_progress) != 0) { + rebuild_progress.keepalive_running = 0; + } + } if (ds4_session_sync(s->session, sync_prompt, sync_err, sizeof(sync_err)) == 0) { + /* Stop keepalive thread. */ + if (rebuild_progress.keepalive_running) { + rebuild_progress.keepalive_running = 0; + pthread_join(rebuild_progress.keepalive_thread, NULL); + } ds4_session_set_progress(s->session, NULL, NULL); const double rebuild_sec = now_sec() - rebuild_t0; if (loaded > 0) { @@ -9659,6 +9701,11 @@ static void canonicalize_tool_checkpoint(server *s, const job *j, const char *ct common, live_len, canonical.len, err); } } else { + /* Stop keepalive thread on rebuild failure. */ + if (rebuild_progress.keepalive_running) { + rebuild_progress.keepalive_running = 0; + pthread_join(rebuild_progress.keepalive_thread, NULL); + } ds4_session_set_progress(s->session, NULL, NULL); server_log(DS4_LOG_KVCACHE, "ds4-server: tool checkpoint rebuild failed ctx=%s request_ctx=%s source=%s cached=%d replay=%d target=%d error=\"%s\"", @@ -9908,6 +9955,16 @@ static void generate_job(server *s, job *j) { req_flags); ds4_session_set_progress(s->session, server_progress_cb, &progress); + /* Start background keepalive thread so SSE connections survive + * prefill stalls that don't trigger the progress callback. */ + if (progress.stream) { + progress.keepalive_running = 1; + if (pthread_create(&progress.keepalive_thread, NULL, + keepalive_worker, &progress) != 0) { + progress.keepalive_running = 0; + } + } + int cold_store_len = 0; if (cached == 0 && s->kv.enabled && @@ -9942,6 +9999,11 @@ static void generate_job(server *s, job *j) { if (ds4_session_sync(s->session, &prefix, err, sizeof(err)) != 0) { ds4_tokens_free(&prefix); ds4_tokens_free(&effective_prompt); + /* Stop keepalive thread. */ + if (progress.keepalive_running) { + progress.keepalive_running = 0; + pthread_join(progress.keepalive_thread, NULL); + } ds4_session_set_progress(s->session, NULL, NULL); kv_cache_restore_suppressed_continued(&s->kv, suppressed_continued_last, cold_store_len); @@ -9962,6 +10024,11 @@ static void generate_job(server *s, job *j) { if (ds4_session_sync(s->session, prompt_for_sync, err, sizeof(err)) != 0) { ds4_tokens_free(&effective_prompt); + /* Stop keepalive thread before clearing progress callback. */ + if (progress.keepalive_running) { + progress.keepalive_running = 0; + pthread_join(progress.keepalive_thread, NULL); + } ds4_session_set_progress(s->session, NULL, NULL); kv_cache_restore_suppressed_continued(&s->kv, suppressed_continued_last, cold_store_len); @@ -9974,6 +10041,11 @@ static void generate_job(server *s, job *j) { if (!responses_live_continuation) responses_live_clear(s); if (!anthropic_live_continuation) anthropic_live_clear(s); if (!thinking_live_continuation) thinking_live_clear(s); + /* Stop keepalive thread before clearing progress callback. */ + if (progress.keepalive_running) { + progress.keepalive_running = 0; + pthread_join(progress.keepalive_thread, NULL); + } ds4_session_set_progress(s->session, NULL, NULL); kv_cache_maybe_store_continued(s); server_log(DS4_LOG_PREFILL,