Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
**/.project
**/.settings
*.code-workspace
CLAUDE.md

# package managers
bower_components/
Expand All @@ -30,6 +31,7 @@ data/afc.sqlite-wal
data/test.sqlite
coverage
Claude.md
.claude/

# temp
temp/
Expand Down
127 changes: 103 additions & 24 deletions src/dbHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ const cds = require("@sap/cds");

const redisPub = require("./redis/redisPub");
const config = require("./config");
const eventQueueStats = require("./shared/eventQueueStats");
const { EventProcessingStatus } = require("./constants");

const COMPONENT_NAME = "/eventQueue/dbHandler";
const registeredHandlers = {
eventQueueDbHandler: false,
beforeDbHandler: false,
updateDbHandler: false,
};

const registerEventQueueDbHandler = (dbService) => {
Expand All @@ -26,36 +29,112 @@ const registerEventQueueDbHandler = (dbService) => {
req.tx._.eventQueuePublishEvents = req.tx._.eventQueuePublishEvents ?? {};
const eventQueuePublishEvents = req.tx._.eventQueuePublishEvents;
const data = Array.isArray(req.query.INSERT.entries) ? req.query.INSERT.entries : [req.query.INSERT.entries];
const eventCombinations = Object.keys(
data.reduce((result, event) => {
const key = [event.type, event.subType, event.namespace].join("##");
if (
!config.hasEventAfterCommitFlag(event.type, event.subType, event.namespace) ||
eventQueuePublishEvents[key]
) {
return result;
}

req.tx._.eventQueueStatsOpenCount = (req.tx._.eventQueueStatsOpenCount ?? 0) + data.length;
const newCombinations = data.reduce((result, event) => {
const key = [event.type, event.subType, event.namespace].join("##");
if (config.hasEventAfterCommitFlag(event.type, event.subType, event.namespace) && !eventQueuePublishEvents[key]) {
eventQueuePublishEvents[key] = true;
result[key] = true;
return result;
}, {})
);
result.push(key);
}
return result;
}, []);

eventCombinations.length &&
req.tx._.eventQueueBroadcastCombinations ??= [];
req.tx._.eventQueueBroadcastCombinations.push(...newCombinations);
if (!req.tx._.eventQueueSucceededHandlerRegistered) {
req.tx._.eventQueueSucceededHandlerRegistered = true;
req.on("succeeded", () => {
const events = eventCombinations.map((eventCombination) => {
const [type, subType, namespace] = eventCombination.split("##");
return { type, subType, namespace };
});

redisPub.broadcastEvent(req.tenant, events).catch((err) => {
cds.log(COMPONENT_NAME).error("db handler failure during broadcasting event", err, {
tenant: req.tenant,
events,
if (config.redisEnabled && req.tx._.eventQueueStatsOpenCount) {
eventQueueStats
.incrementCounters(req.tenant, eventQueueStats.StatusField.Pending, req.tx._.eventQueueStatsOpenCount)
.catch((err) => {
cds.log(COMPONENT_NAME).error("db handler failure during updating event stats", err, {
tenant: req.tenant,
});
});
}
const combinations = req.tx._.eventQueueBroadcastCombinations;
if (combinations.length) {
const events = combinations.map((combination) => {
const [type, subType, namespace] = combination.split("##");
return { type, subType, namespace };
});
});
redisPub.broadcastEvent(req.tenant, events).catch((err) => {
cds.log(COMPONENT_NAME).error("db handler failure during broadcasting event", err, {
tenant: req.tenant,
events,
});
});
}
});
}
});

if (!registeredHandlers.updateDbHandler) {
registeredHandlers.updateDbHandler = true;
dbService.after("UPDATE", def, (count, req) => {
const newStatus = req.query.UPDATE?.data?.status;
if (newStatus == null) {
return;
}

req.tx._ = req.tx._ ?? {};
req.tx._.eventQueueStatsPendingDelta = req.tx._.eventQueueStatsPendingDelta ?? 0;
req.tx._.eventQueueStatsInProgressDelta = req.tx._.eventQueueStatsInProgressDelta ?? 0;

if (newStatus === EventProcessingStatus.InProgress) {
req.tx._.eventQueueStatsPendingDelta -= count;
req.tx._.eventQueueStatsInProgressDelta += count;
} else if (newStatus === EventProcessingStatus.Error) {
req.tx._.eventQueueStatsInProgressDelta -= count;
req.tx._.eventQueueStatsPendingDelta += count;
} else if (
newStatus === EventProcessingStatus.Done ||
newStatus === EventProcessingStatus.Exceeded ||
newStatus === EventProcessingStatus.Suspended
) {
req.tx._.eventQueueStatsInProgressDelta -= count;
}

if (!req.tx._.eventQueueUpdateSucceededHandlerRegistered) {
req.tx._.eventQueueUpdateSucceededHandlerRegistered = true;
req.on("succeeded", () => {
if (!config.redisEnabled) {
return;
}

const pendingDelta = req.tx._.eventQueueStatsPendingDelta;
const inProgressDelta = req.tx._.eventQueueStatsInProgressDelta;
const ops = [];

if (pendingDelta !== 0) {
ops.push(
eventQueueStats.adjustTenantCounter(req.tenant, eventQueueStats.StatusField.Pending, pendingDelta),
eventQueueStats.adjustGlobalCounter(eventQueueStats.StatusField.Pending, pendingDelta)
);
}
if (inProgressDelta !== 0) {
ops.push(
eventQueueStats.adjustTenantCounter(req.tenant, eventQueueStats.StatusField.InProgress, inProgressDelta),
eventQueueStats.adjustGlobalCounter(eventQueueStats.StatusField.InProgress, inProgressDelta)
);
}
Promise.allSettled(ops).then((results) => {
for (const result of results) {
if (result.status === "rejected") {
cds
.log(COMPONENT_NAME)
.error("db handler failure during updating event stats on update", result.reason, {
tenant: req.tenant,
});
}
}
});
});
}
});
}
};

module.exports = {
Expand Down
2 changes: 2 additions & 0 deletions src/initialize.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const { getAllTenantIds } = require("./shared/cdsHelper");
const { EventProcessingStatus } = require("./constants");
const distributedLock = require("./shared/distributedLock");
const EventQueueError = require("./EventQueueError");
const { initMetrics } = require("./shared/openTelemetry");

const readFileAsync = promisify(fs.readFile);

Expand Down Expand Up @@ -125,6 +126,7 @@ const initialize = async (options = {}) => {
runInterval: config.runInterval,
useAsCAPQueue: config.useAsCAPQueue,
});
initMetrics();
resolveFn();
};

