Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion listener/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ import { NotificationAPI } from './services/notification-api';
import { CleanupService } from './services/cleanup-service';
import { initializeDatabase } from './database/database';
import { DiscordNotificationService } from './services/discord-notification';
import {
IndexingReconciliationEngine,
createDefaultAlertSink,
} from './services/indexing-reconciliation-engine';
import { eventRegistry } from './store/event-registry';
import logger from './utils/logger';
import { loadConfig, ConfigError } from './config';
Expand All @@ -25,19 +29,28 @@ async function main() {
let notificationAPI: NotificationAPI | null = null;
let templateService: NotificationTemplateService | null = null;
let cleanupService: CleanupService | null = null;
let reconciliationEngine: IndexingReconciliationEngine | null = null;

try {
logger.info('Initializing database');
const db = await initializeDatabase(config.databasePath);

// Rebuild registry with configured event TTL
if (config.cleanup) {
eventRegistry['ttlMs'] = config.cleanup.eventRetentionMs;
eventRegistry.setTtlMs(config.cleanup.eventRetentionMs);
}

cleanupService = new CleanupService(db, eventRegistry, config.cleanup);
cleanupService.start();

reconciliationEngine = new IndexingReconciliationEngine({
db,
rpcUrl: config.stellarRpcUrl,
contractAddresses: config.contractAddresses.map((c) => c.address),
alertSink: createDefaultAlertSink(config.discord?.webhookUrl),
});
reconciliationEngine.start();

const templateRepository = new NotificationTemplateRepository(
db,
new TemplateAuditTrail(db),
Expand Down Expand Up @@ -86,6 +99,10 @@ async function main() {
await cleanupService.stop();
}

if (reconciliationEngine) {
reconciliationEngine.stop();
}

if (scheduler) {
await scheduler.stop();
}
Expand Down
81 changes: 81 additions & 0 deletions listener/src/services/indexing-gap-detector.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
export interface ChainEventRef {
contractAddress: string;
eventId: string;
ledger: number;
txHash?: string;
}

export interface IndexedEventRef {
contractAddress: string;
eventId: string;
ledger: number;
txHash?: string;
}

export interface IndexingGap {
contractAddress: string;
eventId: string;
ledger: number;
txHash?: string;
}

export interface GapDetectionResult {
missingEvents: IndexingGap[];
missingLedgers: number[];
checkedOnChain: number;
checkedIndexed: number;
}

function fingerprint(contractAddress: string, eventId: string): string {
return `${contractAddress}:${eventId}`;
}

/**
* Detect gaps between on-chain events and indexed store records.
*
* - missingEvents: chain events that are not present in the indexed set
* - missingLedgers: ledgers where the chain emitted >=1 event, but the index has none
*/
export function detectIndexingGaps(
onChain: ChainEventRef[],
indexed: IndexedEventRef[]
): GapDetectionResult {
const indexedSet = new Set(indexed.map((e) => fingerprint(e.contractAddress, e.eventId)));
const indexedByLedger = new Map<number, number>();
indexed.forEach((e) => indexedByLedger.set(e.ledger, (indexedByLedger.get(e.ledger) ?? 0) + 1));

const chainByLedger = new Map<number, ChainEventRef[]>();
onChain.forEach((e) => {
const ledgerEvents = chainByLedger.get(e.ledger) ?? [];
ledgerEvents.push(e);
chainByLedger.set(e.ledger, ledgerEvents);
});

const missingEvents: IndexingGap[] = [];
for (const ev of onChain) {
if (!indexedSet.has(fingerprint(ev.contractAddress, ev.eventId))) {
missingEvents.push({
contractAddress: ev.contractAddress,
eventId: ev.eventId,
ledger: ev.ledger,
txHash: ev.txHash,
});
}
}

const missingLedgers: number[] = [];
for (const [ledger, events] of chainByLedger.entries()) {
if (events.length > 0 && (indexedByLedger.get(ledger) ?? 0) === 0) {
missingLedgers.push(ledger);
}
}
missingLedgers.sort((a, b) => a - b);

return {
missingEvents: missingEvents.sort((a, b) => a.ledger - b.ledger),
missingLedgers,
checkedOnChain: onChain.length,
checkedIndexed: indexed.length,
};
}

61 changes: 61 additions & 0 deletions listener/src/services/indexing-reconciliation-engine.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import type { GapAlertSink } from './indexing-reconciliation-engine';
import { IndexingReconciliationEngine } from './indexing-reconciliation-engine';

jest.mock('../utils/logger', () => ({
__esModule: true,
default: {
debug: jest.fn(),
info: jest.fn(),
warn: jest.fn(),
error: jest.fn(),
},
}));

// We keep this unit test fully isolated: no RPC calls, no SQLite.
describe('IndexingReconciliationEngine', () => {
it('detects a missing on-chain event, logs it, and triggers an alert', async () => {
const logger = (await import('../utils/logger')).default as any;

const alertSink: GapAlertSink = {
notify: jest.fn(async () => {}),
};

const engine = new IndexingReconciliationEngine({
db: {} as any,
rpcUrl: 'http://example.invalid',
contractAddresses: ['C1'],
alertSink,
getNetworkTipLedger: async () => 103,
fetchOnChainEvents: async () => [
{ contractAddress: 'C1', eventId: 'E1', ledger: 101, txHash: 'T1' },
{ contractAddress: 'C1', eventId: 'E2', ledger: 102, txHash: 'T2' },
{ contractAddress: 'C1', eventId: 'E3', ledger: 103, txHash: 'T3' },
],
fetchIndexedEvents: async () => [
{ contractAddress: 'C1', eventId: 'E1', ledger: 101, txHash: 'T1' },
// Missing E2 (ledger 102)
{ contractAddress: 'C1', eventId: 'E3', ledger: 103, txHash: 'T3' },
],
});

await engine.runOnce();

expect(logger.error).toHaveBeenCalledWith(
'Indexing gap detected',
expect.objectContaining({
contractAddress: 'C1',
missingEvents: 1,
})
);
expect(alertSink.notify).toHaveBeenCalledTimes(1);
expect((alertSink.notify as jest.Mock).mock.calls[0][0]).toEqual(
expect.objectContaining({
contractAddress: 'C1',
gaps: expect.objectContaining({
missingEvents: [expect.objectContaining({ eventId: 'E2', ledger: 102 })],
missingLedgers: [102],
}),
})
);
});
});
Loading
Loading