diff --git a/.gitignore b/.gitignore index a1132f57..1c953d34 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ **/.project **/.settings *.code-workspace +CLAUDE.md # package managers bower_components/ @@ -30,6 +31,7 @@ data/afc.sqlite-wal data/test.sqlite coverage Claude.md +.claude/ # temp temp/ diff --git a/src/dbHandler.js b/src/dbHandler.js index 030db39d..9381fa04 100644 --- a/src/dbHandler.js +++ b/src/dbHandler.js @@ -4,11 +4,14 @@ const cds = require("@sap/cds"); const redisPub = require("./redis/redisPub"); const config = require("./config"); +const eventQueueStats = require("./shared/eventQueueStats"); +const { EventProcessingStatus } = require("./constants"); const COMPONENT_NAME = "/eventQueue/dbHandler"; const registeredHandlers = { eventQueueDbHandler: false, beforeDbHandler: false, + updateDbHandler: false, }; const registerEventQueueDbHandler = (dbService) => { @@ -26,36 +29,112 @@ const registerEventQueueDbHandler = (dbService) => { req.tx._.eventQueuePublishEvents = req.tx._.eventQueuePublishEvents ?? {}; const eventQueuePublishEvents = req.tx._.eventQueuePublishEvents; const data = Array.isArray(req.query.INSERT.entries) ? req.query.INSERT.entries : [req.query.INSERT.entries]; - const eventCombinations = Object.keys( - data.reduce((result, event) => { - const key = [event.type, event.subType, event.namespace].join("##"); - if ( - !config.hasEventAfterCommitFlag(event.type, event.subType, event.namespace) || - eventQueuePublishEvents[key] - ) { - return result; - } + + req.tx._.eventQueueStatsOpenCount = (req.tx._.eventQueueStatsOpenCount ?? 
0) + data.length; + const newCombinations = data.reduce((result, event) => { + const key = [event.type, event.subType, event.namespace].join("##"); + if (config.hasEventAfterCommitFlag(event.type, event.subType, event.namespace) && !eventQueuePublishEvents[key]) { eventQueuePublishEvents[key] = true; - result[key] = true; - return result; - }, {}) - ); + result.push(key); + } + return result; + }, []); - eventCombinations.length && + req.tx._.eventQueueBroadcastCombinations ??= []; + req.tx._.eventQueueBroadcastCombinations.push(...newCombinations); + if (!req.tx._.eventQueueSucceededHandlerRegistered) { + req.tx._.eventQueueSucceededHandlerRegistered = true; req.on("succeeded", () => { - const events = eventCombinations.map((eventCombination) => { - const [type, subType, namespace] = eventCombination.split("##"); - return { type, subType, namespace }; - }); - - redisPub.broadcastEvent(req.tenant, events).catch((err) => { - cds.log(COMPONENT_NAME).error("db handler failure during broadcasting event", err, { - tenant: req.tenant, - events, + if (config.redisEnabled && req.tx._.eventQueueStatsOpenCount) { + eventQueueStats + .incrementCounters(req.tenant, eventQueueStats.StatusField.Pending, req.tx._.eventQueueStatsOpenCount) + .catch((err) => { + cds.log(COMPONENT_NAME).error("db handler failure during updating event stats", err, { + tenant: req.tenant, + }); + }); + } + const combinations = req.tx._.eventQueueBroadcastCombinations; + if (combinations.length) { + const events = combinations.map((combination) => { + const [type, subType, namespace] = combination.split("##"); + return { type, subType, namespace }; }); - }); + redisPub.broadcastEvent(req.tenant, events).catch((err) => { + cds.log(COMPONENT_NAME).error("db handler failure during broadcasting event", err, { + tenant: req.tenant, + events, + }); + }); + } }); + } }); + + if (!registeredHandlers.updateDbHandler) { + registeredHandlers.updateDbHandler = true; + dbService.after("UPDATE", def, (count, req) => 
{ + const newStatus = req.query.UPDATE?.data?.status; + if (newStatus == null) { + return; + } + + req.tx._ = req.tx._ ?? {}; + req.tx._.eventQueueStatsPendingDelta = req.tx._.eventQueueStatsPendingDelta ?? 0; + req.tx._.eventQueueStatsInProgressDelta = req.tx._.eventQueueStatsInProgressDelta ?? 0; + + if (newStatus === EventProcessingStatus.InProgress) { + req.tx._.eventQueueStatsPendingDelta -= count; + req.tx._.eventQueueStatsInProgressDelta += count; + } else if (newStatus === EventProcessingStatus.Error) { + req.tx._.eventQueueStatsInProgressDelta -= count; + req.tx._.eventQueueStatsPendingDelta += count; + } else if ( + newStatus === EventProcessingStatus.Done || + newStatus === EventProcessingStatus.Exceeded || + newStatus === EventProcessingStatus.Suspended + ) { + req.tx._.eventQueueStatsInProgressDelta -= count; + } + + if (!req.tx._.eventQueueUpdateSucceededHandlerRegistered) { + req.tx._.eventQueueUpdateSucceededHandlerRegistered = true; + req.on("succeeded", () => { + if (!config.redisEnabled) { + return; + } + + const pendingDelta = req.tx._.eventQueueStatsPendingDelta; + const inProgressDelta = req.tx._.eventQueueStatsInProgressDelta; + const ops = []; + + if (pendingDelta !== 0) { + ops.push( + eventQueueStats.adjustTenantCounter(req.tenant, eventQueueStats.StatusField.Pending, pendingDelta), + eventQueueStats.adjustGlobalCounter(eventQueueStats.StatusField.Pending, pendingDelta) + ); + } + if (inProgressDelta !== 0) { + ops.push( + eventQueueStats.adjustTenantCounter(req.tenant, eventQueueStats.StatusField.InProgress, inProgressDelta), + eventQueueStats.adjustGlobalCounter(eventQueueStats.StatusField.InProgress, inProgressDelta) + ); + } + Promise.allSettled(ops).then((results) => { + for (const result of results) { + if (result.status === "rejected") { + cds + .log(COMPONENT_NAME) + .error("db handler failure during updating event stats on update", result.reason, { + tenant: req.tenant, + }); + } + } + }); + }); + } + }); + } }; module.exports = { 
diff --git a/src/initialize.js b/src/initialize.js index 392c55e8..7ba7f0ef 100644 --- a/src/initialize.js +++ b/src/initialize.js @@ -17,6 +17,7 @@ const { getAllTenantIds } = require("./shared/cdsHelper"); const { EventProcessingStatus } = require("./constants"); const distributedLock = require("./shared/distributedLock"); const EventQueueError = require("./EventQueueError"); +const { initMetrics } = require("./shared/openTelemetry"); const readFileAsync = promisify(fs.readFile); @@ -125,6 +126,7 @@ const initialize = async (options = {}) => { runInterval: config.runInterval, useAsCAPQueue: config.useAsCAPQueue, }); + initMetrics(); resolveFn(); }; diff --git a/src/runner/openEvents.js b/src/runner/openEvents.js index fc95f75a..71058dfa 100644 --- a/src/runner/openEvents.js +++ b/src/runner/openEvents.js @@ -29,12 +29,12 @@ const getOpenQueueEntries = async (tx, filterAppSpecificEvents = true) => { new Date(startTime.getTime() - 30 * MS_IN_DAYS).toISOString(), ")" ) - .columns("type", "subType", "namespace") + .columns("count(ID) as count", "type", "subType", "namespace") .groupBy("type", "subType", "namespace") ); const result = []; - for (const { type, subType, namespace } of entries) { + for (const { type, subType, namespace, count } of entries) { if (config.isCapOutboxEvent(type)) { const { srvName, actionName } = config.normalizeSubType(type, subType); try { @@ -48,14 +48,14 @@ const getOpenQueueEntries = async (tx, filterAppSpecificEvents = true) => { config.addCAPServiceWithoutEnvConfig(subType, service); } if (config.shouldBeProcessedInThisApplication(type, subType, namespace)) { - result.push({ namespace, type, subType }); + result.push({ namespace, type, subType, count }); } } } catch { /* ignore catch */ } finally { if (!filterAppSpecificEvents) { - result.push({ namespace, type, subType }); + result.push({ namespace, type, subType, count }); } } } else { @@ -64,10 +64,10 @@ const getOpenQueueEntries = async (tx, filterAppSpecificEvents = true) => { 
config.getEventConfig(type, subType, namespace) && config.shouldBeProcessedInThisApplication(type, subType, namespace) ) { - result.push({ namespace, type, subType }); + result.push({ namespace, type, subType, count }); } } else { - result.push({ namespace, type, subType }); + result.push({ namespace, type, subType, count }); } } } diff --git a/src/runner/runner.js b/src/runner/runner.js index 9295041c..e2d290d3 100644 --- a/src/runner/runner.js +++ b/src/runner/runner.js @@ -14,6 +14,7 @@ const common = require("../shared/common"); const config = require("../config"); const redisPub = require("../redis/redisPub"); const openEvents = require("./openEvents"); +const eventQueueStats = require("../shared/eventQueueStats"); const { runEventCombinationForTenant } = require("./runnerHelper"); const { trace } = require("../shared/openTelemetry"); @@ -141,7 +142,7 @@ const _executeEventsAllTenantsRedis = async (tenantIds) => { } catch (err) { logger.error("executing event queue run for multi instance and tenant failed", err); } - + const tenantCounts = {}; for (const tenantId of tenantIds) { try { await cds.tx({ tenant: tenantId }, async (tx) => { @@ -160,6 +161,16 @@ const _executeEventsAllTenantsRedis = async (tenantIds) => { tenantId, entries: entries.length, }); + tenantCounts[tenantId] = entries; + const pendingByNamespace = Object.fromEntries(config.processingNamespaces.map((name) => [name, 0])); + for (const entry of entries) { + pendingByNamespace[entry.namespace] = (pendingByNamespace[entry.namespace] ?? 
0) + entry.count; + } + for (const [namespace, count] of Object.entries(pendingByNamespace)) { + eventQueueStats + .setTenantCounter(tenantId, namespace, eventQueueStats.StatusField.Pending, count) + .catch((err) => logger.error("updating tenant stats failed", err, { tenantId, namespace })); + } if (!entries.length) { return; } @@ -178,6 +189,17 @@ const _executeEventsAllTenantsRedis = async (tenantIds) => { logger.error("broadcasting events for tenant failed", { tenantId }, err); } } + const globalPendingByNamespace = Object.fromEntries(config.processingNamespaces.map((namespace) => [namespace, 0])); + for (const tenantEntries of Object.values(tenantCounts)) { + for (const entry of tenantEntries) { + globalPendingByNamespace[entry.namespace] = (globalPendingByNamespace[entry.namespace] ?? 0) + entry.count; + } + } + for (const [namespace, count] of Object.entries(globalPendingByNamespace)) { + eventQueueStats + .setGlobalCounter(namespace, eventQueueStats.StatusField.Pending, count) + .catch((err) => logger.error("updating global stats failed", err, { namespace })); + } }; const _executeEventsAllTenants = async (tenantIds) => { @@ -367,6 +389,15 @@ const _singleTenantRedis = async () => { logger.info("broadcasting events for run", { entries: entries.length, }); + const pendingByNamespace = Object.fromEntries(config.processingNamespaces.map((name) => [name, 0])); + for (const entry of entries) { + pendingByNamespace[entry.namespace] = (pendingByNamespace[entry.namespace] ?? 
0) + entry.count; + } + for (const [namespace, count] of Object.entries(pendingByNamespace)) { + eventQueueStats + .setGlobalCounter(namespace, eventQueueStats.StatusField.Pending, count) + .catch((err) => logger.error("updating global stats failed", err, { namespace })); + } if (!entries.length) { return; } diff --git a/src/shared/eventQueueStats.js b/src/shared/eventQueueStats.js new file mode 100644 index 00000000..4bba3ecf --- /dev/null +++ b/src/shared/eventQueueStats.js @@ -0,0 +1,209 @@ +"use strict"; + +const cds = require("@sap/cds"); + +const redis = require("./redis"); +const config = require("../config"); + +const COMPONENT_NAME = "/eventQueue/eventQueueStats"; + +const StatusField = { + Pending: "pending", + InProgress: "inProgress", +}; + +const _tenantKey = (tenantId) => `${config.redisNamespace(true)}##stats##tenant##${tenantId}`; +const _globalKey = () => `${config.redisNamespace(true)}##stats##global`; +const _keyPrefix = (namespace) => `${config.redisNamespace(false)}##${namespace}`; + +/** + * Atomically adjusts a tenant's event counter for the given status field. + * + * @param {string} tenantId + * @param {string} field - one of StatusField.* + * @param {number} increment - positive to increment, negative to decrement + */ +const adjustTenantCounter = async (tenantId, field, increment) => { + try { + const client = await redis.createMainClientAndConnect(); + await client.hIncrBy(_tenantKey(tenantId), field, increment); + } catch (err) { + cds.log(COMPONENT_NAME).error("failed to adjust tenant stats counter", err, { tenantId, field, increment }); + } +}; + +/** + * Atomically adjusts the global event counter for the given status field. + * The matching tenant-level counter is adjusted separately via adjustTenantCounter. 
+ * + * @param {string} field - one of StatusField.* + * @param {number} increment - positive to increment, negative to decrement + */ +const adjustGlobalCounter = async (field, increment) => { + try { + const client = await redis.createMainClientAndConnect(); + await client.hIncrBy(_globalKey(), field, increment); + } catch (err) { + cds.log(COMPONENT_NAME).error("failed to adjust global stats counter", err, { field, increment }); + } +}; + +/** + * Increments a tenant counter and the matching global counter in a single call. + * + * @param {string} tenantId + * @param {string} field - one of StatusField.* + * @param {number} [increment=1] + */ +const incrementCounters = async (tenantId, field, increment = 1) => { + await Promise.allSettled([adjustTenantCounter(tenantId, field, increment), adjustGlobalCounter(field, increment)]); +}; + +/** + * Decrements a tenant counter and the matching global counter in a single call. + * + * @param {string} tenantId + * @param {string} field - one of StatusField.* + * @param {number} [decrement=1] + */ +const decrementCounters = async (tenantId, field, decrement = 1) => { + await Promise.allSettled([adjustTenantCounter(tenantId, field, -decrement), adjustGlobalCounter(field, -decrement)]); +}; + +/** + * Returns the current stats hash for a single tenant. + * All counter values are returned as integers; missing fields default to 0. + * + * @param {string} tenantId + * @returns {Promise<{pending: number, inProgress: number}>} + */ +const getTenantStats = async (tenantId) => { + try { + const client = await redis.createMainClientAndConnect(); + const raw = await client.hGetAll(_tenantKey(tenantId)); + return _parseCounterHash(raw); + } catch (err) { + cds.log(COMPONENT_NAME).error("failed to read tenant stats", err, { tenantId }); + return _emptyCounters(); + } +}; + +/** + * Returns the current global stats hash. + * All counter values are returned as integers; missing fields default to 0. 
+ * + * @returns {Promise<{pending: number, inProgress: number}>} + */ +const getGlobalStats = async () => { + try { + const client = await redis.createMainClientAndConnect(); + const raw = await client.hGetAll(_globalKey()); + return _parseCounterHash(raw); + } catch (err) { + cds.log(COMPONENT_NAME).error("failed to read global stats", err); + return _emptyCounters(); + } +}; + +/** + * Sets a tenant's stats counter to an absolute value for the given namespace and status field. + * Used by the runner to re-sync the counter with the actual number of open events. + * + * @param {string} tenantId + */ +const setTenantCounter = async (tenantId, namespace, field, value) => { + try { + const client = await redis.createMainClientAndConnect(); + await client.hSet(`${_keyPrefix(namespace)}##stats##tenant##${tenantId}`, field, value); + } catch (err) { + cds.log(COMPONENT_NAME).error("failed to set tenant stats counter", err, { tenantId, namespace, field, value }); + } +}; + +const setGlobalCounter = async (namespace, field, value) => { + try { + const client = await redis.createMainClientAndConnect(); + await client.hSet(`${_keyPrefix(namespace)}##stats##global`, field, value); + } catch (err) { + cds.log(COMPONENT_NAME).error("failed to set global stats counter", err, { namespace, field, value }); + } +}; + +const getAllNamespaceStats = async () => { + const namespaces = config.processingNamespaces; + const client = await redis.createMainClientAndConnect(); + const results = await Promise.allSettled( + namespaces.map(async (namespace) => { + const raw = await client.hGetAll(`${_keyPrefix(namespace)}##stats##global`); + return { namespace, stats: _parseCounterHash(raw) }; + }) + ); + const out = {}; + for (const result of results) { + if (result.status === "fulfilled") { + out[result.value.namespace] = result.value.stats; + } else { + cds.log(COMPONENT_NAME).error("failed to read namespace stats", result.reason); + } + } + return out; +}; + +const deleteTenantStats = async 
(tenantId) => { + try { + const client = await redis.createMainClientAndConnect(); + await client.del(_tenantKey(tenantId)); + } catch (err) { + cds.log(COMPONENT_NAME).error("failed to delete tenant stats", err, { tenantId }); + } +}; + +/** + * Resets the inProgress counter to 0 for all processing namespaces (global + all tenants). + * Called on instance startup to clean up stale counts left by a previous crash. + */ +const resetInProgressCounters = async () => { + try { + const clientOrCluster = await redis.createMainClientAndConnect(); + const clients = redis.isClusterMode() ? clientOrCluster.masters.map((master) => master.client) : [clientOrCluster]; + + const globalOps = config.processingNamespaces.map((namespace) => + clientOrCluster.hSet(`${_keyPrefix(namespace)}##stats##global`, StatusField.InProgress, 0) + ); + await Promise.allSettled(globalOps); + + // NOTE: use SCAN because KEYS is not supported for cluster clients + for (const client of clients) { + for await (const key of client.scanIterator({ MATCH: "*##stats##tenant##*", COUNT: 1000 })) { + await client.hSet(key, StatusField.InProgress, 0); + } + } + } catch (err) { + cds.log(COMPONENT_NAME).error("failed to reset inProgress counters on startup", err); + } +}; + +const _parseCounterHash = (raw) => ({ + [StatusField.Pending]: raw[StatusField.Pending] != null ? parseInt(raw[StatusField.Pending]) : 0, + [StatusField.InProgress]: raw[StatusField.InProgress] != null ? 
parseInt(raw[StatusField.InProgress]) : 0, +}); + +const _emptyCounters = () => ({ + [StatusField.Pending]: 0, + [StatusField.InProgress]: 0, +}); + +module.exports = { + StatusField, + incrementCounters, + decrementCounters, + adjustTenantCounter, + adjustGlobalCounter, + setTenantCounter, + setGlobalCounter, + getAllNamespaceStats, + getTenantStats, + getGlobalStats, + deleteTenantStats, + resetInProgressCounters, +}; diff --git a/src/shared/openTelemetry.js b/src/shared/openTelemetry.js index f9d5896a..96af84e0 100644 --- a/src/shared/openTelemetry.js +++ b/src/shared/openTelemetry.js @@ -12,9 +12,13 @@ const cds = require("@sap/cds"); const otel = _resilientRequire("@opentelemetry/api"); const config = require("../config"); +const eventQueueStats = require("./eventQueueStats"); const COMPONENT_NAME = "/shared/openTelemetry"; +let _statsSnapshot = null; +let _metricsInitialized = false; + const trace = async (context, label, fn, { attributes = {}, newRootSpan = false, traceContext } = {}) => { if (!config.enableTelemetry || !otel) { return fn(); @@ -110,4 +114,69 @@ const getCurrentTraceContext = () => { return carrier; }; -module.exports = { trace, getCurrentTraceContext }; +const _refreshStats = async () => { + try { + const namespaces = await eventQueueStats.getAllNamespaceStats(); + _statsSnapshot = { namespaces, lastRefreshedAt: Date.now() }; + } catch (err) { + cds.log(COMPONENT_NAME).error("failed to refresh queue stats for metrics", err); + } +}; + +const initMetrics = () => { + if (_metricsInitialized || !config.enableTelemetry || !config.redisEnabled || !otel?.metrics) { + return; + } + const meterProvider = otel.metrics.getMeterProvider?.(); + if (!meterProvider) { + return; + } + + _metricsInitialized = true; + + eventQueueStats + .resetInProgressCounters() + .catch((err) => cds.log(COMPONENT_NAME).error("failed to reset inProgress counters", err)); + + const meter = otel.metrics.getMeter("@cap-js-community/event-queue"); + + const pendingGauge = 
meter.createObservableGauge("cap.event_queue.jobs.pending", { + description: "Current number of jobs waiting to be processed.", + unit: "1", + }); + const inProgressGauge = meter.createObservableGauge("cap.event_queue.jobs.in_progress", { + description: "Current number of jobs actively being processed by workers.", + unit: "1", + }); + const refreshAgeGauge = meter.createObservableGauge("cap.event_queue.stats.refresh_age", { + description: "Age of the most recent queue statistics snapshot.", + unit: "s", + }); + + _statsSnapshot = { + lastRefreshedAt: Date.now(), + namespaces: Object.fromEntries( + config.processingNamespaces.map((namespace) => [namespace, { pending: 0, inProgress: 0 }]) + ), + }; + _refreshStats(); + + meter.addBatchObservableCallback( + (observableResult) => { + if (!_statsSnapshot) { + return; + } + observableResult.observe(refreshAgeGauge, (Date.now() - _statsSnapshot.lastRefreshedAt) / 1000); + for (const [namespace, stats] of Object.entries(_statsSnapshot.namespaces)) { + const attrs = { "queue.namespace": namespace }; + observableResult.observe(pendingGauge, stats.pending, attrs); + observableResult.observe(inProgressGauge, stats.inProgress, attrs); + } + }, + [pendingGauge, inProgressGauge, refreshAgeGauge] + ); + + setInterval(_refreshStats, 30_000).unref(); +}; + +module.exports = { trace, getCurrentTraceContext, initMetrics }; diff --git a/test-integration/__snapshots__/e2e-redis.test.js.snap b/test-integration/__snapshots__/e2e-redis.test.js.snap index 2f662a8e..161f47ef 100644 --- a/test-integration/__snapshots__/e2e-redis.test.js.snap +++ b/test-integration/__snapshots__/e2e-redis.test.js.snap @@ -11,7 +11,9 @@ exports[`end-to-end redis broadcast checkAndInsertPeriodicEvents should insert n exports[`end-to-end redis broadcast checkAndInsertPeriodicEvents should insert new events and runner should broadcast + process events 2`] = ` [ "EVENT_QUEUE##default##RUN_REDIS_CHECK", + "EVENT_QUEUE##default##stats##global", 
"EVENT_QUEUE##default####TEST_STATIC", + "EVENT_QUEUE##default##stats##tenant##undefined", ] `; @@ -24,6 +26,8 @@ exports[`end-to-end redis broadcast insert entry: redis broadcast + process 1`] exports[`end-to-end redis broadcast insert entry: redis broadcast + process 2`] = ` [ + "EVENT_QUEUE##default##stats##tenant##undefined", + "EVENT_QUEUE##default##stats##global", "EVENT_QUEUE##default####TEST_STATIC", ] `; @@ -34,4 +38,9 @@ exports[`end-to-end runner should select open events and process + validate skip ] `; -exports[`end-to-end runner should select open events and process + validate skip broadcast 2`] = `[]`; +exports[`end-to-end runner should select open events and process + validate skip broadcast 2`] = ` +[ + "EVENT_QUEUE##default##stats##tenant##undefined", + "EVENT_QUEUE##default##stats##global", +] +`; diff --git a/test-integration/__snapshots__/runner.test.js.snap b/test-integration/__snapshots__/runner.test.js.snap index 54c00387..9009169c 100644 --- a/test-integration/__snapshots__/runner.test.js.snap +++ b/test-integration/__snapshots__/runner.test.js.snap @@ -86,6 +86,16 @@ exports[`runner redis multi tenant no open events 2`] = ` "PX": 60000, }, }, + "EVENT_QUEUE##default##stats##global": { + "hash": { + "pending": "0", + }, + }, + "EVENT_QUEUE##default##stats##tenant##TEST_STATIC": { + "hash": { + "pending": "0", + }, + }, } `; @@ -95,6 +105,7 @@ exports[`runner redis multi tenant tenant id filter should not acquire lock - on "e9bb8ec0-c85e-4035-b7cf-1b11ba8e5792", [ { + "count": 1, "namespace": "default", "subType": "DELETE_EVENTS", "type": "EVENT_QUEUE_BASE_PERIODIC", @@ -105,6 +116,7 @@ exports[`runner redis multi tenant tenant id filter should not acquire lock - on "cd805323-879c-4bf7-b19c-8ffbbee22e1f", [ { + "count": 1, "namespace": "default", "subType": "DELETE_EVENTS", "type": "EVENT_QUEUE_BASE_PERIODIC", @@ -115,6 +127,7 @@ exports[`runner redis multi tenant tenant id filter should not acquire lock - on 
"9f3ed8f0-8aaf-439e-a96a-04cd5b680c59", [ { + "count": 1, "namespace": "default", "subType": "DELETE_EVENTS", "type": "EVENT_QUEUE_BASE_PERIODIC", @@ -126,7 +139,20 @@ exports[`runner redis multi tenant tenant id filter should not acquire lock - on exports[`runner redis multi tenant tenant id filter should not acquire lock - only process tenants based on tenant filter with open events - split into two instances 2`] = `[]`; -exports[`runner redis multi tenant tenant id filter should not acquire lock - only process tenants based on tenant filter with open events - split into two instances 3`] = `{}`; +exports[`runner redis multi tenant tenant id filter should not acquire lock - only process tenants based on tenant filter with open events - split into two instances 3`] = ` +{ + "EVENT_QUEUE##default##stats##global": { + "hash": { + "pending": "2", + }, + }, + "EVENT_QUEUE##default##stats##tenant##TEST_STATIC": { + "hash": { + "pending": "1", + }, + }, +} +`; exports[`runner redis multi tenant with open events - broadcast should be called 1`] = ` [ @@ -134,6 +160,7 @@ exports[`runner redis multi tenant with open events - broadcast should be called "cd805323-879c-4bf7-b19c-8ffbbee22e1f", [ { + "count": 1, "namespace": "default", "subType": "DELETE_EVENTS", "type": "EVENT_QUEUE_BASE_PERIODIC", @@ -144,6 +171,7 @@ exports[`runner redis multi tenant with open events - broadcast should be called "9f3ed8f0-8aaf-439e-a96a-04cd5b680c59", [ { + "count": 1, "namespace": "default", "subType": "DELETE_EVENTS", "type": "EVENT_QUEUE_BASE_PERIODIC", @@ -154,6 +182,7 @@ exports[`runner redis multi tenant with open events - broadcast should be called "e9bb8ec0-c85e-4035-b7cf-1b11ba8e5792", [ { + "count": 1, "namespace": "default", "subType": "DELETE_EVENTS", "type": "EVENT_QUEUE_BASE_PERIODIC", @@ -203,6 +232,16 @@ exports[`runner redis multi tenant with open events - broadcast should be called "PX": 60000, }, }, + "EVENT_QUEUE##default##stats##global": { + "hash": { + "pending": "3", 
+ }, + }, + "EVENT_QUEUE##default##stats##tenant##TEST_STATIC": { + "hash": { + "pending": "1", + }, + }, } `; @@ -233,6 +272,11 @@ exports[`runner redis single tenant no open events 2`] = ` "PX": 1425000, }, }, + "EVENT_QUEUE##default##stats##global": { + "hash": { + "pending": "0", + }, + }, } `; @@ -263,5 +307,10 @@ exports[`runner redis single tenant with open events - broadcast should be calle "PX": 1425000, }, }, + "EVENT_QUEUE##default##stats##global": { + "hash": { + "pending": "1", + }, + }, } `; diff --git a/test-integration/dbHandlerStats.test.js b/test-integration/dbHandlerStats.test.js new file mode 100644 index 00000000..5f836ec5 --- /dev/null +++ b/test-integration/dbHandlerStats.test.js @@ -0,0 +1,384 @@ +"use strict"; + +const path = require("path"); + +const mockRedis = require("../test/mocks/redisMock"); +jest.mock("../src/shared/redis", () => mockRedis); + +const cds = require("@sap/cds"); +cds.test(__dirname + "/_env"); + +const basePath = path.join(__dirname, "..", "test", "asset", "outboxProject"); +cds.env.requires.StandardService = { + impl: path.join(basePath, "srv/service/standard-service.js"), + outbox: { kind: "persistent-outbox" }, +}; + +cds.env.requires.NotificationService = { + impl: path.join(basePath, "srv/service/service.js"), + outbox: { kind: "persistent-outbox" }, +}; + +const eventQueue = require("../src"); +const config = require("../src/config"); +const { getTenantStats, getGlobalStats, StatusField } = require("../src/shared/eventQueueStats"); +const { EventProcessingStatus } = require("../src/constants"); +const { processEventQueue } = require("../src/processEventQueue"); +const testHelper = require("../test/helper"); +const { Logger: mockLogger } = require("../test/mocks/logger"); + +describe("dbHandler - stats tracking on HANA", () => { + let context, tx, loggerMock; + + beforeAll(async () => { + eventQueue.config.initialized = false; + await eventQueue.initialize({ + processEventsAfterPublish: false, + 
registerAsEventProcessor: false, + insertEventsBeforeCommit: true, + useAsCAPOutbox: true, + userId: "dummyTestUser", + }); + const db = await cds.connect.to("db"); + cds.emit("connect", db); + config.redisEnabled = true; + eventQueue.registerEventQueueDbHandler(db); + loggerMock = mockLogger(); + }); + + beforeEach(async () => { + context = new cds.EventContext({ user: "testUser", tenant: 123 }); + tx = cds.tx(context); + await cds.tx({}, async (tx2) => { + await tx2.run(DELETE.from("sap.eventqueue.Lock")); + await tx2.run(DELETE.from("sap.eventqueue.Event")); + }); + await commitAndOpenNew(); + mockRedis.clearState(); + jest.clearAllMocks(); + }); + + afterEach(async () => { + await tx.rollback(); + }); + + afterAll(async () => { + config.redisEnabled = false; + await cds.disconnect(); + await cds.shutdown(); + }); + + it("increments pending counter by 1 after single send and commit", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(1); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(1); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("accumulates pending counter for multiple sends in same transaction", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(3); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(3); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("does not increment counter when transaction is rolled back", async () => { + const innerTx = cds.tx(context); + const 
service = (await cds.connect.to("StandardService")).tx(innerTx.context); + await service.send("main", {}); + await innerTx.rollback(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(0); + }); + + it("does not increment counter when redisEnabled is false", async () => { + config.redisEnabled = false; + try { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(0); + } finally { + config.redisEnabled = true; + } + }); + + it("tracks stats per tenant while global counter aggregates", async () => { + const service123 = (await cds.connect.to("StandardService")).tx(context); + await service123.send("main", {}); + await service123.send("main", {}); + await commitAndOpenNew(); + + const ctx456 = new cds.EventContext({ user: "testUser", tenant: 456 }); + const tx456 = cds.tx(ctx456); + const service456 = (await cds.connect.to("StandardService")).tx(ctx456); + await service456.send("main", {}); + await tx456.commit(); + + const stats123 = await getTenantStats(123); + expect(stats123[StatusField.Pending]).toBe(2); + + const stats456 = await getTenantStats(456); + expect(stats456[StatusField.Pending]).toBe(1); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(3); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + describe("UPDATE handler — HANA affectedRows behavior", () => { + it("Open → InProgress: HANA affectedRows returns correct count for bulk update of 3 rows", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await 
service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=3 + + await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await commitAndOpenNew(); + + // On HANA, affectedRows must be 3 — not the fallback of 1 + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + expect(tenantStats[StatusField.InProgress]).toBe(3); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(0); + expect(globalStats[StatusField.InProgress]).toBe(3); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("InProgress → Done: HANA affectedRows correctly decrements all inProgress", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=3 + + await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await commitAndOpenNew(); // pending=0, inProgress=3 + + await tx.run( + UPDATE.entity("sap.eventqueue.Event") + .set({ status: EventProcessingStatus.Done }) + .where({ status: EventProcessingStatus.InProgress }) + ); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("InProgress → Error: HANA affectedRows correctly restores all rows as pending", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=3 + + await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await 
commitAndOpenNew(); // pending=0, inProgress=3 + + await tx.run( + UPDATE.entity("sap.eventqueue.Event") + .set({ status: EventProcessingStatus.Error }) + .where({ status: EventProcessingStatus.InProgress }) + ); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(3); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(3); + expect(globalStats[StatusField.InProgress]).toBe(0); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("InProgress → Exceeded: HANA affectedRows correctly decrements all inProgress", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=3 + + await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await commitAndOpenNew(); // pending=0, inProgress=3 + + await tx.run( + UPDATE.entity("sap.eventqueue.Event") + .set({ status: EventProcessingStatus.Exceeded }) + .where({ status: EventProcessingStatus.InProgress }) + ); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("UPDATE matching 0 rows does not affect counters", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await commitAndOpenNew(); // pending=1 + + // WHERE clause matches nothing — affectedRows=0, must not change counters + await tx.run( + UPDATE.entity("sap.eventqueue.Event") + .set({ status: EventProcessingStatus.Done }) + .where({ status: EventProcessingStatus.InProgress }) // nothing is InProgress yet + ); + 
await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(1); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("two UPDATEs in one transaction accumulate into a single succeeded handler call", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=2 + + // Open→InProgress then InProgress→Done without an intermediate commit + await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await tx.run( + UPDATE.entity("sap.eventqueue.Event") + .set({ status: EventProcessingStatus.Done }) + .where({ status: EventProcessingStatus.InProgress }) + ); + await commitAndOpenNew(); + + // Net delta: pending -2, inProgress +2 then -2 → both counters at 0 + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("does not adjust counters when redisEnabled is false", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=2 + + config.redisEnabled = false; + try { + await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(2); + expect(tenantStats[StatusField.InProgress]).toBe(0); + } finally { + config.redisEnabled = true; + } + }); + + it("does not adjust counters when transaction is rolled back", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await 
service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=2 + + const innerTx = cds.tx(context); + await innerTx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await innerTx.rollback(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(2); + expect(tenantStats[StatusField.InProgress]).toBe(0); + }); + }); + + describe("processEventQueue integration — stats via real processing", () => { + it("successful processing transitions pending → inProgress → Done (counters reach zero)", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await commitAndOpenNew(); + + expect((await getTenantStats(123))[StatusField.Pending]).toBe(1); + expect((await getTenantStats(123))[StatusField.InProgress]).toBe(0); + + await processEventQueue(tx.context, "CAP_OUTBOX", "StandardService"); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(0); + expect(globalStats[StatusField.InProgress]).toBe(0); + + await testHelper.selectEventQueueAndExpectDone(tx, { expectedLength: 1 }); + }); + + it("failed processing transitions pending → inProgress → Error → back to pending", async () => { + const service = cds.outboxed(await cds.connect.to("NotificationService")).tx(context); + await service.send("errorEvent", { to: "to", subject: "subject", body: "body" }); + await commitAndOpenNew(); + + expect((await getTenantStats(123))[StatusField.Pending]).toBe(1); + expect((await getTenantStats(123))[StatusField.InProgress]).toBe(0); + + await processEventQueue(tx.context, "CAP_OUTBOX", "NotificationService"); + await commitAndOpenNew(); + + // Error state means the event will 
be retried → counts as pending + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(1); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(1); + expect(globalStats[StatusField.InProgress]).toBe(0); + + await testHelper.selectEventQueueAndExpectError(tx, { expectedLength: 1 }); + }); + }); + + const commitAndOpenNew = async () => { + await tx.commit(); + context = new cds.EventContext({ user: "testUser", tenant: 123 }); + tx = cds.tx(context); + }; +}); diff --git a/test-integration/e2e-redis.test.js b/test-integration/e2e-redis.test.js index e310968c..4ee0b4c2 100644 --- a/test-integration/e2e-redis.test.js +++ b/test-integration/e2e-redis.test.js @@ -82,7 +82,6 @@ describe("end-to-end", () => { beforeEach(async () => { await DELETE.from("sap.eventqueue.Lock"); await DELETE.from("sap.eventqueue.Event"); - await DELETE.from("cds.outbox.Messages"); jest.clearAllMocks(); redisMock.clearState(); redisMock.clearTestState(); diff --git a/test-integration/keep-alive-tx-handling-e2e.test.js b/test-integration/keep-alive-tx-handling-e2e.test.js index 748e34fa..f7783815 100644 --- a/test-integration/keep-alive-tx-handling-e2e.test.js +++ b/test-integration/keep-alive-tx-handling-e2e.test.js @@ -20,6 +20,7 @@ const distributedLock = require("../src/shared/distributedLock"); const periodicEvents = require("../src/periodicEvents"); const { publishEvent } = require("../src/publishEvent"); const redisPub = require("../src/redis/redisPub"); +const eventQueueStats = require("../src/shared/eventQueueStats"); const configFilePath = path.join(__dirname, "..", "./test", "asset", "config.yml"); @@ -48,6 +49,7 @@ describe("keep-alive-tx-handling-e2e", () => { registerAsEventProcessor: false, isEventQueueActive: false, }); + jest.spyOn(eventQueueStats, "incrementCounters").mockResolvedValue(); loggerMock = mockLogger(); const db = await 
cds.connect.to("db"); @@ -110,6 +112,7 @@ describe("keep-alive-tx-handling-e2e", () => { subType: isolatedNoParallel.subType, }) ); + const { db } = cds.services; const { Event } = cds.entities("sap.eventqueue"); let forUpdateCounter = 0; diff --git a/test-integration/runner.test.js b/test-integration/runner.test.js index 79e8a153..366fa217 100644 --- a/test-integration/runner.test.js +++ b/test-integration/runner.test.js @@ -7,6 +7,7 @@ cds.test(__dirname + "/_env"); const mockRedis = require("../test/mocks/redisMock"); jest.mock("../src/shared/redis", () => mockRedis); +const { getTenantStats, getGlobalStats, StatusField } = require("../src/shared/eventQueueStats"); const WorkerQueue = require("../src/shared/WorkerQueue"); const processEventQueue = require("../src/processEventQueue"); const openEvents = require("../src/runner/openEvents"); @@ -176,6 +177,49 @@ describe("runner", () => { expect(loggerMock.callsLengths().error).toEqual(0); }); + describe("stats tracking", () => { + it("sets pending counter per tenant and globally after runner with open events", async () => { + await cds.tx({}, async (tx2) => { + await periodicEvents.checkAndInsertPeriodicEvents(tx2.context); + }); + mockTenantIds(tenantIds); + jest.spyOn(redisPub, "broadcastEvent").mockResolvedValue(); + jest.spyOn(periodicEvents, "checkAndInsertPeriodicEvents").mockResolvedValue(); + + await Promise.allSettled([runner.__._multiTenancyRedis(), runner.__._multiTenancyRedis()]); + await Promise.allSettled(WorkerQueue.instance.runningPromises); + await new Promise(setImmediate); + + for (const tenantId of tenantIds) { + const stats = await getTenantStats(tenantId); + expect(stats[StatusField.Pending]).toBe(1); + } + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(tenantIds.length); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("does not update stats when there are no open events", async () => { + mockTenantIds(tenantIds); + 
jest.spyOn(redisPub, "broadcastEvent").mockResolvedValue(); + jest.spyOn(periodicEvents, "checkAndInsertPeriodicEvents").mockResolvedValue(); + + await Promise.allSettled([runner.__._multiTenancyRedis()]); + await Promise.allSettled(WorkerQueue.instance.runningPromises); + await new Promise(setImmediate); + + for (const tenantId of tenantIds) { + const stats = await getTenantStats(tenantId); + expect(stats[StatusField.Pending]).toBe(0); + } + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(0); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + }); + describe("tenant id filter should not acquire lock - only process tenants based on tenant filter", () => { it("always false", async () => { await cds.tx({}, async (tx2) => { diff --git a/test/__snapshots__/baseFunctionality.test.js.snap b/test/__snapshots__/baseFunctionality.test.js.snap index 3c7688e1..d1c4249a 100644 --- a/test/__snapshots__/baseFunctionality.test.js.snap +++ b/test/__snapshots__/baseFunctionality.test.js.snap @@ -111,31 +111,37 @@ exports[`baseFunctionality ad-hoc events error handling missing event implementa exports[`baseFunctionality getOpenQueueEntries event types in error should be considered 2`] = ` [ { + "count": 1, "namespace": "default", "subType": "DELETE_EVENTS", "type": "EVENT_QUEUE_BASE_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "DBKeepAlive", "type": "HealthCheckKeepAlive_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "DB", "type": "HealthCheck_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "Task", "type": "Notifications", }, { + "count": 1, "namespace": "default", "subType": "everyFiveMin", "type": "TimeSpecificEveryFiveMin_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "cron", "type": "TimeSpecificEveryMin_PERIODIC", @@ -146,26 +152,31 @@ exports[`baseFunctionality getOpenQueueEntries event types in error should be co exports[`baseFunctionality 
getOpenQueueEntries event types in progress should be ignored 2`] = ` [ { + "count": 1, "namespace": "default", "subType": "DELETE_EVENTS", "type": "EVENT_QUEUE_BASE_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "DBKeepAlive", "type": "HealthCheckKeepAlive_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "DB", "type": "HealthCheck_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "everyFiveMin", "type": "TimeSpecificEveryFiveMin_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "cron", "type": "TimeSpecificEveryMin_PERIODIC", @@ -176,46 +187,55 @@ exports[`baseFunctionality getOpenQueueEntries event types in progress should be exports[`baseFunctionality getOpenQueueEntries filterAppSpecificEvents return open event types 2`] = ` [ { + "count": 1, "namespace": "default", "subType": "AppInstance", "type": "AppSpecific_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "AppName", "type": "AppSpecific_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "both", "type": "AppSpecific_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "DELETE_EVENTS", "type": "EVENT_QUEUE_BASE_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "DBKeepAlive", "type": "HealthCheckKeepAlive_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "DB", "type": "HealthCheck_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "Task", "type": "Notifications", }, { + "count": 1, "namespace": "default", "subType": "everyFiveMin", "type": "TimeSpecificEveryFiveMin_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "cron", "type": "TimeSpecificEveryMin_PERIODIC", @@ -226,31 +246,37 @@ exports[`baseFunctionality getOpenQueueEntries filterAppSpecificEvents return op exports[`baseFunctionality getOpenQueueEntries return open event types 2`] = ` [ { + "count": 1, "namespace": "default", "subType": "DELETE_EVENTS", "type": "EVENT_QUEUE_BASE_PERIODIC", 
}, { + "count": 1, "namespace": "default", "subType": "DBKeepAlive", "type": "HealthCheckKeepAlive_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "DB", "type": "HealthCheck_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "Task", "type": "Notifications", }, { + "count": 1, "namespace": "default", "subType": "everyFiveMin", "type": "TimeSpecificEveryFiveMin_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "cron", "type": "TimeSpecificEveryMin_PERIODIC", @@ -261,11 +287,13 @@ exports[`baseFunctionality getOpenQueueEntries return open event types 2`] = ` exports[`baseFunctionality getOpenQueueEntries should respect app instance configuration both open event relevant for app 2`] = ` [ { + "count": 1, "namespace": "default", "subType": "AppInstance", "type": "AppSpecific", }, { + "count": 1, "namespace": "default", "subType": "Task", "type": "Notifications", @@ -276,6 +304,7 @@ exports[`baseFunctionality getOpenQueueEntries should respect app instance confi exports[`baseFunctionality getOpenQueueEntries should respect app instance configuration one open event for app and one not for this app 2`] = ` [ { + "count": 1, "namespace": "default", "subType": "Task", "type": "Notifications", @@ -286,11 +315,13 @@ exports[`baseFunctionality getOpenQueueEntries should respect app instance confi exports[`baseFunctionality getOpenQueueEntries should respect app instance configuration one open event for app and one not for this app but redis should ignore filter 2`] = ` [ { + "count": 1, "namespace": "default", "subType": "AppInstance", "type": "AppSpecific", }, { + "count": 1, "namespace": "default", "subType": "Task", "type": "Notifications", @@ -301,11 +332,13 @@ exports[`baseFunctionality getOpenQueueEntries should respect app instance confi exports[`baseFunctionality getOpenQueueEntries should respect app name configuration both open event relevant for app 2`] = ` [ { + "count": 1, "namespace": "default", "subType": "AppName", 
"type": "AppSpecific", }, { + "count": 1, "namespace": "default", "subType": "Task", "type": "Notifications", @@ -316,6 +349,7 @@ exports[`baseFunctionality getOpenQueueEntries should respect app name configura exports[`baseFunctionality getOpenQueueEntries should respect app name configuration one open event for app and one not for this app 2`] = ` [ { + "count": 1, "namespace": "default", "subType": "Task", "type": "Notifications", @@ -326,11 +360,13 @@ exports[`baseFunctionality getOpenQueueEntries should respect app name configura exports[`baseFunctionality getOpenQueueEntries should respect app name configuration one open event for app and one not for this app but redis should ignore filter 2`] = ` [ { + "count": 1, "namespace": "default", "subType": "AppName", "type": "AppSpecific", }, { + "count": 1, "namespace": "default", "subType": "Task", "type": "Notifications", diff --git a/test/__snapshots__/eventQueueOutbox.test.js.snap b/test/__snapshots__/eventQueueOutbox.test.js.snap index 33f65d63..9ce8cc2c 100644 --- a/test/__snapshots__/eventQueueOutbox.test.js.snap +++ b/test/__snapshots__/eventQueueOutbox.test.js.snap @@ -419,26 +419,31 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true req.data should be stored exports[`event-queue outbox monkeyPatchCAPOutbox=true return open event types 1`] = ` [ { + "count": 1, "namespace": "default", "subType": "NotificationService", "type": "CAP_OUTBOX", }, { + "count": 1, "namespace": "default", "subType": "NotificationServicePeriodic.main", "type": "CAP_OUTBOX_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "NotificationServicePeriodic.randomOffset", "type": "CAP_OUTBOX_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "QueueService.main", "type": "CAP_OUTBOX_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "sapafcsdk.scheduling.ProviderService.timeBucketAction", "type": "CAP_OUTBOX_PERIODIC", diff --git a/test/dbHandlerStats.test.js b/test/dbHandlerStats.test.js 
new file mode 100644 index 00000000..2bcd8565 --- /dev/null +++ b/test/dbHandlerStats.test.js @@ -0,0 +1,359 @@ +"use strict"; + +const path = require("path"); + +const mockRedis = require("./mocks/redisMock"); +jest.mock("../src/shared/redis", () => mockRedis); + +const cds = require("@sap/cds/lib"); + +const eventQueue = require("../src"); +const config = require("../src/config"); +const { getTenantStats, getGlobalStats, StatusField } = require("../src/shared/eventQueueStats"); +const { EventProcessingStatus } = require("../src/constants"); +const { processEventQueue } = require("../src/processEventQueue"); +const testHelper = require("./helper"); +const { Logger: mockLogger } = require("./mocks/logger"); + +const outboxProject = path.join(__dirname, "asset", "outboxProject"); + +cds.env.requires.StandardService = { + impl: path.join(outboxProject, "srv/service/standard-service.js"), + outbox: { kind: "persistent-outbox" }, +}; + +cds.env.requires.NotificationService = { + impl: path.join(outboxProject, "srv/service/service.js"), + outbox: { kind: "persistent-outbox" }, +}; + +cds.test(outboxProject); + +describe("dbHandler - stats tracking via CAP outbox", () => { + let context, tx, loggerMock; + + beforeAll(async () => { + eventQueue.config.initialized = false; + await eventQueue.initialize({ + processEventsAfterPublish: false, + registerAsEventProcessor: false, + insertEventsBeforeCommit: true, + useAsCAPOutbox: true, + userId: "dummyTestUser", + }); + cds.emit("connect", await cds.connect.to("db")); + config.redisEnabled = true; + eventQueue.registerEventQueueDbHandler(cds.db); + loggerMock = mockLogger(); + }); + + beforeEach(async () => { + context = new cds.EventContext({ user: "testUser", tenant: 123 }); + tx = cds.tx(context); + await tx.run(DELETE.from("sap.eventqueue.Lock")); + await tx.run(DELETE.from("sap.eventqueue.Event")); + await commitAndOpenNew(); + mockRedis.clearState(); + jest.clearAllMocks(); + }); + + afterEach(async () => { + await 
tx.rollback(); + }); + + afterAll(async () => { + config.redisEnabled = false; + await cds.shutdown; + }); + + it("increments pending counter by 1 after single send and commit", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(1); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(1); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("accumulates pending counter for multiple sends in same transaction", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(3); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(3); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("does not increment counter when transaction is rolled back", async () => { + const innerTx = cds.tx(context); + const service = (await cds.connect.to("StandardService")).tx(innerTx.context); + await service.send("main", {}); + await innerTx.rollback(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(0); + }); + + it("does not increment counter when redisEnabled is false", async () => { + config.redisEnabled = false; + try { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + + const globalStats = await 
getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(0); + } finally { + config.redisEnabled = true; + } + }); + + it("tracks stats per tenant while global counter aggregates", async () => { + // tenant 123: send 2 events + const service123 = (await cds.connect.to("StandardService")).tx(context); + await service123.send("main", {}); + await service123.send("main", {}); + await commitAndOpenNew(); + + // tenant 456: send 1 event in its own tx + const ctx456 = new cds.EventContext({ user: "testUser", tenant: 456 }); + const tx456 = cds.tx(ctx456); + const service456 = (await cds.connect.to("StandardService")).tx(ctx456); + await service456.send("main", {}); + await tx456.commit(); + + const stats123 = await getTenantStats(123); + expect(stats123[StatusField.Pending]).toBe(2); + + const stats456 = await getTenantStats(456); + expect(stats456[StatusField.Pending]).toBe(1); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(3); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + describe("UPDATE handler — direct status transitions", () => { + it("Open → InProgress: decrements pending, increments inProgress", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=2 + + await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + expect(tenantStats[StatusField.InProgress]).toBe(2); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(0); + expect(globalStats[StatusField.InProgress]).toBe(2); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("InProgress → Done: decrements inProgress, no pending change", async () => { + const service = 
(await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=2 + + await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await commitAndOpenNew(); // pending=0, inProgress=2 + + await tx.run( + UPDATE.entity("sap.eventqueue.Event") + .set({ status: EventProcessingStatus.Done }) + .where({ status: EventProcessingStatus.InProgress }) + ); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("InProgress → Error: decrements inProgress, increments pending (will be retried)", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=2 + + await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await commitAndOpenNew(); // pending=0, inProgress=2 + + await tx.run( + UPDATE.entity("sap.eventqueue.Event") + .set({ status: EventProcessingStatus.Error }) + .where({ status: EventProcessingStatus.InProgress }) + ); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(2); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(2); + expect(globalStats[StatusField.InProgress]).toBe(0); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("InProgress → Exceeded: decrements inProgress, no pending change", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await 
commitAndOpenNew(); // pending=2 + + await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await commitAndOpenNew(); // pending=0, inProgress=2 + + await tx.run( + UPDATE.entity("sap.eventqueue.Event") + .set({ status: EventProcessingStatus.Exceeded }) + .where({ status: EventProcessingStatus.InProgress }) + ); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("two UPDATEs in one transaction accumulate into a single succeeded handler call", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=2 + + // Open→InProgress then InProgress→Done without an intermediate commit + await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await tx.run( + UPDATE.entity("sap.eventqueue.Event") + .set({ status: EventProcessingStatus.Done }) + .where({ status: EventProcessingStatus.InProgress }) + ); + await commitAndOpenNew(); + + // Net delta: pending -2, inProgress +2 then -2 → both counters at 0 + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("does not adjust counters when redisEnabled is false", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=2 + + config.redisEnabled = false; + try { + await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await commitAndOpenNew(); + + const 
tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(2); + expect(tenantStats[StatusField.InProgress]).toBe(0); + } finally { + config.redisEnabled = true; + } + }); + + it("does not adjust counters when transaction is rolled back", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=2 + + const innerTx = cds.tx(context); + await innerTx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await innerTx.rollback(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(2); + expect(tenantStats[StatusField.InProgress]).toBe(0); + }); + }); + + describe("processEventQueue integration — stats via real processing", () => { + it("successful processing transitions pending → inProgress → Done (counters reach zero)", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await commitAndOpenNew(); + + expect((await getTenantStats(123))[StatusField.Pending]).toBe(1); + expect((await getTenantStats(123))[StatusField.InProgress]).toBe(0); + + await processEventQueue(tx.context, "CAP_OUTBOX", "StandardService"); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(0); + expect(globalStats[StatusField.InProgress]).toBe(0); + + await testHelper.selectEventQueueAndExpectDone(tx, { expectedLength: 1 }); + }); + + it("failed processing transitions pending → inProgress → Error → back to pending", async () => { + const service = cds.outboxed(await cds.connect.to("NotificationService")).tx(context); + await service.send("errorEvent", 
{ to: "to", subject: "subject", body: "body" }); + await commitAndOpenNew(); + + expect((await getTenantStats(123))[StatusField.Pending]).toBe(1); + expect((await getTenantStats(123))[StatusField.InProgress]).toBe(0); + + await processEventQueue(tx.context, "CAP_OUTBOX", "NotificationService"); + await commitAndOpenNew(); + + // Error state means the event will be retried → counts as pending + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(1); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(1); + expect(globalStats[StatusField.InProgress]).toBe(0); + + await testHelper.selectEventQueueAndExpectError(tx, { expectedLength: 1 }); + }); + }); + + const commitAndOpenNew = async () => { + await tx.commit(); + context = new cds.EventContext({ user: "testUser", tenant: 123 }); + tx = cds.tx(context); + }; +}); diff --git a/test/eventQueueStats.test.js b/test/eventQueueStats.test.js new file mode 100644 index 00000000..0e908046 --- /dev/null +++ b/test/eventQueueStats.test.js @@ -0,0 +1,185 @@ +"use strict"; + +const path = require("path"); + +const cds = require("@sap/cds/lib"); + +const mockRedis = require("./mocks/redisMock"); +jest.mock("../src/shared/redis", () => mockRedis); + +const eventQueue = require("../src"); +const { + StatusField, + incrementCounters, + decrementCounters, + adjustTenantCounter, + adjustGlobalCounter, + getTenantStats, + getGlobalStats, + deleteTenantStats, + resetInProgressCounters, +} = require("../src/shared/eventQueueStats"); + +const project = __dirname + "/.."; +cds.test(project); + +describe("eventQueueStats", () => { + beforeAll(async () => { + const configFilePath = path.join(__dirname, "asset", "config.yml"); + await eventQueue.initialize({ + configFilePath, + processEventsAfterPublish: false, + registerAsEventProcessor: false, + }); + }); + + beforeEach(() => { + 
mockRedis.clearState(); + }); + + afterAll(() => cds.shutdown); + + describe("incrementCounters / decrementCounters", () => { + it("increments tenant and global counters", async () => { + await incrementCounters("t1", StatusField.Pending, 3); + + const tenant = await getTenantStats("t1"); + expect(tenant.pending).toBe(3); + expect(tenant.inProgress).toBe(0); + + const global = await getGlobalStats(); + expect(global.pending).toBe(3); + }); + + it("decrements tenant and global counters", async () => { + await incrementCounters("t1", StatusField.Pending, 5); + await decrementCounters("t1", StatusField.Pending, 2); + + const tenant = await getTenantStats("t1"); + expect(tenant.pending).toBe(3); + + const global = await getGlobalStats(); + expect(global.pending).toBe(3); + }); + + it("multiple tenants are tracked independently", async () => { + await incrementCounters("t1", StatusField.Pending, 2); + await incrementCounters("t2", StatusField.Pending, 5); + + expect((await getTenantStats("t1")).pending).toBe(2); + expect((await getTenantStats("t2")).pending).toBe(5); + }); + + it("global counter aggregates across all tenants", async () => { + await incrementCounters("t1", StatusField.InProgress, 1); + await incrementCounters("t2", StatusField.InProgress, 4); + + const global = await getGlobalStats(); + expect(global.inProgress).toBe(5); + }); + }); + + describe("adjustTenantCounter", () => { + it("creates hash with zero base when first incremented", async () => { + await adjustTenantCounter("t1", StatusField.InProgress, 1); + + const stats = await getTenantStats("t1"); + expect(stats.inProgress).toBe(1); + expect(stats.pending).toBe(0); + }); + + it("supports negative increments", async () => { + await adjustTenantCounter("t1", StatusField.Pending, 10); + await adjustTenantCounter("t1", StatusField.Pending, -3); + + expect((await getTenantStats("t1")).pending).toBe(7); + }); + }); + + describe("adjustGlobalCounter", () => { + it("increments the global counter for the 
given field", async () => { + await adjustGlobalCounter(StatusField.Pending, 7); + + const global = await getGlobalStats(); + expect(global.pending).toBe(7); + }); + }); + + describe("getTenantStats", () => { + it("returns all-zero object for unknown tenant", async () => { + const stats = await getTenantStats("unknown-tenant"); + expect(stats).toEqual({ pending: 0, inProgress: 0 }); + }); + }); + + describe("getGlobalStats", () => { + it("returns all-zero object when no data exists", async () => { + const stats = await getGlobalStats(); + expect(stats).toEqual({ pending: 0, inProgress: 0 }); + }); + }); + + describe("deleteTenantStats", () => { + it("removes the tenant hash", async () => { + await incrementCounters("t1", StatusField.Pending, 5); + await deleteTenantStats("t1"); + + const stats = await getTenantStats("t1"); + expect(stats).toEqual({ pending: 0, inProgress: 0 }); + }); + + it("does not throw when tenant does not exist", async () => { + await expect(deleteTenantStats("nonexistent")).resolves.toBeUndefined(); + }); + }); + + describe("resetInProgressCounters", () => { + it("resets global inProgress to 0 for all configured namespaces", async () => { + await incrementCounters("t1", StatusField.InProgress, 5); + + await resetInProgressCounters(); + + const global = await getGlobalStats(); + expect(global.inProgress).toBe(0); + }); + + it("does not touch the pending counter", async () => { + await incrementCounters("t1", StatusField.Pending, 3); + await incrementCounters("t1", StatusField.InProgress, 2); + + await resetInProgressCounters(); + + const global = await getGlobalStats(); + expect(global.pending).toBe(3); + expect(global.inProgress).toBe(0); + }); + + it("resets inProgress in tenant hash keys found via scan", async () => { + await incrementCounters("t1", StatusField.InProgress, 4); + await incrementCounters("t2", StatusField.InProgress, 2); + + await resetInProgressCounters(); + + const t1 = await getTenantStats("t1"); + 
expect(t1.inProgress).toBe(0); + + const t2 = await getTenantStats("t2"); + expect(t2.inProgress).toBe(0); + }); + + it("preserves tenant pending counter after reset", async () => { + await incrementCounters("t1", StatusField.Pending, 7); + await incrementCounters("t1", StatusField.InProgress, 3); + + await resetInProgressCounters(); + + const t1 = await getTenantStats("t1"); + expect(t1.pending).toBe(7); + expect(t1.inProgress).toBe(0); + }); + + it("resolves without error when no keys exist", async () => { + await expect(resetInProgressCounters()).resolves.toBeUndefined(); + }); + }); +}); diff --git a/test/mocks/redisMock.js b/test/mocks/redisMock.js index a0f03202..1465ebcb 100644 --- a/test/mocks/redisMock.js +++ b/test/mocks/redisMock.js @@ -2,7 +2,8 @@ let state = {}; let testState = {}; -const _createMainClientAndConnect = async () => ({ + +const _buildClient = () => ({ get: async (key) => state[key]?.value ?? null, exists: async (key) => Object.prototype.hasOwnProperty.call(state, key), set: async (key, value, options) => { @@ -13,15 +14,92 @@ const _createMainClientAndConnect = async () => ({ testState[key] = { value, options }; return "OK"; }, - del: async (key) => delete state[key], + del: async (key) => { + const existed = Object.prototype.hasOwnProperty.call(state, key); + delete state[key]; + return existed ? 1 : 0; + }, + hIncrBy: async (key, field, increment) => { + if (!state[key]) { + state[key] = { hash: {} }; + } + const current = parseInt(state[key].hash[field] ?? "0", 10); + state[key].hash[field] = String(current + increment); + return current + increment; + }, + hSet: async (key, field, value) => { + if (!state[key]) { + state[key] = { hash: {} }; + } + const isNew = !Object.prototype.hasOwnProperty.call(state[key].hash, field); + state[key].hash[field] = String(value); + return isNew ? 1 : 0; + }, + hGetAll: async (key) => { + return state[key]?.hash ?? {}; + }, + scanIterator: ({ MATCH } = {}) => { + const regex = MATCH + ? 
new RegExp( + "^" + + MATCH.replace(/[.+^${}()|[\]\\]/g, "\\$&") + .replace(/\*/g, ".*") + .replace(/\?/g, ".") + + "$" + ) + : null; + const matchingKeys = Object.keys(state).filter((k) => !regex || regex.test(k)); + return (async function* () { + for (const key of matchingKeys) { + yield key; + } + })(); + }, + multi: () => { + const ops = []; + const pipeline = { + hIncrBy: (key, field, increment) => { + ops.push(async () => { + if (!state[key]) { + state[key] = { hash: {} }; + } + const current = parseInt(state[key].hash[field] ?? "0", 10); + state[key].hash[field] = String(current + increment); + return current + increment; + }); + return pipeline; + }, + hSet: (key, field, value) => { + ops.push(async () => { + if (!state[key]) { + state[key] = { hash: {} }; + } + state[key].hash[field] = String(value); + }); + return pipeline; + }, + exec: async () => { + const results = []; + for (const op of ops) { + results.push(await op()); + } + return results; + }, + }; + return pipeline; + }, _: { state, }, }); +const _createMainClientAndConnect = async () => _buildClient(); + module.exports = { attachRedisUnsubscribeHandler: () => {}, subscribeRedisChannel: () => {}, + publishMessage: async () => {}, + isClusterMode: () => false, createClientAndConnect: _createMainClientAndConnect, createMainClientAndConnect: _createMainClientAndConnect, closeSubscribeClient: () => {}, diff --git a/test/redisPubSub.test.js b/test/redisPubSub.test.js index 37fcd9ed..24df9362 100644 --- a/test/redisPubSub.test.js +++ b/test/redisPubSub.test.js @@ -10,6 +10,7 @@ const setTimeoutSpy = jest.spyOn(global, "setTimeout").mockImplementation((first }); const distributedLock = require("../src/shared/distributedLock"); +const eventQueueStats = require("../src/shared/eventQueueStats"); const checkLockExistsSpy = jest.spyOn(distributedLock, "checkLockExists"); const config = require("../src/config"); const redisPub = require("../src/redis/redisPub"); @@ -67,6 +68,7 @@ describe("eventQueue Redis 
Events and DB Handlers", () => { eventQueue.registerEventQueueDbHandler(cds.db); loggerMock = mockLogger(); jest.spyOn(cds.utils, "uuid").mockReturnValue("6e31047a-d2b5-4e3c-83d8-deab20165956"); + jest.spyOn(eventQueueStats, "incrementCounters").mockResolvedValue(); }); beforeEach(async () => {