diff --git a/ds4_server.c b/ds4_server.c index 435491fe..5961cb90 100644 --- a/ds4_server.c +++ b/ds4_server.c @@ -9215,8 +9215,37 @@ typedef struct { bool headers_sent; bool stream_failed; double last_keepalive; + pthread_t keepalive_thread; + volatile int keepalive_running; } server_prefill_progress; +/* Background thread: sends SSE keepalive comments on a wall-clock timer + * so the connection survives prefill stalls that don't trigger the + * progress callback (which only fires on chunk boundaries). */ +static void *keepalive_worker(void *arg) { + server_prefill_progress *p = (server_prefill_progress *)arg; + struct timespec ts = {.tv_sec = 5, .tv_nsec = 0}; + while (p->keepalive_running) { + nanosleep(&ts, NULL); + if (!p->keepalive_running) break; + if (p->stream && p->fd >= 0 && !p->stream_failed) { + if (!p->headers_sent) { + if (sse_headers(p->fd, p->enable_cors)) { + p->headers_sent = true; + } else { + p->stream_failed = true; + } + } else { + static const char ka[] = ": prefill\n\n"; + if (!send_all(p->fd, ka, sizeof(ka) - 1)) { + p->stream_failed = true; + } + } + } + } + return NULL; +} + static void request_ctx_span(char *buf, size_t len, int cached, int prompt) { int suffix = prompt - cached; if (suffix < 0) suffix = 0; @@ -9640,7 +9669,20 @@ static void canonicalize_tool_checkpoint(server *s, const job *j, const char *ct }; snprintf(rebuild_progress.ctx, sizeof(rebuild_progress.ctx), "%s", rebuild_ctx); ds4_session_set_progress(s->session, server_progress_cb, &rebuild_progress); + /* Start keepalive thread for rebuild sync. */ + if (rebuild_progress.stream) { + rebuild_progress.keepalive_running = 1; + if (pthread_create(&rebuild_progress.keepalive_thread, NULL, + keepalive_worker, &rebuild_progress) != 0) { + rebuild_progress.keepalive_running = 0; + } + } if (ds4_session_sync(s->session, sync_prompt, sync_err, sizeof(sync_err)) == 0) { + /* Stop keepalive thread. */ + if (rebuild_progress.keepalive_running) { + rebuild_progress.keepalive_running = 0; + pthread_join(rebuild_progress.keepalive_thread, NULL); + } ds4_session_set_progress(s->session, NULL, NULL); const double rebuild_sec = now_sec() - rebuild_t0; if (loaded > 0) { @@ -9659,6 +9701,11 @@ static void canonicalize_tool_checkpoint(server *s, const job *j, const char *ct common, live_len, canonical.len, err); } } else { + /* Stop keepalive thread on rebuild failure. */ + if (rebuild_progress.keepalive_running) { + rebuild_progress.keepalive_running = 0; + pthread_join(rebuild_progress.keepalive_thread, NULL); + } ds4_session_set_progress(s->session, NULL, NULL); server_log(DS4_LOG_KVCACHE, "ds4-server: tool checkpoint rebuild failed ctx=%s request_ctx=%s source=%s cached=%d replay=%d target=%d error=\"%s\"", @@ -9908,6 +9955,16 @@ static void generate_job(server *s, job *j) { req_flags); ds4_session_set_progress(s->session, server_progress_cb, &progress); + /* Start background keepalive thread so SSE connections survive + * prefill stalls that don't trigger the progress callback. */ + if (progress.stream) { + progress.keepalive_running = 1; + if (pthread_create(&progress.keepalive_thread, NULL, + keepalive_worker, &progress) != 0) { + progress.keepalive_running = 0; + } + } + int cold_store_len = 0; if (cached == 0 && s->kv.enabled && @@ -9942,6 +9999,11 @@ static void generate_job(server *s, job *j) { if (ds4_session_sync(s->session, &prefix, err, sizeof(err)) != 0) { ds4_tokens_free(&prefix); ds4_tokens_free(&effective_prompt); + /* Stop keepalive thread. */ + if (progress.keepalive_running) { + progress.keepalive_running = 0; + pthread_join(progress.keepalive_thread, NULL); + } ds4_session_set_progress(s->session, NULL, NULL); kv_cache_restore_suppressed_continued(&s->kv, suppressed_continued_last, cold_store_len); @@ -9962,6 +10024,11 @@ static void generate_job(server *s, job *j) { if (ds4_session_sync(s->session, prompt_for_sync, err, sizeof(err)) != 0) { ds4_tokens_free(&effective_prompt); + /* Stop keepalive thread before clearing progress callback. */ + if (progress.keepalive_running) { + progress.keepalive_running = 0; + pthread_join(progress.keepalive_thread, NULL); + } ds4_session_set_progress(s->session, NULL, NULL); kv_cache_restore_suppressed_continued(&s->kv, suppressed_continued_last, cold_store_len); @@ -9974,6 +10041,11 @@ static void generate_job(server *s, job *j) { if (!responses_live_continuation) responses_live_clear(s); if (!anthropic_live_continuation) anthropic_live_clear(s); if (!thinking_live_continuation) thinking_live_clear(s); + /* Stop keepalive thread before clearing progress callback. */ + if (progress.keepalive_running) { + progress.keepalive_running = 0; + pthread_join(progress.keepalive_thread, NULL); + } ds4_session_set_progress(s->session, NULL, NULL); kv_cache_maybe_store_continued(s); server_log(DS4_LOG_PREFILL,