Expand Down
12 changes: 6 additions & 6 deletions src/runner/openEvents.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ const getOpenQueueEntries = async (tx, filterAppSpecificEvents = true) => {
new Date(startTime.getTime() - 30 * MS_IN_DAYS).toISOString(),
")"
)
.columns("type", "subType", "namespace")
.columns("count(ID) as count", "type", "subType", "namespace")
.groupBy("type", "subType", "namespace")
);

const result = [];
for (const { type, subType, namespace } of entries) {
for (const { type, subType, namespace, count } of entries) {
if (config.isCapOutboxEvent(type)) {
const { srvName, actionName } = config.normalizeSubType(type, subType);
try {
Expand All @@ -48,14 +48,14 @@ const getOpenQueueEntries = async (tx, filterAppSpecificEvents = true) => {
config.addCAPServiceWithoutEnvConfig(subType, service);
}
if (config.shouldBeProcessedInThisApplication(type, subType, namespace)) {
result.push({ namespace, type, subType });
result.push({ namespace, type, subType, count });
}
}
} catch {
/* ignore catch */
} finally {
if (!filterAppSpecificEvents) {
result.push({ namespace, type, subType });
result.push({ namespace, type, subType, count });
}
}
} else {
Expand All @@ -64,10 +64,10 @@ const getOpenQueueEntries = async (tx, filterAppSpecificEvents = true) => {
config.getEventConfig(type, subType, namespace) &&
config.shouldBeProcessedInThisApplication(type, subType, namespace)
) {
result.push({ namespace, type, subType });
result.push({ namespace, type, subType, count });
}
} else {
result.push({ namespace, type, subType });
result.push({ namespace, type, subType, count });
}
}
}
Expand Down
33 changes: 32 additions & 1 deletion src/runner/runner.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const common = require("../shared/common");
const config = require("../config");
const redisPub = require("../redis/redisPub");
const openEvents = require("./openEvents");
const eventQueueStats = require("../shared/eventQueueStats");
const { runEventCombinationForTenant } = require("./runnerHelper");
const { trace } = require("../shared/openTelemetry");

Expand Down Expand Up @@ -141,7 +142,7 @@ const _executeEventsAllTenantsRedis = async (tenantIds) => {
} catch (err) {
logger.error("executing event queue run for multi instance and tenant failed", err);
}

const tenantCounts = {};
for (const tenantId of tenantIds) {
try {
await cds.tx({ tenant: tenantId }, async (tx) => {
Expand All @@ -160,6 +161,16 @@ const _executeEventsAllTenantsRedis = async (tenantIds) => {
tenantId,
entries: entries.length,
});
tenantCounts[tenantId] = entries;
const pendingByNamespace = Object.fromEntries(config.processingNamespaces.map((name) => [name, 0]));
for (const entry of entries) {
pendingByNamespace[entry.namespace] = (pendingByNamespace[entry.namespace] ?? 0) + entry.count;
}
for (const [namespace, count] of Object.entries(pendingByNamespace)) {
eventQueueStats
.setTenantCounter(tenantId, namespace, eventQueueStats.StatusField.Pending, count)
.catch((err) => logger.error("updating tenant stats failed", err, { tenantId, namespace }));
}
if (!entries.length) {
return;
}
Expand All @@ -178,6 +189,17 @@ const _executeEventsAllTenantsRedis = async (tenantIds) => {
logger.error("broadcasting events for tenant failed", { tenantId }, err);
}
}
const globalPendingByNamespace = Object.fromEntries(config.processingNamespaces.map((namespace) => [namespace, 0]));
for (const tenantEntries of Object.values(tenantCounts)) {
for (const entry of tenantEntries) {
globalPendingByNamespace[entry.namespace] = (globalPendingByNamespace[entry.namespace] ?? 0) + entry.count;
}
}
for (const [namespace, count] of Object.entries(globalPendingByNamespace)) {
eventQueueStats
.setGlobalCounter(namespace, eventQueueStats.StatusField.Pending, count)
.catch((err) => logger.error("updating global stats failed", err, { namespace }));
}
};

const _executeEventsAllTenants = async (tenantIds) => {
Expand Down Expand Up @@ -367,6 +389,15 @@ const _singleTenantRedis = async () => {
logger.info("broadcasting events for run", {
entries: entries.length,
});
const pendingByNamespace = Object.fromEntries(config.processingNamespaces.map((name) => [name, 0]));
for (const entry of entries) {
pendingByNamespace[entry.namespace] = (pendingByNamespace[entry.namespace] ?? 0) + entry.count;
}
for (const [namespace, count] of Object.entries(pendingByNamespace)) {
eventQueueStats
.setGlobalCounter(namespace, eventQueueStats.StatusField.Pending, count)
.catch((err) => logger.error("updating global stats failed", err, { namespace }));
}
if (!entries.length) {
return;
}
Expand Down
Loading