71 changes: 71 additions & 0 deletions tools/sse-timeout-probe/README.md
@@ -0,0 +1,71 @@
# sse-timeout-probe

Empirical reproducer for the Databricks Apps SSE timeout gap
([ES-1742245](https://databricks.atlassian.net/browse/ES-1742245), UI-01 / UI-02 in
the EMEA Apps "gaps that matter" doc).

## What it does

Opens one SSE connection per duration in a configurable ladder, holds each one
idle (or paced by a server-side heartbeat), and reports how long each connection
actually survived and how it was terminated. Runs against a Databricks App, an
EKS control, or `localhost`. Running it against a known-good origin and the
Databricks-hosted one in sequence gives you the per-layer ceiling without
having to triangulate from noisy LLM traces.

## Why this exists

The source doc (`Apps Gaps That Matter to EMEA Apps`) and ES-1742245 disagree
about what kills SSE connections. The doc says the drop is "distinct from the
120s idle timeout" and blames buffering / HTTP/2 multiplexing. The ticket's own
diagnosis (Naïm Achahboun) says the drop *is* caused by the effective request
timeout — multi-agent LLM calls take varying durations, so some finish under
the ceiling and some don't.

This probe answers the question deterministically: hold an idle SSE for X
seconds, see if it survives, record where the timeout lives. Comparing results
across durations and with/without heartbeat tells you whether the ceiling is
idle-based (heartbeats save it) or absolute (heartbeats don't).
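
The idle-vs-absolute decision rule above can be sketched as a tiny classifier. The `Run` summary type here is hypothetical (it is not part of the probe's output schema); it just captures "at what duration did connections start dying" for one ladder:

```typescript
// Hypothetical summary of one probe ladder: heartbeat setting and the
// duration at which server-closes first cluster (null = no cliff observed).
interface Run {
  heartbeatMs: number;     // 0 = fully idle
  cliffMs: number | null;
}

type Ceiling = "idle-timeout" | "absolute-timeout" | "no-timeout-ceiling";

function classifyCeiling(idleRun: Run, heartbeatRun: Run): Ceiling {
  if (idleRun.cliffMs === null) return "no-timeout-ceiling";
  // Heartbeats reset idle timers but cannot extend an absolute request
  // deadline, so a cliff that disappears or moves later implies idle-based.
  if (heartbeatRun.cliffMs === null || heartbeatRun.cliffMs > idleRun.cliffMs) {
    return "idle-timeout";
  }
  return "absolute-timeout";
}
```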

## Usage

```bash
# Inside a workspace: deploy `server.ts` as the app entrypoint.
# Then from a machine with network access to the app URL:
tsx tools/sse-timeout-probe/probe.ts \
--base-url https://my-app.<workspace>.cloud.databricks.com \
--header "Cookie=<oauth2-proxy session cookie>" \
--durations 30000,60000,90000,120000,150000,180000,240000,300000 \
--json | tee apps-results.jsonl

# Control run against the same codebase on EKS/localhost:
tsx tools/sse-timeout-probe/probe.ts --base-url http://localhost:8000 \
--json | tee local-results.jsonl

# Compare: if `outcome: server-close` clusters sharply at a duration in the
# Apps run but not locally, that duration is your ceiling.
```
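
Each `--json` line serializes the probe's `ProbeResult`. The field names below come from `probe.ts`; the values are illustrative only (not measured), showing what a premature drop at ~120s on a 150s target would look like:

```typescript
// Illustrative ProbeResult line — field names from probe.ts, values invented.
const example: {
  targetDurationMs: number;
  actualLifetimeMs: number;
  outcome: "completed" | "server-close" | "network-error" | "timeout-header";
  detail?: string;
  bytesReceived: number;
  firstByteMs?: number;
} = {
  targetDurationMs: 150_000,
  actualLifetimeMs: 120_412.7, // died well short of the 150s target
  outcome: "server-close",
  bytesReceived: 96,
  firstByteMs: 84.2,
};
```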

Flags:
- `--base-url <URL>` (required) — base URL of the SSE-serving app.
- `--path <PATH>` — SSE endpoint path. Default: `/sse-probe`.
- `--durations <LIST>` — comma-separated milliseconds, one connection per entry. Default: `30000,60000,90000,120000,150000,180000,240000,300000`.
- `--heartbeat <MS>` — if `>0`, the server emits a keepalive comment every N ms. Distinguishes idle-timeout from absolute request-timeout.
- `--header <K=V>` — extra request header. Repeatable (e.g. for auth cookies).
- `--json` — emit one JSON line per result.

## What to look for

- **Sharp cliff at ~60s, 90s, 120s, or 180s** → that's the effective ceiling. Cross-reference with:
- `apps/gateway` (`request_timeout=60`, `pool_idle_timeout=90`, `header_read_timeout=30`)
- `apps/oauth2-proxy` (`DefaultUpstreamTimeout=30`)
- DP ApiProxy envoy (`idle_timeout=180s` / `1200s`)
- **No cliff, `outcome: completed` throughout** → the drop isn't timeout-driven; follow the buffering / HTTP/2 hypothesis.
- **Cliff moves when `--heartbeat` is added** → idle-timeout. Document the heartbeat pattern as the fix.
- **Cliff is identical with and without heartbeat** → absolute request timeout. Requires per-route override on `apps/gateway`.
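
The comparison step above can be mechanized with a small post-processing helper. This is a sketch, not part of the probe: it assumes the `--json` output shape from `probe.ts` and treats `server-close` as a premature drop, per the interpretation in this section:

```typescript
// Hypothetical post-processing sketch: parse two JSONL runs and report the
// duration where server-closes first appear in the Apps run but not locally.
interface ResultLine {
  targetDurationMs: number;
  outcome: string;
}

function parseJsonl(text: string): ResultLine[] {
  return text.split("\n").filter(Boolean).map((l) => JSON.parse(l) as ResultLine);
}

// Smallest target duration that ended in server-close, or null if none did.
function firstServerCloseMs(results: ResultLine[]): number | null {
  const closed = results
    .filter((r) => r.outcome === "server-close")
    .map((r) => r.targetDurationMs)
    .sort((a, b) => a - b);
  return closed.length > 0 ? closed[0] : null;
}

// The ceiling is Apps-specific only if the control run lacks the same cliff.
function ceilingMs(apps: ResultLine[], control: ResultLine[]): number | null {
  const appsCliff = firstServerCloseMs(apps);
  const controlCliff = firstServerCloseMs(control);
  if (appsCliff === null) return null;
  return controlCliff === appsCliff ? null : appsCliff;
}
```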

## Follow-ups

- Wire the companion server into `apps/dev-playground` so probing is one `pnpm deploy` away.
- Export results to a small notebook template for the comparison visualization.
- Add a WebSocket variant of the probe so UI-02 (ping/pong bypass) can be measured on the same axes.
217 changes: 217 additions & 0 deletions tools/sse-timeout-probe/probe.ts
@@ -0,0 +1,217 @@
#!/usr/bin/env tsx
/**
* sse-timeout-probe: Empirically locate the idle-timeout ceiling that kills SSE
* streams on Databricks Apps.
*
* Background: internal field feedback (ES-1742245, and the EMEA Apps "gaps that
* matter" doc) reports ~75% of SSE connections drop mid-stream through the Apps
* reverse proxy. The source doc claims the drop is distinct from any idle timeout,
* but the ticket's own diagnosis (Naïm Achahboun) suggests the drop *is* caused by
* the effective request timeout — multi-agent LLM calls take varying durations, so
* ~30% finish under the ceiling and ~70% exceed it.
*
* This probe reproduces the condition deterministically: it opens one SSE connection
* per configured duration, keeps it idle (or paced by a configurable heartbeat), and
* records how long the connection survived and how it was terminated. Running it
* against a known-good origin (EKS / localhost) vs a Databricks Apps deployment
* gives you the per-layer ceiling without having to triangulate from noisy LLM
* traces.
*
* Usage: tsx tools/sse-timeout-probe/probe.ts --base-url <URL> [flags]
* See --help for flags.
*/

import { performance } from "node:perf_hooks";

interface ProbeConfig {
baseUrl: string;
path: string;
durationsMs: number[];
heartbeatMs: number;
headers: Record<string, string>;
jsonOutput: boolean;
}

interface ProbeResult {
targetDurationMs: number;
actualLifetimeMs: number;
outcome: "completed" | "server-close" | "network-error" | "timeout-header";
detail?: string;
bytesReceived: number;
firstByteMs?: number;
}

function parseArgs(argv: string[]): ProbeConfig {
const args = new Map<string, string>();
for (let i = 0; i < argv.length; i++) {
const a = argv[i];
if (a.startsWith("--")) {
const key = a.slice(2);
const value = argv[i + 1] && !argv[i + 1].startsWith("--") ? argv[++i] : "true";
args.set(key, value);
}
}

if (args.has("help") || !args.has("base-url")) {
process.stderr.write(
[
"usage: tsx tools/sse-timeout-probe/probe.ts --base-url <URL> [flags]",
"",
"flags:",
" --base-url <URL> required. Base URL of the SSE-serving app.",
" --path <PATH> SSE endpoint path. Default: /sse-probe",
" --durations <LIST> comma-separated ms, one connection per entry.",
" Default: 30000,60000,90000,120000,150000,180000,240000,300000",
" --heartbeat <MS> if >0, send a heartbeat comment every MS on the",
" *server* side (requires --path to point at the",
" companion server). If 0, connection is fully idle.",
" Default: 0",
" --header <K=V> extra request header. Repeatable.",
" --json emit machine-readable JSON line per result.",
" --help show this message.",
"",
].join("\n"),
);
process.exit(args.has("help") ? 0 : 2);
}

  const headers: Record<string, string> = {};
  // --header is repeatable, so collect every occurrence here rather than
  // taking the last-one-wins value from the flag map above.
  for (let i = 0; i < argv.length; i++) {
    if (argv[i] === "--header" && argv[i + 1]) {
      const [k, ...rest] = argv[i + 1].split("=");
      headers[k] = rest.join("=");
      i++;
    }
  }

const durations = (
args.get("durations") ?? "30000,60000,90000,120000,150000,180000,240000,300000"
)
.split(",")
.map((s) => Number.parseInt(s.trim(), 10))
.filter((n) => Number.isFinite(n) && n > 0);

return {
baseUrl: args.get("base-url")!.replace(/\/$/, ""),
path: args.get("path") ?? "/sse-probe",
durationsMs: durations,
heartbeatMs: Number.parseInt(args.get("heartbeat") ?? "0", 10),
headers,
jsonOutput: args.get("json") === "true",
};
}

async function probeOnce(config: ProbeConfig, targetDurationMs: number): Promise<ProbeResult> {
const url = new URL(config.path, config.baseUrl);
url.searchParams.set("hold-ms", String(targetDurationMs));
url.searchParams.set("heartbeat-ms", String(config.heartbeatMs));

const start = performance.now();
const controller = new AbortController();
const hardTimeout = setTimeout(() => controller.abort(new Error("probe-hard-timeout")), targetDurationMs + 15_000);

let bytesReceived = 0;
let firstByteMs: number | undefined;
let outcome: ProbeResult["outcome"] = "completed";
let detail: string | undefined;

try {
const resp = await fetch(url, {
method: "GET",
headers: {
Accept: "text/event-stream",
"Cache-Control": "no-cache",
...config.headers,
},
signal: controller.signal,
});

if (!resp.ok) {
return {
targetDurationMs,
actualLifetimeMs: performance.now() - start,
outcome: "server-close",
detail: `HTTP ${resp.status} ${resp.statusText}`,
bytesReceived: 0,
};
}

if (!resp.body) {
return {
targetDurationMs,
actualLifetimeMs: performance.now() - start,
outcome: "network-error",
detail: "no response body",
bytesReceived: 0,
};
}

    const reader = resp.body.getReader();
    for (;;) {
      const { value, done } = await reader.read();
      if (firstByteMs === undefined && value) firstByteMs = performance.now() - start;
      if (done) {
        // The server closed the stream. If the connection survived (roughly)
        // the full hold, that is the expected completion; anything earlier is
        // a premature drop.
        const lived = performance.now() - start;
        outcome = lived >= targetDurationMs - 1_000 ? "completed" : "server-close";
        break;
      }
      if (value) bytesReceived += value.byteLength;
    }
  } catch (err) {
    const e = err as Error;
    // controller.abort(reason) makes fetch reject with that reason directly,
    // so check the signal instead of an AbortError name/cause chain.
    if (controller.signal.aborted) {
      outcome = "timeout-header";
      detail = "probe hard-timeout triggered (connection never closed)";
    } else {
      outcome = "network-error";
      detail = e.message;
    }
} finally {
clearTimeout(hardTimeout);
}

return {
targetDurationMs,
actualLifetimeMs: performance.now() - start,
outcome,
detail,
bytesReceived,
firstByteMs,
};
}

function formatResult(r: ProbeResult): string {
const lifeSec = (r.actualLifetimeMs / 1000).toFixed(1);
  const ttfb = r.firstByteMs !== undefined ? `${(r.firstByteMs / 1000).toFixed(1)}s` : "n/a";
const suffix = r.detail ? ` (${r.detail})` : "";
return ` target=${r.targetDurationMs / 1000}s lived=${lifeSec}s outcome=${r.outcome} bytes=${r.bytesReceived} ttfb=${ttfb}${suffix}`;
}

async function main(): Promise<void> {
const config = parseArgs(process.argv.slice(2));

if (!config.jsonOutput) {
process.stdout.write(`sse-timeout-probe → ${config.baseUrl}${config.path}\n`);
process.stdout.write(` durations: ${config.durationsMs.map((d) => `${d / 1000}s`).join(", ")}\n`);
process.stdout.write(` heartbeat: ${config.heartbeatMs === 0 ? "none (fully idle)" : `${config.heartbeatMs}ms`}\n\n`);
}

for (const duration of config.durationsMs) {
const result = await probeOnce(config, duration);
if (config.jsonOutput) {
process.stdout.write(`${JSON.stringify(result)}\n`);
} else {
process.stdout.write(`${formatResult(result)}\n`);
}
}
}

main().catch((err) => {
process.stderr.write(`probe failed: ${(err as Error).message}\n`);
process.exit(1);
});
59 changes: 59 additions & 0 deletions tools/sse-timeout-probe/server.ts
@@ -0,0 +1,59 @@
#!/usr/bin/env tsx
/**
* Companion server for sse-timeout-probe.
*
* Serves a single SSE endpoint `/sse-probe` that keeps the connection open for a
* configurable duration, optionally sending a heartbeat comment. Intended to run
* inside a Databricks App so a client in the browser (or CLI) can stream against
* it and measure when the effective idle timeout kicks in.
*
* Deploy this as the app's entrypoint, or mount it alongside a larger app.
*/

import { createServer } from "node:http";

const port = Number.parseInt(process.env.PORT ?? process.env.DATABRICKS_APP_PORT ?? "8000", 10);

const server = createServer((req, res) => {
if (!req.url?.startsWith("/sse-probe")) {
res.writeHead(404, { "Content-Type": "text/plain" });
res.end("not found — try /sse-probe\n");
return;
}

const url = new URL(req.url, `http://${req.headers.host ?? "localhost"}`);
const holdMs = Math.max(0, Number.parseInt(url.searchParams.get("hold-ms") ?? "120000", 10));
const heartbeatMs = Math.max(0, Number.parseInt(url.searchParams.get("heartbeat-ms") ?? "0", 10));

res.writeHead(200, {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache, no-transform",
Connection: "keep-alive",
"X-Accel-Buffering": "no",
});

// Initial event so the client can measure time-to-first-byte.
res.write(`event: probe-start\ndata: ${JSON.stringify({ holdMs, heartbeatMs })}\n\n`);

let heartbeat: NodeJS.Timeout | undefined;
if (heartbeatMs > 0) {
heartbeat = setInterval(() => {
res.write(`: heartbeat ${Date.now()}\n\n`);
}, heartbeatMs);
}

const stop = setTimeout(() => {
if (heartbeat) clearInterval(heartbeat);
res.write(`event: probe-end\ndata: ${JSON.stringify({ reason: "hold-elapsed" })}\n\n`);
res.end();
}, holdMs);

req.on("close", () => {
if (heartbeat) clearInterval(heartbeat);
clearTimeout(stop);
});
});

server.listen(port, () => {
process.stdout.write(`sse-timeout-probe server listening on :${port}\n`);
});