diff --git a/xstreamroll-processing/__tests__/metrics.test.ts b/xstreamroll-processing/__tests__/metrics.test.ts index 54780de..23f61b2 100644 --- a/xstreamroll-processing/__tests__/metrics.test.ts +++ b/xstreamroll-processing/__tests__/metrics.test.ts @@ -1,8 +1,12 @@ +import { Server } from "http" +import { AddressInfo } from "net" +import axios from "axios" import { getMetrics, incrementProcessed, incrementErrors, setQueueDepth, + startMetricsServer, } from "../src/metrics" describe("metrics counters", () => { @@ -36,3 +40,82 @@ describe("metrics counters", () => { expect(getMetrics().uptimeSeconds).toBeGreaterThanOrEqual(0) }) }) + +describe("metrics server", () => { + let server: Server + let baseUrl: string + + beforeAll((done) => { + server = startMetricsServer(0) + server.on("listening", () => { + const addr = server.address() as AddressInfo + baseUrl = `http://localhost:${addr.port}` + done() + }) + }) + + afterAll((done) => { + server.close(done) + }) + + it("GET /metrics returns Prometheus format by default", async () => { + const res = await axios.get(`${baseUrl}/metrics`) + expect(res.status).toBe(200) + expect(res.headers["content-type"]).toBe("text/plain; version=0.0.4") + expect(res.data).toContain("# HELP xstreamroll_messages_processed_total") + expect(res.data).toContain("# TYPE xstreamroll_messages_processed_total counter") + expect(res.data).toContain("xstreamroll_messages_processed_total") + expect(res.data).toContain("# HELP xstreamroll_uptime_seconds") + }) + + it("GET /metrics/prometheus returns Prometheus format", async () => { + const res = await axios.get(`${baseUrl}/metrics/prometheus`) + expect(res.status).toBe(200) + expect(res.headers["content-type"]).toBe("text/plain; version=0.0.4") + expect(res.data).toContain("xstreamroll_uptime_seconds") + }) + + it("GET /metrics/json returns JSON format", async () => { + const res = await axios.get(`${baseUrl}/metrics/json`) + expect(res.status).toBe(200) + expect(res.headers["content-type"]).toBe("application/json") + expect(res.data).toHaveProperty("uptimeSeconds") + expect(res.data).toHaveProperty("messagesProcessed") + }) + + it("GET /metrics with Accept: application/json returns JSON format", async () => { + const res = await axios.get(`${baseUrl}/metrics`, { + headers: { Accept: "application/json" }, + }) + expect(res.status).toBe(200) + expect(res.headers["content-type"]).toBe("application/json") + expect(res.data).toHaveProperty("uptimeSeconds") + }) + + it("GET /metrics with Accept: application/json, text/plain returns Prometheus format", async () => { + const res = await axios.get(`${baseUrl}/metrics`, { + headers: { Accept: "application/json, text/plain" }, + }) + expect(res.status).toBe(200) + expect(res.headers["content-type"]).toBe("text/plain; version=0.0.4") + expect(res.data).toContain("xstreamroll_uptime_seconds") + }) + + it("GET /metrics with Accept: */* returns Prometheus format", async () => { + const res = await axios.get(`${baseUrl}/metrics`, { + headers: { Accept: "*/*" }, + }) + expect(res.status).toBe(200) + expect(res.headers["content-type"]).toBe("text/plain; version=0.0.4") + expect(res.data).toContain("xstreamroll_uptime_seconds") + }) + + it("GET /invalid-route returns 404", async () => { + await expect(axios.get(`${baseUrl}/invalid-route`)).rejects.toThrow() + try { + await axios.get(`${baseUrl}/invalid-route`) + } catch (err: any) { + expect(err.response.status).toBe(404) + } + }) +}) diff --git a/xstreamroll-processing/src/metrics.ts b/xstreamroll-processing/src/metrics.ts index 42939b3..e3c252f 100644 --- a/xstreamroll-processing/src/metrics.ts +++ b/xstreamroll-processing/src/metrics.ts @@ -35,14 +35,53 @@ export function getMetrics(): Metrics { export function startMetricsServer(port = 3002): ReturnType { const server = createServer((req: IncomingMessage, res: ServerResponse) => { - if (req.method === "GET" && req.url === "/metrics") { - const body = JSON.stringify(getMetrics()) - res.writeHead(200, { "Content-Type": "application/json" }) - res.end(body) - } else { - res.writeHead(404) - res.end() + const url = req.url || "" + const accept = (req.headers.accept || "").toLowerCase() + + if (req.method === "GET") { + if (url === "/metrics/json") { + const body = JSON.stringify(getMetrics()) + res.writeHead(200, { "Content-Type": "application/json" }) + res.end(body) + return + } + + if (url === "/metrics" || url === "/metrics/" || url === "/metrics/prometheus") { + const wantsJson = + accept.includes("application/json") && + !accept.includes("text/plain") && + !accept.includes("*/*") + + if (wantsJson) { + const body = JSON.stringify(getMetrics()) + res.writeHead(200, { "Content-Type": "application/json" }) + res.end(body) + } else { + const m = getMetrics() + const body = [ + `# HELP xstreamroll_messages_processed_total Total messages processed by the worker`, + `# TYPE xstreamroll_messages_processed_total counter`, + `xstreamroll_messages_processed_total ${m.messagesProcessed}`, + `# HELP xstreamroll_errors_total Total errors encountered by the worker`, + `# TYPE xstreamroll_errors_total counter`, + `xstreamroll_errors_total ${m.errors}`, + `# HELP xstreamroll_queue_depth Current queue depth of the worker`, + `# TYPE xstreamroll_queue_depth gauge`, + `xstreamroll_queue_depth ${m.queueDepth}`, + `# HELP xstreamroll_uptime_seconds Uptime of the worker in seconds`, + `# TYPE xstreamroll_uptime_seconds gauge`, + `xstreamroll_uptime_seconds ${m.uptimeSeconds}`, + ].join("\n") + "\n" + + res.writeHead(200, { "Content-Type": "text/plain; version=0.0.4" }) + res.end(body) + } + return + } } + + res.writeHead(404) + res.end() }) server.listen(port, () => {