Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions xstreamroll-processing/__tests__/metrics.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import { Server } from "http"
import { AddressInfo } from "net"
import axios from "axios"
import {
getMetrics,
incrementProcessed,
incrementErrors,
setQueueDepth,
startMetricsServer,
} from "../src/metrics"

describe("metrics counters", () => {
Expand Down Expand Up @@ -36,3 +40,82 @@
expect(getMetrics().uptimeSeconds).toBeGreaterThanOrEqual(0)
})
})

describe("metrics server", () => {
let server: Server
let baseUrl: string

beforeAll((done) => {
server = startMetricsServer(0)
server.on("listening", () => {
const addr = server.address() as AddressInfo
baseUrl = `http://localhost:${addr.port}`
done()
})
})

afterAll((done) => {
server.close(done)
})

it("GET /metrics returns Prometheus format by default", async () => {
const res = await axios.get(`${baseUrl}/metrics`)
expect(res.status).toBe(200)
expect(res.headers["content-type"]).toBe("text/plain; version=0.0.4")
expect(res.data).toContain("# HELP xstreamroll_messages_processed_total")
expect(res.data).toContain("# TYPE xstreamroll_messages_processed_total counter")
expect(res.data).toContain("xstreamroll_messages_processed_total")
expect(res.data).toContain("# HELP xstreamroll_uptime_seconds")
})

it("GET /metrics/prometheus returns Prometheus format", async () => {
const res = await axios.get(`${baseUrl}/metrics/prometheus`)
expect(res.status).toBe(200)
expect(res.headers["content-type"]).toBe("text/plain; version=0.0.4")
expect(res.data).toContain("xstreamroll_uptime_seconds")
})

it("GET /metrics/json returns JSON format", async () => {
const res = await axios.get(`${baseUrl}/metrics/json`)
expect(res.status).toBe(200)
expect(res.headers["content-type"]).toBe("application/json")
expect(res.data).toHaveProperty("uptimeSeconds")
expect(res.data).toHaveProperty("messagesProcessed")
})

it("GET /metrics with Accept: application/json returns JSON format", async () => {
const res = await axios.get(`${baseUrl}/metrics`, {
headers: { Accept: "application/json" },
})
expect(res.status).toBe(200)
expect(res.headers["content-type"]).toBe("application/json")
expect(res.data).toHaveProperty("uptimeSeconds")
})

it("GET /metrics with Accept: application/json, text/plain returns Prometheus format", async () => {
const res = await axios.get(`${baseUrl}/metrics`, {
headers: { Accept: "application/json, text/plain" },
})
expect(res.status).toBe(200)
expect(res.headers["content-type"]).toBe("text/plain; version=0.0.4")
expect(res.data).toContain("xstreamroll_uptime_seconds")
})

it("GET /metrics with Accept: */* returns Prometheus format", async () => {
const res = await axios.get(`${baseUrl}/metrics`, {
headers: { Accept: "*/*" },
})
expect(res.status).toBe(200)
expect(res.headers["content-type"]).toBe("text/plain; version=0.0.4")
expect(res.data).toContain("xstreamroll_uptime_seconds")
})

it("GET /invalid-route returns 404", async () => {
await expect(axios.get(`${baseUrl}/invalid-route`)).rejects.toThrow()
try {
await axios.get(`${baseUrl}/invalid-route`)
} catch (err: any) {

Check warning on line 117 in xstreamroll-processing/__tests__/metrics.test.ts

View workflow job for this annotation

GitHub Actions / quality

Unexpected any. Specify a different type
expect(err.response.status).toBe(404)
}
})
})
53 changes: 46 additions & 7 deletions xstreamroll-processing/src/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,53 @@ export function getMetrics(): Metrics {

export function startMetricsServer(port = 3002): ReturnType<typeof createServer> {
const server = createServer((req: IncomingMessage, res: ServerResponse) => {
if (req.method === "GET" && req.url === "/metrics") {
const body = JSON.stringify(getMetrics())
res.writeHead(200, { "Content-Type": "application/json" })
res.end(body)
} else {
res.writeHead(404)
res.end()
const url = req.url || ""
const accept = (req.headers.accept || "").toLowerCase()

if (req.method === "GET") {
if (url === "/metrics/json") {
const body = JSON.stringify(getMetrics())
res.writeHead(200, { "Content-Type": "application/json" })
res.end(body)
return
}

if (url === "/metrics" || url === "/metrics/" || url === "/metrics/prometheus") {
const wantsJson =
accept.includes("application/json") &&
!accept.includes("text/plain") &&
!accept.includes("*/*")

if (wantsJson) {
const body = JSON.stringify(getMetrics())
res.writeHead(200, { "Content-Type": "application/json" })
res.end(body)
} else {
const m = getMetrics()
const body = [
`# HELP xstreamroll_messages_processed_total Total messages processed by the worker`,
`# TYPE xstreamroll_messages_processed_total counter`,
`xstreamroll_messages_processed_total ${m.messagesProcessed}`,
`# HELP xstreamroll_errors_total Total errors encountered by the worker`,
`# TYPE xstreamroll_errors_total counter`,
`xstreamroll_errors_total ${m.errors}`,
`# HELP xstreamroll_queue_depth Current queue depth of the worker`,
`# TYPE xstreamroll_queue_depth gauge`,
`xstreamroll_queue_depth ${m.queueDepth}`,
`# HELP xstreamroll_uptime_seconds Uptime of the worker in seconds`,
`# TYPE xstreamroll_uptime_seconds gauge`,
`xstreamroll_uptime_seconds ${m.uptimeSeconds}`,
].join("\n") + "\n"

res.writeHead(200, { "Content-Type": "text/plain; version=0.0.4" })
res.end(body)
}
return
}
}

res.writeHead(404)
res.end()
})

server.listen(port, () => {
Expand Down
Loading