From 61b645bb776b4397f5e22e20e7ff0c9bdbb911eb Mon Sep 17 00:00:00 2001 From: Max Gruenfelder Date: Sat, 7 Mar 2026 12:54:10 +0100 Subject: [PATCH 01/18] redis statistics --- .gitignore | 1 + src/dbHandler.js | 127 +++++++++--- src/shared/eventQueueStats.js | 146 ++++++++++++++ test/dbHandlerStats.test.js | 365 ++++++++++++++++++++++++++++++++++ test/eventQueueStats.test.js | 134 +++++++++++++ test/mocks/redisMock.js | 63 +++++- test/redisPubSub.test.js | 2 + 7 files changed, 812 insertions(+), 26 deletions(-) create mode 100644 src/shared/eventQueueStats.js create mode 100644 test/dbHandlerStats.test.js create mode 100644 test/eventQueueStats.test.js diff --git a/.gitignore b/.gitignore index 7ca70254..9c6cb6df 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ **/.project **/.settings *.code-workspace +CLAUDE.md # package managers bower_components/ diff --git a/src/dbHandler.js b/src/dbHandler.js index 030db39d..74e1169c 100644 --- a/src/dbHandler.js +++ b/src/dbHandler.js @@ -4,11 +4,14 @@ const cds = require("@sap/cds"); const redisPub = require("./redis/redisPub"); const config = require("./config"); +const eventQueueStats = require("./shared/eventQueueStats"); +const { EventProcessingStatus } = require("./constants"); const COMPONENT_NAME = "/eventQueue/dbHandler"; const registeredHandlers = { eventQueueDbHandler: false, beforeDbHandler: false, + updateDbHandler: false, }; const registerEventQueueDbHandler = (dbService) => { @@ -26,36 +29,112 @@ const registerEventQueueDbHandler = (dbService) => { req.tx._.eventQueuePublishEvents = req.tx._.eventQueuePublishEvents ?? {}; const eventQueuePublishEvents = req.tx._.eventQueuePublishEvents; const data = Array.isArray(req.query.INSERT.entries) ? req.query.INSERT.entries : [req.query.INSERT.entries]; - const eventCombinations = Object.keys( - data.reduce((result, event) => { - const key = [event.type, event.subType, event.namespace].join("##"); - if ( - !config.hasEventAfterCommitFlag(event.type, event.subType, event.namespace) || - eventQueuePublishEvents[key] - ) { - return result; - } + + req.tx._.eventQueueStatsOpenCount = (req.tx._.eventQueueStatsOpenCount ?? 0) + data.length; + const newCombinations = data.reduce((result, event) => { + const key = [event.type, event.subType, event.namespace].join("##"); + if (config.hasEventAfterCommitFlag(event.type, event.subType, event.namespace) && !eventQueuePublishEvents[key]) { eventQueuePublishEvents[key] = true; - result[key] = true; - return result; - }, {}) - ); + result.push(key); + } + return result; + }, []); - eventCombinations.length && + req.tx._.eventQueueBroadcastCombinations ??= []; + req.tx._.eventQueueBroadcastCombinations.push(...newCombinations); + if (!req.tx._.eventQueueSucceededHandlerRegistered) { + req.tx._.eventQueueSucceededHandlerRegistered = true; req.on("succeeded", () => { - const events = eventCombinations.map((eventCombination) => { - const [type, subType, namespace] = eventCombination.split("##"); - return { type, subType, namespace }; - }); - - redisPub.broadcastEvent(req.tenant, events).catch((err) => { - cds.log(COMPONENT_NAME).error("db handler failure during broadcasting event", err, { - tenant: req.tenant, - events, + if (config.redisEnabled && req.tx._.eventQueueStatsOpenCount) { + eventQueueStats + .incrementCounters(req.tenant, eventQueueStats.StatusField.Pending, req.tx._.eventQueueStatsOpenCount) + .catch((err) => { + cds.log(COMPONENT_NAME).error("db handler failure during updating event stats", err, { + tenant: req.tenant, + }); + }); + } + const combinations = req.tx._.eventQueueBroadcastCombinations; + if (combinations.length) { + const events = combinations.map((combination) => { + const [type, subType, namespace] = combination.split("##"); + return { type, subType, namespace }; }); - }); + redisPub.broadcastEvent(req.tenant, events).catch((err) => { + cds.log(COMPONENT_NAME).error("db handler failure during broadcasting event", err, { + tenant: req.tenant, + events, + }); + }); + } }); + } }); + + if (!registeredHandlers.updateDbHandler) { + registeredHandlers.updateDbHandler = true; + dbService.after("UPDATE", def, (affectedRows, req) => { + const newStatus = req.query.UPDATE?.data?.status; + if (newStatus == null) { + return; + } + + const count = typeof affectedRows === "number" && affectedRows > 0 ? affectedRows : 1; + + req.tx._ = req.tx._ ?? {}; + req.tx._.eventQueueStatsPendingDelta = req.tx._.eventQueueStatsPendingDelta ?? 0; + req.tx._.eventQueueStatsInProgressDelta = req.tx._.eventQueueStatsInProgressDelta ?? 0; + + if (newStatus === EventProcessingStatus.InProgress) { + req.tx._.eventQueueStatsPendingDelta -= count; + req.tx._.eventQueueStatsInProgressDelta += count; + } else if (newStatus === EventProcessingStatus.Error) { + req.tx._.eventQueueStatsInProgressDelta -= count; + req.tx._.eventQueueStatsPendingDelta += count; + } else if ( + newStatus === EventProcessingStatus.Done || + newStatus === EventProcessingStatus.Exceeded || + newStatus === EventProcessingStatus.Suspended + ) { + req.tx._.eventQueueStatsInProgressDelta -= count; + } + + if (!req.tx._.eventQueueUpdateSucceededHandlerRegistered) { + req.tx._.eventQueueUpdateSucceededHandlerRegistered = true; + req.on("succeeded", () => { + if (!config.redisEnabled) { + return; + } + + const pendingDelta = req.tx._.eventQueueStatsPendingDelta; + const inProgressDelta = req.tx._.eventQueueStatsInProgressDelta; + const ops = []; + + if (pendingDelta !== 0) { + ops.push( + eventQueueStats.adjustTenantCounter(req.tenant, eventQueueStats.StatusField.Pending, pendingDelta), + eventQueueStats.adjustGlobalCounter(eventQueueStats.StatusField.Pending, pendingDelta) + ); + } + if (inProgressDelta !== 0) { + ops.push( + eventQueueStats.adjustTenantCounter(req.tenant, eventQueueStats.StatusField.InProgress, inProgressDelta), + eventQueueStats.adjustGlobalCounter(eventQueueStats.StatusField.InProgress, inProgressDelta) + ); + } + Promise.allSettled(ops).then((results) => { + for (const result of results) { + if (result.status === "rejected") { + cds + .log(COMPONENT_NAME) + .error("db handler failure during updating event stats on update", result.reason, { tenant: req.tenant }); + } + } + }); + }); + } + }); + } }; module.exports = { diff --git a/src/shared/eventQueueStats.js b/src/shared/eventQueueStats.js new file mode 100644 index 00000000..6e7da197 --- /dev/null +++ b/src/shared/eventQueueStats.js @@ -0,0 +1,146 @@ +"use strict"; + +const cds = require("@sap/cds"); + +const redis = require("./redis"); +const config = require("../config"); + +const COMPONENT_NAME = "/eventQueue/eventQueueStats"; + +const StatusField = { + Pending: "pending", + InProgress: "inProgress", +}; + +const _tenantKey = (tenantId) => `${config.redisNamespace(true)}##stats##tenant##${tenantId}`; +const _globalKey = () => `${config.redisNamespace(true)}##stats##global`; + +/** + * Atomically adjusts a tenant's event counter for the given status field. + * + * @param {string} tenantId + * @param {string} field - one of StatusField.* + * @param {number} increment - positive to increment, negative to decrement + */ +const adjustTenantCounter = async (tenantId, field, increment) => { + const logger = cds.log(COMPONENT_NAME); + try { + const client = await redis.createMainClientAndConnect(); + await client.hIncrBy(_tenantKey(tenantId), field, increment); + } catch (err) { + logger.error("failed to adjust tenant stats counter", err, { tenantId, field, increment }); + } +}; + +/** + * Atomically adjusts the global event counter for the given status field. + * Also updates the `updatedAt` timestamp on the global hash. + * + * @param {string} field - one of StatusField.* + * @param {number} increment - positive to increment, negative to decrement + */ +const adjustGlobalCounter = async (field, increment) => { + const logger = cds.log(COMPONENT_NAME); + try { + const client = await redis.createMainClientAndConnect(); + await client.hIncrBy(_globalKey(), field, increment); + } catch (err) { + logger.error("failed to adjust global stats counter", err, { field, increment }); + } +}; + +/** + * Increments a tenant counter and the matching global counter in a single call. + * + * @param {string} tenantId + * @param {string} field - one of StatusField.* + * @param {number} [increment=1] + */ +const incrementCounters = async (tenantId, field, increment = 1) => { + await Promise.all([adjustTenantCounter(tenantId, field, increment), adjustGlobalCounter(field, increment)]); +}; + +/** + * Decrements a tenant counter and the matching global counter in a single call. + * + * @param {string} tenantId + * @param {string} field - one of StatusField.* + * @param {number} [decrement=1] + */ +const decrementCounters = async (tenantId, field, decrement = 1) => { + await Promise.all([adjustTenantCounter(tenantId, field, -decrement), adjustGlobalCounter(field, -decrement)]); +}; + +/** + * Returns the current stats hash for a single tenant. + * All counter values are returned as integers; missing fields default to 0. + * + * @param {string} tenantId + * @returns {Promise<{open: number, inProgress: number, error: number}>} + */ +const getTenantStats = async (tenantId) => { + const logger = cds.log(COMPONENT_NAME); + try { + const client = await redis.createMainClientAndConnect(); + const raw = await client.hGetAll(_tenantKey(tenantId)); + return _parseCounterHash(raw); + } catch (err) { + logger.error("failed to read tenant stats", err, { tenantId }); + return _emptyCounters(); + } +}; + +/** + * Returns the current global stats hash. + * All counter values are returned as integers; missing fields default to 0. + * + * @returns {Promise<{open: number, inProgress: number, error: number}>} + */ +const getGlobalStats = async () => { + const logger = cds.log(COMPONENT_NAME); + try { + const client = await redis.createMainClientAndConnect(); + const raw = await client.hGetAll(_globalKey()); + return _parseCounterHash(raw); + } catch (err) { + logger.error("failed to read global stats", err); + return _emptyCounters(); + } +}; + +/** + * Deletes the stats hash for a specific tenant. + * Intended for use during tenant offboarding. It does not adjust the global stats still will be fixed with the next global run + * + * @param {string} tenantId + */ +const deleteTenantStats = async (tenantId) => { + const logger = cds.log(COMPONENT_NAME); + try { + const client = await redis.createMainClientAndConnect(); + await client.del(_tenantKey(tenantId)); + } catch (err) { + logger.error("failed to delete tenant stats", err, { tenantId }); + } +}; + +const _parseCounterHash = (raw) => ({ + [StatusField.Pending]: raw[StatusField.Pending] != null ? parseInt(raw[StatusField.Pending]) : 0, + [StatusField.InProgress]: raw[StatusField.InProgress] != null ? parseInt(raw[StatusField.InProgress]) : 0, +}); + +const _emptyCounters = () => ({ + [StatusField.Pending]: 0, + [StatusField.InProgress]: 0, +}); + +module.exports = { + StatusField, + incrementCounters, + decrementCounters, + adjustTenantCounter, + adjustGlobalCounter, + getTenantStats, + getGlobalStats, + deleteTenantStats, +}; diff --git a/test/dbHandlerStats.test.js b/test/dbHandlerStats.test.js new file mode 100644 index 00000000..e50cb869 --- /dev/null +++ b/test/dbHandlerStats.test.js @@ -0,0 +1,365 @@ +"use strict"; + +const path = require("path"); + +const mockRedis = require("./mocks/redisMock"); +jest.mock("../src/shared/redis", () => mockRedis); + +const cds = require("@sap/cds/lib"); + +const eventQueue = require("../src"); +const config = require("../src/config"); +const { getTenantStats, getGlobalStats, StatusField } = require("../src/shared/eventQueueStats"); +const { EventProcessingStatus } = require("../src/constants"); +const { processEventQueue } = require("../src/processEventQueue"); +const testHelper = require("./helper"); +const { Logger: mockLogger } = require("./mocks/logger"); + +const outboxProject = path.join(__dirname, "asset", "outboxProject"); + +cds.env.requires.StandardService = { + impl: path.join(outboxProject, "srv/service/standard-service.js"), + outbox: { kind: "persistent-outbox" }, +}; + +cds.env.requires.NotificationService = { + impl: path.join(outboxProject, "srv/service/service.js"), + outbox: { kind: "persistent-outbox" }, +}; + +cds.test(outboxProject); + +describe("dbHandler - stats tracking via CAP outbox", () => { + let context, tx, loggerMock; + + beforeAll(async () => { + eventQueue.config.initialized = false; + await eventQueue.initialize({ + processEventsAfterPublish: false, + registerAsEventProcessor: false, + insertEventsBeforeCommit: true, + useAsCAPOutbox: true, + userId: "dummyTestUser", + }); + cds.emit("connect", await cds.connect.to("db")); + config.redisEnabled = true; + eventQueue.registerEventQueueDbHandler(cds.db); + loggerMock = mockLogger(); + }); + + beforeEach(async () => { + context = new cds.EventContext({ user: "testUser", tenant: 123 }); + tx = cds.tx(context); + await tx.run(DELETE.from("sap.eventqueue.Lock")); + await tx.run(DELETE.from("sap.eventqueue.Event")); + await commitAndOpenNew(); + mockRedis.clearState(); + jest.clearAllMocks(); + }); + + afterEach(async () => { + await tx.rollback(); + }); + + afterAll(async () => { + config.redisEnabled = false; + await cds.shutdown; + }); + + // ── CREATE handler tests ──────────────────────────────────────────────────── + + it("increments pending counter by 1 after single send and commit", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(1); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(1); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("accumulates pending counter for multiple sends in same transaction", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(3); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(3); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("does not increment counter when transaction is rolled back", async () => { + const innerTx = cds.tx(context); + const service = (await cds.connect.to("StandardService")).tx(innerTx.context); + await service.send("main", {}); + await innerTx.rollback(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(0); + }); + + it("does not increment counter when redisEnabled is false", async () => { + config.redisEnabled = false; + try { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(0); + } finally { + config.redisEnabled = true; + } + }); + + it("tracks stats per tenant while global counter aggregates", async () => { + // tenant 123: send 2 events + const service123 = (await cds.connect.to("StandardService")).tx(context); + await service123.send("main", {}); + await service123.send("main", {}); + await commitAndOpenNew(); + + // tenant 456: send 1 event in its own tx + const ctx456 = new cds.EventContext({ user: "testUser", tenant: 456 }); + const tx456 = cds.tx(ctx456); + const service456 = (await cds.connect.to("StandardService")).tx(ctx456); + await service456.send("main", {}); + await tx456.commit(); + + const stats123 = await getTenantStats(123); + expect(stats123[StatusField.Pending]).toBe(2); + + const stats456 = await getTenantStats(456); + expect(stats456[StatusField.Pending]).toBe(1); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(3); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + // ── UPDATE handler tests — direct status transitions ──────────────────────── + + describe("UPDATE handler — direct status transitions", () => { + it("Open → InProgress: decrements pending, increments inProgress", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=2 + + await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + expect(tenantStats[StatusField.InProgress]).toBe(2); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(0); + expect(globalStats[StatusField.InProgress]).toBe(2); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("InProgress → Done: decrements inProgress, no pending change", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=2 + + await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await commitAndOpenNew(); // pending=0, inProgress=2 + + await tx.run( + UPDATE.entity("sap.eventqueue.Event") + .set({ status: EventProcessingStatus.Done }) + .where({ status: EventProcessingStatus.InProgress }) + ); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("InProgress → Error: decrements inProgress, increments pending (will be retried)", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=2 + + await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await commitAndOpenNew(); // pending=0, inProgress=2 + + await tx.run( + UPDATE.entity("sap.eventqueue.Event") + .set({ status: EventProcessingStatus.Error }) + .where({ status: EventProcessingStatus.InProgress }) + ); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(2); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(2); + expect(globalStats[StatusField.InProgress]).toBe(0); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("InProgress → Exceeded: decrements inProgress, no pending change", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=2 + + await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await commitAndOpenNew(); // pending=0, inProgress=2 + + await tx.run( + UPDATE.entity("sap.eventqueue.Event") + .set({ status: EventProcessingStatus.Exceeded }) + .where({ status: EventProcessingStatus.InProgress }) + ); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("two UPDATEs in one transaction accumulate into a single succeeded handler call", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=2 + + // Open→InProgress then InProgress→Done without an intermediate commit + await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await tx.run( + UPDATE.entity("sap.eventqueue.Event") + .set({ status: EventProcessingStatus.Done }) + .where({ status: EventProcessingStatus.InProgress }) + ); + await commitAndOpenNew(); + + // Net delta: pending -2, inProgress +2 then -2 → both counters at 0 + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("does not adjust counters when redisEnabled is false", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=2 + + config.redisEnabled = false; + try { + await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(2); + expect(tenantStats[StatusField.InProgress]).toBe(0); + } finally { + config.redisEnabled = true; + } + }); + + it("does not adjust counters when transaction is rolled back", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=2 + + const innerTx = cds.tx(context); + await innerTx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await innerTx.rollback(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(2); + expect(tenantStats[StatusField.InProgress]).toBe(0); + }); + }); + + // ── processEventQueue integration tests ───────────────────────────────────── + + describe("processEventQueue integration — stats via real processing", () => { + it("successful processing transitions pending → inProgress → Done (counters reach zero)", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await commitAndOpenNew(); + + expect((await getTenantStats(123))[StatusField.Pending]).toBe(1); + expect((await getTenantStats(123))[StatusField.InProgress]).toBe(0); + + await processEventQueue(tx.context, "CAP_OUTBOX", "StandardService"); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(0); + expect(globalStats[StatusField.InProgress]).toBe(0); + + await testHelper.selectEventQueueAndExpectDone(tx, { expectedLength: 1 }); + }); + + it("failed processing transitions pending → inProgress → Error → back to pending", async () => { + const service = cds.outboxed(await cds.connect.to("NotificationService")).tx(context); + await service.send("errorEvent", { to: "to", subject: "subject", body: "body" }); + await commitAndOpenNew(); + + expect((await getTenantStats(123))[StatusField.Pending]).toBe(1); + expect((await getTenantStats(123))[StatusField.InProgress]).toBe(0); + + await processEventQueue(tx.context, "CAP_OUTBOX", "NotificationService"); + await commitAndOpenNew(); + + // Error state means the event will be retried → counts as pending + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(1); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(1); + expect(globalStats[StatusField.InProgress]).toBe(0); + + await testHelper.selectEventQueueAndExpectError(tx, { expectedLength: 1 }); + }); + }); + + const commitAndOpenNew = async () => { + await tx.commit(); + context = new cds.EventContext({ user: "testUser", tenant: 123 }); + tx = cds.tx(context); + }; +}); diff --git a/test/eventQueueStats.test.js b/test/eventQueueStats.test.js new file mode 100644 index 00000000..6c3815ba --- /dev/null +++ b/test/eventQueueStats.test.js @@ -0,0 +1,134 @@ +"use strict"; + +const path = require("path"); + +const cds = require("@sap/cds/lib"); + +const mockRedis = require("./mocks/redisMock"); +jest.mock("../src/shared/redis", () => mockRedis); + +const eventQueue = require("../src"); +const { + StatusField, + incrementCounters, + decrementCounters, + adjustTenantCounter, + adjustGlobalCounter, + getTenantStats, + getGlobalStats, + deleteTenantStats, +} = require("../src/shared/eventQueueStats"); + +const project = __dirname + "/.."; +cds.test(project); + +describe("eventQueueStats", () => { + beforeAll(async () => { + const configFilePath = path.join(__dirname, "asset", "config.yml"); + await eventQueue.initialize({ + configFilePath, + processEventsAfterPublish: false, + registerAsEventProcessor: false, + }); + }); + + beforeEach(() => { + mockRedis.clearState(); + }); + + afterAll(() => cds.shutdown); + + describe("incrementCounters / decrementCounters", () => { + it("increments tenant and global counters", async () => { + await incrementCounters("t1", StatusField.Pending, 3); + + const tenant = await getTenantStats("t1"); + expect(tenant.pending).toBe(3); + expect(tenant.inProgress).toBe(0); + + const global = await getGlobalStats(); + expect(global.pending).toBe(3); + }); + + it("decrements tenant and global counters", async () => { + await incrementCounters("t1", StatusField.Pending, 5); + await decrementCounters("t1", StatusField.Pending, 2); + + const tenant = await getTenantStats("t1"); + expect(tenant.pending).toBe(3); + + const global = await getGlobalStats(); + expect(global.pending).toBe(3); + }); + + it("multiple tenants are tracked independently", async () => { + await incrementCounters("t1", StatusField.Pending, 2); + await incrementCounters("t2", StatusField.Pending, 5); + + expect((await getTenantStats("t1")).pending).toBe(2); + expect((await getTenantStats("t2")).pending).toBe(5); + }); + + it("global counter aggregates across all tenants", async () => { + await incrementCounters("t1", StatusField.InProgress, 1); + await incrementCounters("t2", StatusField.InProgress, 4); + + const global = await getGlobalStats(); + expect(global.inProgress).toBe(5); + }); + }); + + describe("adjustTenantCounter", () => { + it("creates hash with zero base when first incremented", async () => { + await adjustTenantCounter("t1", StatusField.InProgress, 1); + + const stats = await getTenantStats("t1"); + expect(stats.inProgress).toBe(1); + expect(stats.pending).toBe(0); + }); + + it("supports negative increments", async () => { + await adjustTenantCounter("t1", StatusField.Pending, 10); + await adjustTenantCounter("t1", StatusField.Pending, -3); + + expect((await getTenantStats("t1")).pending).toBe(7); + }); + }); + + describe("adjustGlobalCounter", () => { + it("increments the global counter for the given field", async () => { + await adjustGlobalCounter(StatusField.Pending, 7); + + const global = await getGlobalStats(); + expect(global.pending).toBe(7); + }); + }); + + describe("getTenantStats", () => { + it("returns all-zero object for unknown tenant", async () => { + const stats = await getTenantStats("unknown-tenant"); + expect(stats).toEqual({ pending: 0, inProgress: 0 }); + }); + }); + + describe("getGlobalStats", () => { + it("returns all-zero object when no data exists", async () => { + const stats = await getGlobalStats(); + expect(stats).toEqual({ pending: 0, inProgress: 0 }); + }); + }); + + describe("deleteTenantStats", () => { + it("removes the tenant hash", async () => { + await incrementCounters("t1", StatusField.Pending, 5); + await deleteTenantStats("t1"); + + const stats = await getTenantStats("t1"); + expect(stats).toEqual({ pending: 0, inProgress: 0 }); + }); + + it("does not throw when tenant does not exist", async () => { + await expect(deleteTenantStats("nonexistent")).resolves.toBeUndefined(); + }); + }); +}); diff --git a/test/mocks/redisMock.js b/test/mocks/redisMock.js index a0f03202..5cb66f8c 100644 --- a/test/mocks/redisMock.js +++ b/test/mocks/redisMock.js @@ -2,7 +2,8 @@ let state = {}; let testState = {}; -const _createMainClientAndConnect = async () => ({ + +const _buildClient = () => ({ get: async (key) => state[key]?.value ?? null, exists: async (key) => Object.prototype.hasOwnProperty.call(state, key), set: async (key, value, options) => { @@ -13,12 +14,70 @@ const _createMainClientAndConnect = async () => ({ testState[key] = { value, options }; return "OK"; }, - del: async (key) => delete state[key], + del: async (key) => { + const existed = Object.prototype.hasOwnProperty.call(state, key); + delete state[key]; + return existed ? 1 : 0; + }, + hIncrBy: async (key, field, increment) => { + if (!state[key]) { + state[key] = { hash: {} }; + } + const current = parseInt(state[key].hash[field] ?? "0", 10); + state[key].hash[field] = String(current + increment); + return current + increment; + }, + hSet: async (key, field, value) => { + if (!state[key]) { + state[key] = { hash: {} }; + } + const isNew = !Object.prototype.hasOwnProperty.call(state[key].hash, field); + state[key].hash[field] = String(value); + return isNew ? 1 : 0; + }, + hGetAll: async (key) => { + return state[key]?.hash ?? {}; + }, + multi: () => { + const ops = []; + const pipeline = { + hIncrBy: (key, field, increment) => { + ops.push(async () => { + if (!state[key]) { + state[key] = { hash: {} }; + } + const current = parseInt(state[key].hash[field] ?? "0", 10); + state[key].hash[field] = String(current + increment); + return current + increment; + }); + return pipeline; + }, + hSet: (key, field, value) => { + ops.push(async () => { + if (!state[key]) { + state[key] = { hash: {} }; + } + state[key].hash[field] = String(value); + }); + return pipeline; + }, + exec: async () => { + const results = []; + for (const op of ops) { + results.push(await op()); + } + return results; + }, + }; + return pipeline; + }, _: { state, }, }); +const _createMainClientAndConnect = async () => _buildClient(); + module.exports = { attachRedisUnsubscribeHandler: () => {}, subscribeRedisChannel: () => {}, diff --git a/test/redisPubSub.test.js b/test/redisPubSub.test.js index 37fcd9ed..24df9362 100644 --- a/test/redisPubSub.test.js +++ b/test/redisPubSub.test.js @@ -10,6 +10,7 @@ const setTimeoutSpy = jest.spyOn(global, "setTimeout").mockImplementation((first }); const distributedLock = require("../src/shared/distributedLock"); +const eventQueueStats = require("../src/shared/eventQueueStats"); const checkLockExistsSpy = jest.spyOn(distributedLock, "checkLockExists"); const config = require("../src/config"); const redisPub = require("../src/redis/redisPub"); @@ -67,6 +68,7 @@ describe("eventQueue Redis Events and DB Handlers", () => { eventQueue.registerEventQueueDbHandler(cds.db); loggerMock = mockLogger(); jest.spyOn(cds.utils, "uuid").mockReturnValue("6e31047a-d2b5-4e3c-83d8-deab20165956"); + jest.spyOn(eventQueueStats, "incrementCounters").mockResolvedValue(); }); beforeEach(async () => { From b4ab94d7b0af0b0cb897dc779b99cfe5cb88ec64 Mon Sep 17 00:00:00 2001 From: Max Gruenfelder Date: Sat, 7 Mar 2026 12:56:16 +0100 Subject: [PATCH 02/18] remove comments --- test/dbHandlerStats.test.js | 4 ---- 1 file changed, 4 deletions(-) diff --git a/test/dbHandlerStats.test.js b/test/dbHandlerStats.test.js index e50cb869..36a34028 100644 --- a/test/dbHandlerStats.test.js +++ b/test/dbHandlerStats.test.js @@ -66,8 +66,6 @@ describe("dbHandler - stats tracking via CAP outbox", () => { await cds.shutdown; }); - // ── CREATE handler tests ──────────────────────────────────────────────────── - it("increments pending counter by 1 after single send and commit", async () => { const service = (await cds.connect.to("StandardService")).tx(context); await service.send("main", {}); @@ -154,8 +152,6 @@ describe("dbHandler - stats tracking via CAP outbox", () => { expect(loggerMock.callsLengths().error).toBe(0); }); - // ── UPDATE handler tests — direct status transitions ──────────────────────── - describe("UPDATE handler — direct status transitions", () => { it("Open → InProgress: decrements pending, increments inProgress", async () => { const service = (await cds.connect.to("StandardService")).tx(context); From 6cc055efde72c2c41b1cf6a15c35e45e7ee89046 Mon Sep 17 00:00:00 2001 From: Max Gruenfelder Date: Sat, 7 Mar 2026 12:57:30 +0100 Subject: [PATCH 03/18] remove comments --- test/dbHandlerStats.test.js | 2 -- 1 file changed, 2 deletions(-) diff --git a/test/dbHandlerStats.test.js b/test/dbHandlerStats.test.js index 36a34028..2bcd8565 100644 --- a/test/dbHandlerStats.test.js +++ b/test/dbHandlerStats.test.js @@ -304,8 +304,6 @@ describe("dbHandler - stats tracking via CAP outbox", () => { }); }); - // ── processEventQueue integration tests ───────────────────────────────────── - describe("processEventQueue integration — stats via real processing", () => { it("successful processing transitions pending → inProgress → Done (counters reach zero)", async () => { const service = (await cds.connect.to("StandardService")).tx(context); From 8295092143dfa185897daa7eda6e8ce931d8c064 Mon Sep 17 00:00:00 2001 From: Max Gruenfelder Date: Sat, 7 Mar 2026 21:05:52 +0100 Subject: [PATCH 04/18] fix --- test-integration/keep-alive-tx-handling-e2e.test.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test-integration/keep-alive-tx-handling-e2e.test.js b/test-integration/keep-alive-tx-handling-e2e.test.js index 748e34fa..c4e54397 100644 --- a/test-integration/keep-alive-tx-handling-e2e.test.js +++ b/test-integration/keep-alive-tx-handling-e2e.test.js @@ -20,6 +20,7 @@ const distributedLock = require("../src/shared/distributedLock"); const periodicEvents = require("../src/periodicEvents"); const { publishEvent } = require("../src/publishEvent"); const redisPub = require("../src/redis/redisPub"); +const eventQueueStats = require("../src/shared/eventQueueStats"); const configFilePath = path.join(__dirname, "..", "./test", "asset", "config.yml"); @@ -48,6 +49,7 @@ describe("keep-alive-tx-handling-e2e", () => { registerAsEventProcessor: false, isEventQueueActive: false, }); + jest.spyOn(eventQueueStats, "incrementCounters").mockResolvedValue(); loggerMock = mockLogger(); const db = await cds.connect.to("db"); From 6889a63189a8f72824414be37f901f3bd5f9029a Mon Sep 17 00:00:00 2001 From: Max Gruenfelder Date: Sat, 7 Mar 2026 23:00:41 +0100 Subject: [PATCH 05/18] wip --- src/dbHandler.js | 4 +++- .../keep-alive-tx-handling-e2e.test.js | 18 +++++++++++------- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/src/dbHandler.js b/src/dbHandler.js index 74e1169c..db00cdf7 100644 --- a/src/dbHandler.js +++ b/src/dbHandler.js @@ -127,7 +127,9 @@ const registerEventQueueDbHandler = (dbService) => { if (result.status === "rejected") { cds .log(COMPONENT_NAME) - .error("db handler failure during updating event stats on update", result.reason, { tenant: req.tenant }); + .error("db handler failure during updating event stats on update", result.reason, { + tenant: req.tenant, + }); } } }); diff --git a/test-integration/keep-alive-tx-handling-e2e.test.js b/test-integration/keep-alive-tx-handling-e2e.test.js index c4e54397..485df2c4 100644 --- a/test-integration/keep-alive-tx-handling-e2e.test.js +++ b/test-integration/keep-alive-tx-handling-e2e.test.js @@ -105,13 +105,17 @@ describe("keep-alive-tx-handling-e2e", () => { describe("commitOnEventLevel", () => { it("straight forward", async () => { - await cds.tx({}, (tx2) => - testHelper.insertEventEntry(tx2, { - numberOfEntries: 2, - type: isolatedNoParallel.type, - subType: isolatedNoParallel.subType, - }) - ); + try { + await cds.tx({}, (tx2) => + testHelper.insertEventEntry(tx2, { + numberOfEntries: 2, + type: isolatedNoParallel.type, + subType: isolatedNoParallel.subType, + }) + ); + } catch (err) { + debugger; + } const { db } = cds.services; const { Event } = cds.entities("sap.eventqueue"); let forUpdateCounter = 0; From e869f2f947ee343c641833c1599d9680de4c8045 Mon Sep 17 00:00:00 2001 From: Max Gruenfelder Date: Sat, 7 Mar 2026 23:33:12 +0100 Subject: [PATCH 06/18] wip --- .../__snapshots__/e2e-redis.test.js.snap | 11 ++++++++++- .../keep-alive-tx-handling-e2e.test.js | 19 ++++++++----------- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/test-integration/__snapshots__/e2e-redis.test.js.snap b/test-integration/__snapshots__/e2e-redis.test.js.snap index 2f662a8e..d7f8e85d 100644 --- a/test-integration/__snapshots__/e2e-redis.test.js.snap +++ b/test-integration/__snapshots__/e2e-redis.test.js.snap @@ -12,6 +12,8 @@ exports[`end-to-end redis broadcast checkAndInsertPeriodicEvents should insert n [ "EVENT_QUEUE##default##RUN_REDIS_CHECK", "EVENT_QUEUE##default####TEST_STATIC", + "EVENT_QUEUE##default##stats##tenant##undefined", + "EVENT_QUEUE##default##stats##global", ] `; @@ -24,6 +26,8 @@ exports[`end-to-end redis broadcast insert entry: redis broadcast + process 1`] exports[`end-to-end redis broadcast insert entry: redis broadcast + process 2`] = ` [ + "EVENT_QUEUE##default##stats##tenant##undefined", + "EVENT_QUEUE##default##stats##global", "EVENT_QUEUE##default####TEST_STATIC", ] `; @@ -34,4 +38,9 @@ exports[`end-to-end runner should select open events and process + validate skip ] `; -exports[`end-to-end runner should select open events and process + validate skip broadcast 2`] = `[]`; +exports[`end-to-end runner should select open events and process + validate skip broadcast 2`] = ` +[ + "EVENT_QUEUE##default##stats##tenant##undefined", + "EVENT_QUEUE##default##stats##global", +] +`; diff --git a/test-integration/keep-alive-tx-handling-e2e.test.js b/test-integration/keep-alive-tx-handling-e2e.test.js index 485df2c4..f7783815 100644 --- a/test-integration/keep-alive-tx-handling-e2e.test.js +++ b/test-integration/keep-alive-tx-handling-e2e.test.js @@ -105,17 +105,14 @@ describe("keep-alive-tx-handling-e2e", () => { describe("commitOnEventLevel", () => { it("straight forward", async () => { - try { - await cds.tx({}, (tx2) => - testHelper.insertEventEntry(tx2, { - numberOfEntries: 2, - type: isolatedNoParallel.type, - subType: isolatedNoParallel.subType, - }) - ); - } catch (err) { - debugger; - } + await cds.tx({}, (tx2) => + testHelper.insertEventEntry(tx2, { + numberOfEntries: 2, + type: isolatedNoParallel.type, + subType: isolatedNoParallel.subType, + }) + ); + const { db } = cds.services; const { Event } = cds.entities("sap.eventqueue"); let forUpdateCounter = 0; From 96f8b70369bbfbd9d7da7d4c822e8c22baf68b4a Mon Sep 17 00:00:00 2001 From: Max Gruenfelder Date: Sun, 8 Mar 2026 09:45:10 +0100 Subject: [PATCH 07/18] update statistic via runner --- src/runner/openEvents.js | 12 ++--- src/runner/runner.js | 24 +++++++++- src/shared/eventQueueStats.js | 44 +++++++++++++------ .../__snapshots__/runner.test.js.snap | 31 ++++++++++++- test-integration/e2e-redis.test.js | 1 - test-integration/runner.test.js | 44 +++++++++++++++++++ 6 files changed, 133 insertions(+), 23 deletions(-) diff --git a/src/runner/openEvents.js b/src/runner/openEvents.js index fc95f75a..71058dfa 100644 --- a/src/runner/openEvents.js +++ b/src/runner/openEvents.js @@ -29,12 +29,12 @@ const getOpenQueueEntries = async (tx, filterAppSpecificEvents = true) => { new Date(startTime.getTime() - 30 * MS_IN_DAYS).toISOString(), ")" ) - .columns("type", "subType", "namespace") + .columns("count(ID) as count", "type", "subType", "namespace") .groupBy("type", "subType", "namespace") ); const result = []; - for (const { type, subType, namespace } of entries) { + for (const { type, subType, namespace, count } of entries) { if (config.isCapOutboxEvent(type)) { const { srvName, actionName } = config.normalizeSubType(type, subType); try { @@ -48,14 +48,14 @@ const getOpenQueueEntries = async (tx, filterAppSpecificEvents = true) => { config.addCAPServiceWithoutEnvConfig(subType, service); } if (config.shouldBeProcessedInThisApplication(type, subType, namespace)) { - result.push({ namespace, type, subType }); + result.push({ namespace, type, subType, count }); } } } catch { /* ignore catch */ } finally { if (!filterAppSpecificEvents) { - result.push({ namespace, type, subType }); + result.push({ namespace, type, subType, count }); } } } else { @@ -64,10 +64,10 @@ const getOpenQueueEntries = async (tx, filterAppSpecificEvents = true) => { config.getEventConfig(type, subType, namespace) && config.shouldBeProcessedInThisApplication(type, subType, namespace) ) { - result.push({ namespace, type, subType }); + result.push({ namespace, type, subType, count }); } } else { - result.push({ namespace, type, subType }); + result.push({ namespace, type, subType, count }); } } } diff --git a/src/runner/runner.js b/src/runner/runner.js index 9295041c..d4095265 100644 --- a/src/runner/runner.js +++ b/src/runner/runner.js @@ -14,6 +14,7 @@ const common = require("../shared/common"); const config = require("../config"); const redisPub = require("../redis/redisPub"); const openEvents = require("./openEvents"); +const eventQueueStats = require("../shared/eventQueueStats"); const { runEventCombinationForTenant } = require("./runnerHelper"); const { trace } = require("../shared/openTelemetry"); @@ -141,7 +142,7 @@ const _executeEventsAllTenantsRedis = async (tenantIds) => { } catch (err) { logger.error("executing event queue run for multi instance and tenant failed", err); } - + const tenantCounts = {}; for (const tenantId of tenantIds) { try { await cds.tx({ tenant: tenantId }, async (tx) => { @@ -160,6 +161,16 @@ const _executeEventsAllTenantsRedis = async (tenantIds) => { tenantId, entries: entries.length, }); + tenantCounts[tenantId] = entries; + const pendingByNamespace = {}; + for (const entry of entries) { + pendingByNamespace[entry.namespace] = (pendingByNamespace[entry.namespace] ?? 0) + entry.count; + } + for (const [namespace, count] of Object.entries(pendingByNamespace)) { + eventQueueStats + .setTenantCounter(tenantId, namespace, eventQueueStats.StatusField.Pending, count) + .catch((err) => logger.error("updating tenant stats failed", err, { tenantId, namespace })); + } if (!entries.length) { return; } @@ -178,6 +189,17 @@ const _executeEventsAllTenantsRedis = async (tenantIds) => { logger.error("broadcasting events for tenant failed", { tenantId }, err); } } + const globalPendingByNamespace = {}; + for (const tenantEntries of Object.values(tenantCounts)) { + for (const entry of tenantEntries) { + globalPendingByNamespace[entry.namespace] = (globalPendingByNamespace[entry.namespace] ?? 0) + entry.count; + } + } + for (const [namespace, count] of Object.entries(globalPendingByNamespace)) { + eventQueueStats + .setGlobalCounter(namespace, eventQueueStats.StatusField.Pending, count) + .catch((err) => logger.error("updating global stats failed", err, { namespace })); + } }; const _executeEventsAllTenants = async (tenantIds) => { diff --git a/src/shared/eventQueueStats.js b/src/shared/eventQueueStats.js index 6e7da197..5edbafef 100644 --- a/src/shared/eventQueueStats.js +++ b/src/shared/eventQueueStats.js @@ -14,6 +14,7 @@ const StatusField = { const _tenantKey = (tenantId) => `${config.redisNamespace(true)}##stats##tenant##${tenantId}`; const _globalKey = () => `${config.redisNamespace(true)}##stats##global`; +const _keyPrefix = (namespace) => `${config.redisNamespace(false)}##${namespace}`; /** * Atomically adjusts a tenant's event counter for the given status field. @@ -23,12 +24,11 @@ const _globalKey = () => `${config.redisNamespace(true)}##stats##global`; * @param {number} increment - positive to increment, negative to decrement */ const adjustTenantCounter = async (tenantId, field, increment) => { - const logger = cds.log(COMPONENT_NAME); try { const client = await redis.createMainClientAndConnect(); await client.hIncrBy(_tenantKey(tenantId), field, increment); } catch (err) { - logger.error("failed to adjust tenant stats counter", err, { tenantId, field, increment }); + cds.log(COMPONENT_NAME).error("failed to adjust tenant stats counter", err, { tenantId, field, increment }); } }; @@ -40,12 +40,11 @@ const adjustTenantCounter = async (tenantId, field, increment) => { * @param {number} increment - positive to increment, negative to decrement */ const adjustGlobalCounter = async (field, increment) => { - const logger = cds.log(COMPONENT_NAME); try { const client = await redis.createMainClientAndConnect(); await client.hIncrBy(_globalKey(), field, increment); } catch (err) { - logger.error("failed to adjust global stats counter", err, { field, increment }); + cds.log(COMPONENT_NAME).error("failed to adjust global stats counter", err, { field, increment }); } }; @@ -57,7 +56,7 @@ const adjustGlobalCounter = async (field, increment) => { * @param {number} [increment=1] */ const incrementCounters = async (tenantId, field, increment = 1) => { - await Promise.all([adjustTenantCounter(tenantId, field, increment), adjustGlobalCounter(field, increment)]); + await Promise.allSettled([adjustTenantCounter(tenantId, field, increment), adjustGlobalCounter(field, increment)]); }; /** @@ -68,7 +67,7 @@ const incrementCounters = async (tenantId, field, increment = 1) => { * @param {number} [decrement=1] */ const decrementCounters = async (tenantId, field, decrement = 1) => { - await Promise.all([adjustTenantCounter(tenantId, field, -decrement), adjustGlobalCounter(field, -decrement)]); + await Promise.allSettled([adjustTenantCounter(tenantId, field, -decrement), adjustGlobalCounter(field, -decrement)]); }; /** @@ -76,16 +75,15 @@ const decrementCounters = async (tenantId, field, decrement = 1) => { * All counter values are returned as integers; missing fields default to 0. * * @param {string} tenantId - * @returns {Promise<{open: number, inProgress: number, error: number}>} + * @returns {Promise<{pending: number, inProgress: number}>} */ const getTenantStats = async (tenantId) => { - const logger = cds.log(COMPONENT_NAME); try { const client = await redis.createMainClientAndConnect(); const raw = await client.hGetAll(_tenantKey(tenantId)); return _parseCounterHash(raw); } catch (err) { - logger.error("failed to read tenant stats", err, { tenantId }); + cds.log(COMPONENT_NAME).error("failed to read tenant stats", err, { tenantId }); return _emptyCounters(); } }; @@ -94,16 +92,15 @@ const getTenantStats = async (tenantId) => { * Returns the current global stats hash. * All counter values are returned as integers; missing fields default to 0. * - * @returns {Promise<{open: number, inProgress: number, error: number}>} + * @returns {Promise<{pending: number, inProgress: number}>} */ const getGlobalStats = async () => { - const logger = cds.log(COMPONENT_NAME); try { const client = await redis.createMainClientAndConnect(); const raw = await client.hGetAll(_globalKey()); return _parseCounterHash(raw); } catch (err) { - logger.error("failed to read global stats", err); + cds.log(COMPONENT_NAME).error("failed to read global stats", err); return _emptyCounters(); } }; @@ -114,13 +111,30 @@ const getGlobalStats = async () => { * * @param {string} tenantId */ +const setTenantCounter = async (tenantId, namespace, field, value) => { + try { + const client = await redis.createMainClientAndConnect(); + await client.hSet(`${_keyPrefix(namespace)}##stats##tenant##${tenantId}`, field, value); + } catch (err) { + cds.log(COMPONENT_NAME).error("failed to set tenant stats counter", err, { tenantId, namespace, field, value }); + } +}; + +const setGlobalCounter = async (namespace, field, value) => { + try { + const client = await redis.createMainClientAndConnect(); + await client.hSet(`${_keyPrefix(namespace)}##stats##global`, field, value); + } catch (err) { + cds.log(COMPONENT_NAME).error("failed to set global stats counter", err, { namespace, field, value }); + } +}; + const deleteTenantStats = async (tenantId) => { - const logger = cds.log(COMPONENT_NAME); try { const client = await redis.createMainClientAndConnect(); await client.del(_tenantKey(tenantId)); } catch (err) { - logger.error("failed to delete tenant stats", err, { tenantId }); + cds.log(COMPONENT_NAME).error("failed to delete tenant stats", err, { tenantId }); } }; @@ -140,6 +154,8 @@ module.exports = { decrementCounters, adjustTenantCounter, adjustGlobalCounter, + setTenantCounter, + setGlobalCounter, getTenantStats, getGlobalStats, deleteTenantStats, diff --git a/test-integration/__snapshots__/runner.test.js.snap b/test-integration/__snapshots__/runner.test.js.snap index 54c00387..d1617495 100644 --- a/test-integration/__snapshots__/runner.test.js.snap +++ b/test-integration/__snapshots__/runner.test.js.snap @@ -95,6 +95,7 @@ exports[`runner redis multi tenant tenant id filter should not acquire lock - on "e9bb8ec0-c85e-4035-b7cf-1b11ba8e5792", [ { + "count": 1, "namespace": "default", "subType": "DELETE_EVENTS", "type": "EVENT_QUEUE_BASE_PERIODIC", @@ -105,6 +106,7 @@ exports[`runner redis multi tenant tenant id filter should not acquire lock - on "cd805323-879c-4bf7-b19c-8ffbbee22e1f", [ { + "count": 1, "namespace": "default", "subType": "DELETE_EVENTS", "type": "EVENT_QUEUE_BASE_PERIODIC", @@ -115,6 +117,7 @@ exports[`runner redis multi tenant tenant id filter should not acquire lock - on "9f3ed8f0-8aaf-439e-a96a-04cd5b680c59", [ { + "count": 1, "namespace": "default", "subType": "DELETE_EVENTS", "type": "EVENT_QUEUE_BASE_PERIODIC", @@ -126,7 +129,20 @@ exports[`runner redis multi tenant tenant id filter should not acquire lock - on exports[`runner redis multi tenant tenant id filter should not acquire lock - only process tenants based on tenant filter with open events - split into two instances 2`] = `[]`; -exports[`runner redis multi tenant tenant id filter should not acquire lock - only process tenants based on tenant filter with open events - split into two instances 3`] = `{}`; +exports[`runner redis multi tenant tenant id filter should not acquire lock - only process tenants based on tenant filter with open events - split into two instances 3`] = ` +{ + "EVENT_QUEUE##default##stats##global": { + "hash": { + "pending": "2", + }, + }, + "EVENT_QUEUE##default##stats##tenant##TEST_STATIC": { + "hash": { + "pending": "1", + }, + }, +} +`; exports[`runner redis multi tenant with open events - broadcast should be called 1`] = ` [ @@ -134,6 +150,7 @@ exports[`runner redis multi tenant with open events - broadcast should be called "cd805323-879c-4bf7-b19c-8ffbbee22e1f", [ { + "count": 1, "namespace": "default", "subType": "DELETE_EVENTS", "type": "EVENT_QUEUE_BASE_PERIODIC", @@ -144,6 +161,7 @@ exports[`runner redis multi tenant with open events - broadcast should be called "9f3ed8f0-8aaf-439e-a96a-04cd5b680c59", [ { + "count": 1, "namespace": "default", "subType": "DELETE_EVENTS", "type": "EVENT_QUEUE_BASE_PERIODIC", @@ -154,6 +172,7 @@ exports[`runner redis multi tenant with open events - broadcast should be called "e9bb8ec0-c85e-4035-b7cf-1b11ba8e5792", [ { + "count": 1, "namespace": "default", "subType": "DELETE_EVENTS", "type": "EVENT_QUEUE_BASE_PERIODIC", @@ -203,6 +222,16 @@ exports[`runner redis multi tenant with open events - broadcast should be called "PX": 60000, }, }, + "EVENT_QUEUE##default##stats##global": { + "hash": { + "pending": "3", + }, + }, + "EVENT_QUEUE##default##stats##tenant##TEST_STATIC": { + "hash": { + "pending": "1", + }, + }, } `; diff --git a/test-integration/e2e-redis.test.js b/test-integration/e2e-redis.test.js index e310968c..4ee0b4c2 100644 --- a/test-integration/e2e-redis.test.js +++ b/test-integration/e2e-redis.test.js @@ -82,7 +82,6 @@ describe("end-to-end", () => { beforeEach(async () => { await DELETE.from("sap.eventqueue.Lock"); await DELETE.from("sap.eventqueue.Event"); - await DELETE.from("cds.outbox.Messages"); jest.clearAllMocks(); redisMock.clearState(); redisMock.clearTestState(); diff --git a/test-integration/runner.test.js b/test-integration/runner.test.js index 79e8a153..366fa217 100644 --- a/test-integration/runner.test.js +++ b/test-integration/runner.test.js @@ -7,6 +7,7 @@ cds.test(__dirname + "/_env"); const mockRedis = require("../test/mocks/redisMock"); jest.mock("../src/shared/redis", () => mockRedis); +const { getTenantStats, getGlobalStats, StatusField } = require("../src/shared/eventQueueStats"); const WorkerQueue = require("../src/shared/WorkerQueue"); const processEventQueue = require("../src/processEventQueue"); const openEvents = require("../src/runner/openEvents"); @@ -176,6 +177,49 @@ describe("runner", () => { expect(loggerMock.callsLengths().error).toEqual(0); }); + describe("stats tracking", () => { + it("sets pending counter per tenant and globally after runner with open events", async () => { + await cds.tx({}, async (tx2) => { + await periodicEvents.checkAndInsertPeriodicEvents(tx2.context); + }); + mockTenantIds(tenantIds); + jest.spyOn(redisPub, "broadcastEvent").mockResolvedValue(); + jest.spyOn(periodicEvents, "checkAndInsertPeriodicEvents").mockResolvedValue(); + + await Promise.allSettled([runner.__._multiTenancyRedis(), runner.__._multiTenancyRedis()]); + await Promise.allSettled(WorkerQueue.instance.runningPromises); + await new Promise(setImmediate); + + for (const tenantId of tenantIds) { + const stats = await getTenantStats(tenantId); + expect(stats[StatusField.Pending]).toBe(1); + } + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(tenantIds.length); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("does not update stats when there are no open events", async () => { + mockTenantIds(tenantIds); + jest.spyOn(redisPub, "broadcastEvent").mockResolvedValue(); + jest.spyOn(periodicEvents, "checkAndInsertPeriodicEvents").mockResolvedValue(); + + await Promise.allSettled([runner.__._multiTenancyRedis()]); + await Promise.allSettled(WorkerQueue.instance.runningPromises); + await new Promise(setImmediate); + + for (const tenantId of tenantIds) { + const stats = await getTenantStats(tenantId); + expect(stats[StatusField.Pending]).toBe(0); + } + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(0); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + }); + describe("tenant id filter should not acquire lock - only process tenants based on tenant filter", () => { it("always false", async () => { await cds.tx({}, async (tx2) => { From 6aa10f6bc3e3c5991d0cea591d883c9fcbd542ab Mon Sep 17 00:00:00 2001 From: Max Gruenfelder Date: Sun, 8 Mar 2026 10:41:05 +0100 Subject: [PATCH 08/18] report metrics to open telemtry --- src/initialize.js | 2 ++ src/shared/eventQueueStats.js | 21 ++++++++++++ src/shared/openTelemetry.js | 60 ++++++++++++++++++++++++++++++++++- 3 files changed, 82 insertions(+), 1 deletion(-) diff --git a/src/initialize.js b/src/initialize.js index 392c55e8..7ba7f0ef 100644 --- a/src/initialize.js +++ b/src/initialize.js @@ -17,6 +17,7 @@ const { getAllTenantIds } = require("./shared/cdsHelper"); const { EventProcessingStatus } = require("./constants"); const distributedLock = require("./shared/distributedLock"); const EventQueueError = require("./EventQueueError"); +const { initMetrics } = require("./shared/openTelemetry"); const readFileAsync = promisify(fs.readFile); @@ -125,6 +126,7 @@ const initialize = async (options = {}) => { runInterval: config.runInterval, useAsCAPQueue: config.useAsCAPQueue, }); + initMetrics(); resolveFn(); }; diff --git a/src/shared/eventQueueStats.js b/src/shared/eventQueueStats.js index 5edbafef..2010a36c 100644 --- a/src/shared/eventQueueStats.js +++ b/src/shared/eventQueueStats.js @@ -129,6 +129,26 @@ const setGlobalCounter = async (namespace, field, value) => { } }; +const getAllNamespaceStats = async () => { + const namespaces = config.processingNamespaces; + const client = await redis.createMainClientAndConnect(); + const results = await Promise.allSettled( + namespaces.map(async (namespace) => { + const raw = await client.hGetAll(`${_keyPrefix(namespace)}##stats##global`); + return { namespace, stats: _parseCounterHash(raw) }; + }) + ); + const out = {}; + for (const result of results) { + if (result.status === "fulfilled") { + out[result.value.namespace] = result.value.stats; + } else { + cds.log(COMPONENT_NAME).error("failed to read namespace stats", result.reason); + } + } + return out; +}; + const deleteTenantStats = async (tenantId) => { try { const client = await redis.createMainClientAndConnect(); @@ -156,6 +176,7 @@ module.exports = { adjustGlobalCounter, setTenantCounter, setGlobalCounter, + getAllNamespaceStats, getTenantStats, getGlobalStats, deleteTenantStats, diff --git a/src/shared/openTelemetry.js b/src/shared/openTelemetry.js index f9d5896a..323edecf 100644 --- a/src/shared/openTelemetry.js +++ b/src/shared/openTelemetry.js @@ -12,9 +12,13 @@ const cds = require("@sap/cds"); const otel = _resilientRequire("@opentelemetry/api"); const config = require("../config"); +const eventQueueStats = require("./eventQueueStats"); const COMPONENT_NAME = "/shared/openTelemetry"; +let _statsSnapshot = null; +let _metricsInitialized = false; + const trace = async (context, label, fn, { attributes = {}, newRootSpan = false, traceContext } = {}) => { if (!config.enableTelemetry || !otel) { return fn(); @@ -110,4 +114,58 @@ const getCurrentTraceContext = () => { return carrier; }; -module.exports = { trace, getCurrentTraceContext }; +const _refreshStats = async () => { + try { + const namespaces = await eventQueueStats.getAllNamespaceStats(); + _statsSnapshot = { namespaces, lastRefreshedAt: Date.now() }; + } catch (err) { + cds.log(COMPONENT_NAME).error("failed to refresh queue stats for metrics", err); + } +}; + +const initMetrics = () => { + if (_metricsInitialized || !config.enableTelemetry || !config.redisEnabled || !otel?.metrics) { + return; + } + const meterProvider = otel.metrics.getMeterProvider?.(); + if (!meterProvider) { + return; + } + + _metricsInitialized = true; + + const meter = otel.metrics.getMeter("@cap-js-community/event-queue"); + + const pendingGauge = meter.createObservableGauge("cap.event_queue.jobs.pending", { + description: "Current number of jobs waiting to be processed.", + unit: "1", + }); + const inProgressGauge = meter.createObservableGauge("cap.event_queue.jobs.in_progress", { + description: "Current number of jobs actively being processed by workers.", + unit: "1", + }); + const refreshAgeGauge = meter.createObservableGauge("cap.event_queue.stats.refresh_age", { + description: "Age of the most recent queue statistics snapshot.", + unit: "s", + }); + + meter.addBatchObservableCallback( + (observableResult) => { + if (!_statsSnapshot) { + return; + } + observableResult.observe(refreshAgeGauge, (Date.now() - _statsSnapshot.lastRefreshedAt) / 1000); + for (const [namespace, stats] of Object.entries(_statsSnapshot.namespaces)) { + const attrs = { "queue.namespace": namespace }; + observableResult.observe(pendingGauge, stats.pending, attrs); + observableResult.observe(inProgressGauge, stats.inProgress, attrs); + } + }, + [pendingGauge, inProgressGauge, refreshAgeGauge] + ); + + _refreshStats(); + setInterval(_refreshStats, 30_000).unref(); +}; + +module.exports = { trace, getCurrentTraceContext, initMetrics }; From 900ae7db9ecb573edb5f4191224cf8469af7739a Mon Sep 17 00:00:00 2001 From: Max Gruenfelder Date: Sun, 8 Mar 2026 11:43:57 +0100 Subject: [PATCH 09/18] snapshots --- .../baseFunctionality.test.js.snap | 36 +++++++++++++++++++ .../eventQueueOutbox.test.js.snap | 5 +++ 2 files changed, 41 insertions(+) diff --git a/test/__snapshots__/baseFunctionality.test.js.snap b/test/__snapshots__/baseFunctionality.test.js.snap index 3c7688e1..d1c4249a 100644 --- a/test/__snapshots__/baseFunctionality.test.js.snap +++ b/test/__snapshots__/baseFunctionality.test.js.snap @@ -111,31 +111,37 @@ exports[`baseFunctionality ad-hoc events error handling missing event implementa exports[`baseFunctionality getOpenQueueEntries event types in error should be considered 2`] = ` [ { + "count": 1, "namespace": "default", "subType": "DELETE_EVENTS", "type": "EVENT_QUEUE_BASE_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "DBKeepAlive", "type": "HealthCheckKeepAlive_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "DB", "type": "HealthCheck_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "Task", "type": "Notifications", }, { + "count": 1, "namespace": "default", "subType": "everyFiveMin", "type": "TimeSpecificEveryFiveMin_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "cron", "type": "TimeSpecificEveryMin_PERIODIC", @@ -146,26 +152,31 @@ exports[`baseFunctionality getOpenQueueEntries event types in error should be co exports[`baseFunctionality getOpenQueueEntries event types in progress should be ignored 2`] = ` [ { + "count": 1, "namespace": "default", "subType": "DELETE_EVENTS", "type": "EVENT_QUEUE_BASE_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "DBKeepAlive", "type": "HealthCheckKeepAlive_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "DB", "type": "HealthCheck_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "everyFiveMin", "type": "TimeSpecificEveryFiveMin_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "cron", "type": "TimeSpecificEveryMin_PERIODIC", @@ -176,46 +187,55 @@ exports[`baseFunctionality getOpenQueueEntries event types in progress should be exports[`baseFunctionality getOpenQueueEntries filterAppSpecificEvents return open event types 2`] = ` [ { + "count": 1, "namespace": "default", "subType": "AppInstance", "type": "AppSpecific_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "AppName", "type": "AppSpecific_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "both", "type": "AppSpecific_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "DELETE_EVENTS", "type": "EVENT_QUEUE_BASE_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "DBKeepAlive", "type": "HealthCheckKeepAlive_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "DB", "type": "HealthCheck_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "Task", "type": "Notifications", }, { + "count": 1, "namespace": "default", "subType": "everyFiveMin", "type": "TimeSpecificEveryFiveMin_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "cron", "type": "TimeSpecificEveryMin_PERIODIC", @@ -226,31 +246,37 @@ exports[`baseFunctionality getOpenQueueEntries filterAppSpecificEvents return op exports[`baseFunctionality getOpenQueueEntries return open event types 2`] = ` [ { + "count": 1, "namespace": "default", "subType": "DELETE_EVENTS", "type": "EVENT_QUEUE_BASE_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "DBKeepAlive", "type": "HealthCheckKeepAlive_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "DB", "type": "HealthCheck_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "Task", "type": "Notifications", }, { + "count": 1, "namespace": "default", "subType": "everyFiveMin", "type": "TimeSpecificEveryFiveMin_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "cron", "type": "TimeSpecificEveryMin_PERIODIC", @@ -261,11 +287,13 @@ exports[`baseFunctionality getOpenQueueEntries return open event types 2`] = ` exports[`baseFunctionality getOpenQueueEntries should respect app instance configuration both open event relevant for app 2`] = ` [ { + "count": 1, "namespace": "default", "subType": "AppInstance", "type": "AppSpecific", }, { + "count": 1, "namespace": "default", "subType": "Task", "type": "Notifications", @@ -276,6 +304,7 @@ exports[`baseFunctionality getOpenQueueEntries should respect app instance confi exports[`baseFunctionality getOpenQueueEntries should respect app instance configuration one open event for app and one not for this app 2`] = ` [ { + "count": 1, "namespace": "default", "subType": "Task", "type": "Notifications", @@ -286,11 +315,13 @@ exports[`baseFunctionality getOpenQueueEntries should respect app instance confi exports[`baseFunctionality getOpenQueueEntries should respect app instance configuration one open event for app and one not for this app but redis should ignore filter 2`] = ` [ { + "count": 1, "namespace": "default", "subType": "AppInstance", "type": "AppSpecific", }, { + "count": 1, "namespace": "default", "subType": "Task", "type": "Notifications", @@ -301,11 +332,13 @@ exports[`baseFunctionality getOpenQueueEntries should respect app instance confi exports[`baseFunctionality getOpenQueueEntries should respect app name configuration both open event relevant for app 2`] = ` [ { + "count": 1, "namespace": "default", "subType": "AppName", "type": "AppSpecific", }, { + "count": 1, "namespace": "default", "subType": "Task", "type": "Notifications", @@ -316,6 +349,7 @@ exports[`baseFunctionality getOpenQueueEntries should respect app name configura exports[`baseFunctionality getOpenQueueEntries should respect app name configuration one open event for app and one not for this app 2`] = ` [ { + "count": 1, "namespace": "default", "subType": "Task", "type": "Notifications", @@ -326,11 +360,13 @@ exports[`baseFunctionality getOpenQueueEntries should respect app name configura exports[`baseFunctionality getOpenQueueEntries should respect app name configuration one open event for app and one not for this app but redis should ignore filter 2`] = ` [ { + "count": 1, "namespace": "default", "subType": "AppName", "type": "AppSpecific", }, { + "count": 1, "namespace": "default", "subType": "Task", "type": "Notifications", diff --git a/test/__snapshots__/eventQueueOutbox.test.js.snap b/test/__snapshots__/eventQueueOutbox.test.js.snap index 33f65d63..9ce8cc2c 100644 --- a/test/__snapshots__/eventQueueOutbox.test.js.snap +++ b/test/__snapshots__/eventQueueOutbox.test.js.snap @@ -419,26 +419,31 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true req.data should be stored exports[`event-queue outbox monkeyPatchCAPOutbox=true return open event types 1`] = ` [ { + "count": 1, "namespace": "default", "subType": "NotificationService", "type": "CAP_OUTBOX", }, { + "count": 1, "namespace": "default", "subType": "NotificationServicePeriodic.main", "type": "CAP_OUTBOX_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "NotificationServicePeriodic.randomOffset", "type": "CAP_OUTBOX_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "QueueService.main", "type": "CAP_OUTBOX_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "sapafcsdk.scheduling.ProviderService.timeBucketAction", "type": "CAP_OUTBOX_PERIODIC", From 7f36d6a69dec05b8eef4f8b7927ce8fb51e1e719 Mon Sep 17 00:00:00 2001 From: Max Gruenfelder Date: Sun, 8 Mar 2026 13:36:01 +0100 Subject: [PATCH 10/18] wip --- .claude/worktrees/funny-borg | 1 + src/runner/runner.js | 9 +++++++++ test-integration/__snapshots__/runner.test.js.snap | 5 +++++ 3 files changed, 15 insertions(+) create mode 160000 .claude/worktrees/funny-borg diff --git a/.claude/worktrees/funny-borg b/.claude/worktrees/funny-borg new file mode 160000 index 00000000..900ae7db --- /dev/null +++ b/.claude/worktrees/funny-borg @@ -0,0 +1 @@ +Subproject commit 900ae7db9ecb573edb5f4191224cf8469af7739a diff --git a/src/runner/runner.js b/src/runner/runner.js index d4095265..2c68bf4a 100644 --- a/src/runner/runner.js +++ b/src/runner/runner.js @@ -389,6 +389,15 @@ const _singleTenantRedis = async () => { logger.info("broadcasting events for run", { entries: entries.length, }); + const pendingByNamespace = {}; + for (const entry of entries) { + pendingByNamespace[entry.namespace] = (pendingByNamespace[entry.namespace] ?? 0) + entry.count; + } + for (const [namespace, count] of Object.entries(pendingByNamespace)) { + eventQueueStats + .setGlobalCounter(namespace, eventQueueStats.StatusField.Pending, count) + .catch((err) => logger.error("updating global stats failed", err, { namespace })); + } if (!entries.length) { return; } diff --git a/test-integration/__snapshots__/runner.test.js.snap b/test-integration/__snapshots__/runner.test.js.snap index d1617495..495d3dfa 100644 --- a/test-integration/__snapshots__/runner.test.js.snap +++ b/test-integration/__snapshots__/runner.test.js.snap @@ -292,5 +292,10 @@ exports[`runner redis single tenant with open events - broadcast should be calle "PX": 1425000, }, }, + "EVENT_QUEUE##default##stats##global": { + "hash": { + "pending": "1", + }, + }, } `; From 17e8d68d1646218eb96bcfdf233c6a1f645703ba Mon Sep 17 00:00:00 2001 From: Max Gruenfelder Date: Sun, 8 Mar 2026 13:48:03 +0100 Subject: [PATCH 11/18] add tests --- test-integration/dbHandlerStats.test.js | 384 ++++++++++++++++++++++++ 1 file changed, 384 insertions(+) create mode 100644 test-integration/dbHandlerStats.test.js diff --git a/test-integration/dbHandlerStats.test.js b/test-integration/dbHandlerStats.test.js new file mode 100644 index 00000000..5f836ec5 --- /dev/null +++ b/test-integration/dbHandlerStats.test.js @@ -0,0 +1,384 @@ +"use strict"; + +const path = require("path"); + +const mockRedis = require("../test/mocks/redisMock"); +jest.mock("../src/shared/redis", () => mockRedis); + +const cds = require("@sap/cds"); +cds.test(__dirname + "/_env"); + +const basePath = path.join(__dirname, "..", "test", "asset", "outboxProject"); +cds.env.requires.StandardService = { + impl: path.join(basePath, "srv/service/standard-service.js"), + outbox: { kind: "persistent-outbox" }, +}; + +cds.env.requires.NotificationService = { + impl: path.join(basePath, "srv/service/service.js"), + outbox: { kind: "persistent-outbox" }, +}; + +const eventQueue = require("../src"); +const config = require("../src/config"); +const { getTenantStats, getGlobalStats, StatusField } = require("../src/shared/eventQueueStats"); +const { EventProcessingStatus } = require("../src/constants"); +const { processEventQueue } = require("../src/processEventQueue"); +const testHelper = require("../test/helper"); +const { Logger: mockLogger } = require("../test/mocks/logger"); + +describe("dbHandler - stats tracking on HANA", () => { + let context, tx, loggerMock; + + beforeAll(async () => { + eventQueue.config.initialized = false; + await eventQueue.initialize({ + processEventsAfterPublish: false, + registerAsEventProcessor: false, + insertEventsBeforeCommit: true, + useAsCAPOutbox: true, + userId: "dummyTestUser", + }); + const db = await cds.connect.to("db"); + cds.emit("connect", db); + config.redisEnabled = true; + eventQueue.registerEventQueueDbHandler(db); + loggerMock = mockLogger(); + }); + + beforeEach(async () => { + context = new cds.EventContext({ user: "testUser", tenant: 123 }); + tx = cds.tx(context); + await cds.tx({}, async (tx2) => { + await tx2.run(DELETE.from("sap.eventqueue.Lock")); + await tx2.run(DELETE.from("sap.eventqueue.Event")); + }); + await commitAndOpenNew(); + mockRedis.clearState(); + jest.clearAllMocks(); + }); + + afterEach(async () => { + await tx.rollback(); + }); + + afterAll(async () => { + config.redisEnabled = false; + await cds.disconnect(); + await cds.shutdown(); + }); + + it("increments pending counter by 1 after single send and commit", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(1); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(1); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("accumulates pending counter for multiple sends in same transaction", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(3); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(3); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("does not increment counter when transaction is rolled back", async () => { + const innerTx = cds.tx(context); + const service = (await cds.connect.to("StandardService")).tx(innerTx.context); + await service.send("main", {}); + await innerTx.rollback(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(0); + }); + + it("does not increment counter when redisEnabled is false", async () => { + config.redisEnabled = false; + try { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(0); + } finally { + config.redisEnabled = true; + } + }); + + it("tracks stats per tenant while global counter aggregates", async () => { + const service123 = (await cds.connect.to("StandardService")).tx(context); + await service123.send("main", {}); + await service123.send("main", {}); + await commitAndOpenNew(); + + const ctx456 = new cds.EventContext({ user: "testUser", tenant: 456 }); + const tx456 = cds.tx(ctx456); + const service456 = (await cds.connect.to("StandardService")).tx(ctx456); + await service456.send("main", {}); + await tx456.commit(); + + const stats123 = await getTenantStats(123); + expect(stats123[StatusField.Pending]).toBe(2); + + const stats456 = await getTenantStats(456); + expect(stats456[StatusField.Pending]).toBe(1); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(3); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + describe("UPDATE handler — HANA affectedRows behavior", () => { + it("Open → InProgress: HANA affectedRows returns correct count for bulk update of 3 rows", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=3 + + await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await commitAndOpenNew(); + + // On HANA, affectedRows must be 3 — not the fallback of 1 + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + expect(tenantStats[StatusField.InProgress]).toBe(3); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(0); + expect(globalStats[StatusField.InProgress]).toBe(3); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("InProgress → Done: HANA affectedRows correctly decrements all inProgress", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=3 + + await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await commitAndOpenNew(); // pending=0, inProgress=3 + + await tx.run( + UPDATE.entity("sap.eventqueue.Event") + .set({ status: EventProcessingStatus.Done }) + .where({ status: EventProcessingStatus.InProgress }) + ); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("InProgress → Error: HANA affectedRows correctly restores all rows as pending", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=3 + + await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await commitAndOpenNew(); // pending=0, inProgress=3 + + await tx.run( + UPDATE.entity("sap.eventqueue.Event") + .set({ status: EventProcessingStatus.Error }) + .where({ status: EventProcessingStatus.InProgress }) + ); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(3); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(3); + expect(globalStats[StatusField.InProgress]).toBe(0); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("InProgress → Exceeded: HANA affectedRows correctly decrements all inProgress", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=3 + + await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await commitAndOpenNew(); // pending=0, inProgress=3 + + await tx.run( + UPDATE.entity("sap.eventqueue.Event") + .set({ status: EventProcessingStatus.Exceeded }) + .where({ status: EventProcessingStatus.InProgress }) + ); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("UPDATE matching 0 rows does not affect counters", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await commitAndOpenNew(); // pending=1 + + // WHERE clause matches nothing — affectedRows=0, must not change counters + await tx.run( + UPDATE.entity("sap.eventqueue.Event") + .set({ status: EventProcessingStatus.Done }) + .where({ status: EventProcessingStatus.InProgress }) // nothing is InProgress yet + ); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(1); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("two UPDATEs in one transaction accumulate into a single succeeded handler call", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=2 + + // Open→InProgress then InProgress→Done without an intermediate commit + await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await tx.run( + UPDATE.entity("sap.eventqueue.Event") + .set({ status: EventProcessingStatus.Done }) + .where({ status: EventProcessingStatus.InProgress }) + ); + await commitAndOpenNew(); + + // Net delta: pending -2, inProgress +2 then -2 → both counters at 0 + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("does not adjust counters when redisEnabled is false", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=2 + + config.redisEnabled = false; + try { + await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(2); + expect(tenantStats[StatusField.InProgress]).toBe(0); + } finally { + config.redisEnabled = true; + } + }); + + it("does not adjust counters when transaction is rolled back", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=2 + + const innerTx = cds.tx(context); + await innerTx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await innerTx.rollback(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(2); + expect(tenantStats[StatusField.InProgress]).toBe(0); + }); + }); + + describe("processEventQueue integration — stats via real processing", () => { + it("successful processing transitions pending → inProgress → Done (counters reach zero)", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await commitAndOpenNew(); + + expect((await getTenantStats(123))[StatusField.Pending]).toBe(1); + expect((await getTenantStats(123))[StatusField.InProgress]).toBe(0); + + await processEventQueue(tx.context, "CAP_OUTBOX", "StandardService"); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(0); + expect(globalStats[StatusField.InProgress]).toBe(0); + + await testHelper.selectEventQueueAndExpectDone(tx, { expectedLength: 1 }); + }); + + it("failed processing transitions pending → inProgress → Error → back to pending", async () => { + const service = cds.outboxed(await cds.connect.to("NotificationService")).tx(context); + await service.send("errorEvent", { to: "to", subject: "subject", body: "body" }); + await commitAndOpenNew(); + + expect((await getTenantStats(123))[StatusField.Pending]).toBe(1); + expect((await getTenantStats(123))[StatusField.InProgress]).toBe(0); + + await processEventQueue(tx.context, "CAP_OUTBOX", "NotificationService"); + await commitAndOpenNew(); + + // Error state means the event will be retried → counts as pending + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(1); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(1); + expect(globalStats[StatusField.InProgress]).toBe(0); + + await testHelper.selectEventQueueAndExpectError(tx, { expectedLength: 1 }); + }); + }); + + const commitAndOpenNew = async () => { + await tx.commit(); + context = new cds.EventContext({ user: "testUser", tenant: 123 }); + tx = cds.tx(context); + }; +}); From f583b591c8f7b01713076470d5e85a2bfc3a2ee5 Mon Sep 17 00:00:00 2001 From: Max Gruenfelder Date: Sun, 8 Mar 2026 13:57:18 +0100 Subject: [PATCH 12/18] wip --- .claude/worktrees/funny-borg | 2 +- src/dbHandler.js | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/.claude/worktrees/funny-borg b/.claude/worktrees/funny-borg index 900ae7db..17e8d68d 160000 --- a/.claude/worktrees/funny-borg +++ b/.claude/worktrees/funny-borg @@ -1 +1 @@ -Subproject commit 900ae7db9ecb573edb5f4191224cf8469af7739a +Subproject commit 17e8d68d1646218eb96bcfdf233c6a1f645703ba diff --git a/src/dbHandler.js b/src/dbHandler.js index db00cdf7..9381fa04 100644 --- a/src/dbHandler.js +++ b/src/dbHandler.js @@ -73,14 +73,12 @@ const registerEventQueueDbHandler = (dbService) => { if (!registeredHandlers.updateDbHandler) { registeredHandlers.updateDbHandler = true; - dbService.after("UPDATE", def, (affectedRows, req) => { + dbService.after("UPDATE", def, (count, req) => { const newStatus = req.query.UPDATE?.data?.status; if (newStatus == null) { return; } - const count = typeof affectedRows === "number" && affectedRows > 0 ? affectedRows : 1; - req.tx._ = req.tx._ ?? {}; req.tx._.eventQueueStatsPendingDelta = req.tx._.eventQueueStatsPendingDelta ?? 0; req.tx._.eventQueueStatsInProgressDelta = req.tx._.eventQueueStatsInProgressDelta ?? 0; From c59ae6544a8de6ffeb731f6639ab287705e2d3db Mon Sep 17 00:00:00 2001 From: Max Gruenfelder Date: Sun, 8 Mar 2026 13:58:35 +0100 Subject: [PATCH 13/18] gitingore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 47915012..1c953d34 100644 --- a/.gitignore +++ b/.gitignore @@ -31,6 +31,7 @@ data/afc.sqlite-wal data/test.sqlite coverage Claude.md +.claude/ # temp temp/ From 866f1422a5f9ac827a7afc0af86195110e12197e Mon Sep 17 00:00:00 2001 From: Max Gruenfelder Date: Sun, 8 Mar 2026 14:51:05 +0100 Subject: [PATCH 14/18] wip --- src/runner/runner.js | 8 +++++--- test-integration/__snapshots__/e2e-redis.test.js.snap | 2 +- test-integration/__snapshots__/runner.test.js.snap | 5 +++++ test/mocks/redisMock.js | 1 + 4 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/runner/runner.js b/src/runner/runner.js index 2c68bf4a..5591ec76 100644 --- a/src/runner/runner.js +++ b/src/runner/runner.js @@ -162,7 +162,7 @@ const _executeEventsAllTenantsRedis = async (tenantIds) => { entries: entries.length, }); tenantCounts[tenantId] = entries; - const pendingByNamespace = {}; + const pendingByNamespace = Object.fromEntries(config.processingNamespaces.map((name) => [name, 0])); for (const entry of entries) { pendingByNamespace[entry.namespace] = (pendingByNamespace[entry.namespace] ?? 0) + entry.count; } @@ -189,7 +189,9 @@ const _executeEventsAllTenantsRedis = async (tenantIds) => { logger.error("broadcasting events for tenant failed", { tenantId }, err); } } - const globalPendingByNamespace = {}; + const globalPendingByNamespace = Object.fromEntries( + tenantIds.map((tenant) => [tenant, config.processingNamespaces.map((namespace) => ({ namespace, count: 0 }))]) + ); for (const tenantEntries of Object.values(tenantCounts)) { for (const entry of tenantEntries) { globalPendingByNamespace[entry.namespace] = (globalPendingByNamespace[entry.namespace] ?? 0) + entry.count; @@ -389,7 +391,7 @@ const _singleTenantRedis = async () => { logger.info("broadcasting events for run", { entries: entries.length, }); - const pendingByNamespace = {}; + const pendingByNamespace = Object.fromEntries(config.processingNamespaces.map((name) => [name, 0])); for (const entry of entries) { pendingByNamespace[entry.namespace] = (pendingByNamespace[entry.namespace] ?? 0) + entry.count; } diff --git a/test-integration/__snapshots__/e2e-redis.test.js.snap b/test-integration/__snapshots__/e2e-redis.test.js.snap index d7f8e85d..161f47ef 100644 --- a/test-integration/__snapshots__/e2e-redis.test.js.snap +++ b/test-integration/__snapshots__/e2e-redis.test.js.snap @@ -11,9 +11,9 @@ exports[`end-to-end redis broadcast checkAndInsertPeriodicEvents should insert n exports[`end-to-end redis broadcast checkAndInsertPeriodicEvents should insert new events and runner should broadcast + process events 2`] = ` [ "EVENT_QUEUE##default##RUN_REDIS_CHECK", + "EVENT_QUEUE##default##stats##global", "EVENT_QUEUE##default####TEST_STATIC", "EVENT_QUEUE##default##stats##tenant##undefined", - "EVENT_QUEUE##default##stats##global", ] `; diff --git a/test-integration/__snapshots__/runner.test.js.snap b/test-integration/__snapshots__/runner.test.js.snap index 495d3dfa..34494250 100644 --- a/test-integration/__snapshots__/runner.test.js.snap +++ b/test-integration/__snapshots__/runner.test.js.snap @@ -262,6 +262,11 @@ exports[`runner redis single tenant no open events 2`] = ` "PX": 1425000, }, }, + "EVENT_QUEUE##default##stats##global": { + "hash": { + "pending": "0", + }, + }, } `; diff --git a/test/mocks/redisMock.js b/test/mocks/redisMock.js index 5cb66f8c..2ff69bd2 100644 --- a/test/mocks/redisMock.js +++ b/test/mocks/redisMock.js @@ -81,6 +81,7 @@ const _createMainClientAndConnect = async () => _buildClient(); module.exports = { attachRedisUnsubscribeHandler: () => {}, subscribeRedisChannel: () => {}, + publishMessage: async () => {}, createClientAndConnect: _createMainClientAndConnect, createMainClientAndConnect: _createMainClientAndConnect, closeSubscribeClient: () => {}, From ba52c32bc92a30a680934f300ba0820d82ff35e7 Mon Sep 17 00:00:00 2001 From: Max Gruenfelder Date: Sun, 8 Mar 2026 15:07:17 +0100 Subject: [PATCH 15/18] wip --- src/shared/eventQueueStats.js | 28 +++++++++++++++++++ src/shared/openTelemetry.js | 13 ++++++++- test/eventQueueStats.test.js | 51 +++++++++++++++++++++++++++++++++++ test/mocks/redisMock.js | 14 ++++++++++ 4 files changed, 105 insertions(+), 1 deletion(-) diff --git a/src/shared/eventQueueStats.js b/src/shared/eventQueueStats.js index 2010a36c..597ea5a2 100644 --- a/src/shared/eventQueueStats.js +++ b/src/shared/eventQueueStats.js @@ -158,6 +158,33 @@ const deleteTenantStats = async (tenantId) => { } }; +/** + * Resets the inProgress counter to 0 for all processing namespaces (global + all tenants). + * Called on instance startup to clean up stale counts left by a previous crash. + */ +const resetInProgressCounters = async () => { + try { + const clientOrCluster = await redis.createMainClientAndConnect(); + const clients = redis.isClusterMode() + ? clientOrCluster.masters.map((master) => master.client) + : [clientOrCluster]; + + const globalOps = config.processingNamespaces.map((namespace) => + clientOrCluster.hSet(`${_keyPrefix(namespace)}##stats##global`, StatusField.InProgress, 0) + ); + await Promise.allSettled(globalOps); + + // NOTE: use SCAN because KEYS is not supported for cluster clients + for (const client of clients) { + for await (const key of client.scanIterator({ MATCH: "*##stats##tenant##*", COUNT: 1000 })) { + await client.hSet(key, StatusField.InProgress, 0); + } + } + } catch (err) { + cds.log(COMPONENT_NAME).error("failed to reset inProgress counters on startup", err); + } +}; + const _parseCounterHash = (raw) => ({ [StatusField.Pending]: raw[StatusField.Pending] != null ? parseInt(raw[StatusField.Pending]) : 0, [StatusField.InProgress]: raw[StatusField.InProgress] != null ? parseInt(raw[StatusField.InProgress]) : 0, @@ -180,4 +207,5 @@ module.exports = { getTenantStats, getGlobalStats, deleteTenantStats, + resetInProgressCounters, }; diff --git a/src/shared/openTelemetry.js b/src/shared/openTelemetry.js index 323edecf..96af84e0 100644 --- a/src/shared/openTelemetry.js +++ b/src/shared/openTelemetry.js @@ -134,6 +134,10 @@ const initMetrics = () => { _metricsInitialized = true; + eventQueueStats + .resetInProgressCounters() + .catch((err) => cds.log(COMPONENT_NAME).error("failed to reset inProgress counters", err)); + const meter = otel.metrics.getMeter("@cap-js-community/event-queue"); const pendingGauge = meter.createObservableGauge("cap.event_queue.jobs.pending", { @@ -149,6 +153,14 @@ const initMetrics = () => { unit: "s", }); + _statsSnapshot = { + lastRefreshedAt: Date.now(), + namespaces: Object.fromEntries( + config.processingNamespaces.map((namespace) => [namespace, { pending: 0, inProgress: 0 }]) + ), + }; + _refreshStats(); + meter.addBatchObservableCallback( (observableResult) => { if (!_statsSnapshot) { @@ -164,7 +176,6 @@ const initMetrics = () => { [pendingGauge, inProgressGauge, refreshAgeGauge] ); - _refreshStats(); setInterval(_refreshStats, 30_000).unref(); }; diff --git a/test/eventQueueStats.test.js b/test/eventQueueStats.test.js index 6c3815ba..0e908046 100644 --- a/test/eventQueueStats.test.js +++ b/test/eventQueueStats.test.js @@ -17,6 +17,7 @@ const { getTenantStats, getGlobalStats, deleteTenantStats, + resetInProgressCounters, } = require("../src/shared/eventQueueStats"); const project = __dirname + "/.."; @@ -131,4 +132,54 @@ describe("eventQueueStats", () => { await expect(deleteTenantStats("nonexistent")).resolves.toBeUndefined(); }); }); + + describe("resetInProgressCounters", () => { + it("resets global inProgress to 0 for all configured namespaces", async () => { + await incrementCounters("t1", StatusField.InProgress, 5); + + await resetInProgressCounters(); + + const global = await getGlobalStats(); + expect(global.inProgress).toBe(0); + }); + + it("does not touch the pending counter", async () => { + await incrementCounters("t1", StatusField.Pending, 3); + await incrementCounters("t1", StatusField.InProgress, 2); + + await resetInProgressCounters(); + + const global = await getGlobalStats(); + expect(global.pending).toBe(3); + expect(global.inProgress).toBe(0); + }); + + it("resets inProgress in tenant hash keys found via scan", async () => { + await incrementCounters("t1", StatusField.InProgress, 4); + await incrementCounters("t2", StatusField.InProgress, 2); + + await resetInProgressCounters(); + + const t1 = await getTenantStats("t1"); + expect(t1.inProgress).toBe(0); + + const t2 = await getTenantStats("t2"); + expect(t2.inProgress).toBe(0); + }); + + it("preserves tenant pending counter after reset", async () => { + await incrementCounters("t1", StatusField.Pending, 7); + await incrementCounters("t1", StatusField.InProgress, 3); + + await resetInProgressCounters(); + + const t1 = await getTenantStats("t1"); + expect(t1.pending).toBe(7); + expect(t1.inProgress).toBe(0); + }); + + it("resolves without error when no keys exist", async () => { + await expect(resetInProgressCounters()).resolves.toBeUndefined(); + }); + }); }); diff --git a/test/mocks/redisMock.js b/test/mocks/redisMock.js index 2ff69bd2..987f957c 100644 --- a/test/mocks/redisMock.js +++ b/test/mocks/redisMock.js @@ -38,6 +38,19 @@ const _buildClient = () => ({ hGetAll: async (key) => { return state[key]?.hash ?? {}; }, + scanIterator: ({ MATCH } = {}) => { + const regex = MATCH + ? new RegExp( + "^" + MATCH.replace(/[.+^${}()|[\]\\]/g, "\\$&").replace(/\*/g, ".*").replace(/\?/g, ".") + "$" + ) + : null; + const matchingKeys = Object.keys(state).filter((k) => !regex || regex.test(k)); + return (async function* () { + for (const key of matchingKeys) { + yield key; + } + })(); + }, multi: () => { const ops = []; const pipeline = { @@ -82,6 +95,7 @@ module.exports = { attachRedisUnsubscribeHandler: () => {}, subscribeRedisChannel: () => {}, publishMessage: async () => {}, + isClusterMode: () => false, createClientAndConnect: _createMainClientAndConnect, createMainClientAndConnect: _createMainClientAndConnect, closeSubscribeClient: () => {}, From a5894c4821f7b5a725eb819fc563f899b9d5a67f Mon Sep 17 00:00:00 2001 From: Max Gruenfelder Date: Sun, 8 Mar 2026 15:08:40 +0100 Subject: [PATCH 16/18] fix --- src/runner/runner.js | 4 +--- test-integration/__snapshots__/runner.test.js.snap | 10 ++++++++++ 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/src/runner/runner.js b/src/runner/runner.js index 5591ec76..e2d290d3 100644 --- a/src/runner/runner.js +++ b/src/runner/runner.js @@ -189,9 +189,7 @@ const _executeEventsAllTenantsRedis = async (tenantIds) => { logger.error("broadcasting events for tenant failed", { tenantId }, err); } } - const globalPendingByNamespace = Object.fromEntries( - tenantIds.map((tenant) => [tenant, config.processingNamespaces.map((namespace) => ({ namespace, count: 0 }))]) - ); + const globalPendingByNamespace = Object.fromEntries(config.processingNamespaces.map((namespace) => [namespace, 0])); for (const tenantEntries of Object.values(tenantCounts)) { for (const entry of tenantEntries) { globalPendingByNamespace[entry.namespace] = (globalPendingByNamespace[entry.namespace] ?? 0) + entry.count; diff --git a/test-integration/__snapshots__/runner.test.js.snap b/test-integration/__snapshots__/runner.test.js.snap index 34494250..9009169c 100644 --- a/test-integration/__snapshots__/runner.test.js.snap +++ b/test-integration/__snapshots__/runner.test.js.snap @@ -86,6 +86,16 @@ exports[`runner redis multi tenant no open events 2`] = ` "PX": 60000, }, }, + "EVENT_QUEUE##default##stats##global": { + "hash": { + "pending": "0", + }, + }, + "EVENT_QUEUE##default##stats##tenant##TEST_STATIC": { + "hash": { + "pending": "0", + }, + }, } `; From d8503948c9dabcd546ac41ff8e4b1e6f7849fc20 Mon Sep 17 00:00:00 2001 From: Max Gruenfelder Date: Sun, 8 Mar 2026 15:21:33 +0100 Subject: [PATCH 17/18] fix lint --- src/shared/eventQueueStats.js | 4 +--- test/mocks/redisMock.js | 6 +++++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/shared/eventQueueStats.js b/src/shared/eventQueueStats.js index 597ea5a2..4bba3ecf 100644 --- a/src/shared/eventQueueStats.js +++ b/src/shared/eventQueueStats.js @@ -165,9 +165,7 @@ const deleteTenantStats = async (tenantId) => { const resetInProgressCounters = async () => { try { const clientOrCluster = await redis.createMainClientAndConnect(); - const clients = redis.isClusterMode() - ? clientOrCluster.masters.map((master) => master.client) - : [clientOrCluster]; + const clients = redis.isClusterMode() ? clientOrCluster.masters.map((master) => master.client) : [clientOrCluster]; const globalOps = config.processingNamespaces.map((namespace) => clientOrCluster.hSet(`${_keyPrefix(namespace)}##stats##global`, StatusField.InProgress, 0) diff --git a/test/mocks/redisMock.js b/test/mocks/redisMock.js index 987f957c..1465ebcb 100644 --- a/test/mocks/redisMock.js +++ b/test/mocks/redisMock.js @@ -41,7 +41,11 @@ const _buildClient = () => ({ scanIterator: ({ MATCH } = {}) => { const regex = MATCH ? new RegExp( - "^" + MATCH.replace(/[.+^${}()|[\]\\]/g, "\\$&").replace(/\*/g, ".*").replace(/\?/g, ".") + "$" + "^" + + MATCH.replace(/[.+^${}()|[\]\\]/g, "\\$&") + .replace(/\*/g, ".*") + .replace(/\?/g, ".") + + "$" ) : null; const matchingKeys = Object.keys(state).filter((k) => !regex || regex.test(k)); From 14e44e603c65e18606e5ef0331cd4638541708c5 Mon Sep 17 00:00:00 2001 From: Max Gruenfelder Date: Sun, 8 Mar 2026 15:22:50 +0100 Subject: [PATCH 18/18] wip --- .claude/worktrees/funny-borg | 1 - 1 file changed, 1 deletion(-) delete mode 160000 .claude/worktrees/funny-borg diff --git a/.claude/worktrees/funny-borg b/.claude/worktrees/funny-borg deleted file mode 160000 index 17e8d68d..00000000 --- a/.claude/worktrees/funny-borg +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 17e8d68d1646218eb96bcfdf233c6a1f645703ba