From 8dd50a3a92985a5221c95cb7d64458ffb3551459 Mon Sep 17 00:00:00 2001 From: Emmzyemms Date: Mon, 22 Jun 2026 11:21:46 +0100 Subject: [PATCH 1/2] Feat: Add backend observability for contract event schema drifts --- .../schema-drift-detector.unit.spec.ts | 143 +++++++ .../schema-observability.unit.spec.ts | 177 +++++++++ ...soroban-event-indexer.service.unit.spec.ts | 11 +- ...an-event.parser.observability.unit.spec.ts | 234 ++++++++++++ app/backend/src/ingestion/ingestion.module.ts | 8 +- .../src/ingestion/parser-diagnostics.types.ts | 48 +++ .../src/ingestion/parser-health.controller.ts | 53 +++ .../src/ingestion/schema-drift-detector.ts | 129 +++++++ .../ingestion/schema-observability.service.ts | 356 ++++++++++++++++++ .../soroban-event-indexer.service.ts | 56 ++- .../src/ingestion/soroban-event.parser.ts | 109 +++++- app/backend/src/metrics/metrics.service.ts | 94 +++++ 12 files changed, 1409 insertions(+), 9 deletions(-) create mode 100644 app/backend/src/ingestion/__tests__/schema-drift-detector.unit.spec.ts create mode 100644 app/backend/src/ingestion/__tests__/schema-observability.unit.spec.ts create mode 100644 app/backend/src/ingestion/__tests__/soroban-event.parser.observability.unit.spec.ts create mode 100644 app/backend/src/ingestion/parser-diagnostics.types.ts create mode 100644 app/backend/src/ingestion/parser-health.controller.ts create mode 100644 app/backend/src/ingestion/schema-drift-detector.ts create mode 100644 app/backend/src/ingestion/schema-observability.service.ts diff --git a/app/backend/src/ingestion/__tests__/schema-drift-detector.unit.spec.ts b/app/backend/src/ingestion/__tests__/schema-drift-detector.unit.spec.ts new file mode 100644 index 000000000..c96000fa6 --- /dev/null +++ b/app/backend/src/ingestion/__tests__/schema-drift-detector.unit.spec.ts @@ -0,0 +1,143 @@ +import { xdr, nativeToScVal } from "@stellar/stellar-sdk"; +import { SchemaDriftDetector } from "../schema-drift-detector"; +import type { SorobanEventType } from "../types/contract-event.types"; + +function mapVal(entries: Record): xdr.ScVal { + const mapEntries = Object.entries(entries).map( + ([k, v]) => new xdr.ScMapEntry({ key: xdr.ScVal.scvSymbol(k), val: v }), + ); + return xdr.ScVal.scvMap(mapEntries); +} + +const TOKEN = "CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC"; +function addressVal(pubkey: string): xdr.ScVal { + return nativeToScVal(pubkey); +} + +describe("SchemaDriftDetector", () => { + const detector = new SchemaDriftDetector(); + + describe("detectFieldDrift – EscrowDeposited", () => { + const eventName: SorobanEventType = "EscrowDeposited"; + + it("returns no drift for a complete v2 payload", () => { + const map = detector.decodeToMap( + mapVal({ + amount_due: nativeToScVal(1_000n, { type: "i128" }), + amount_paid: nativeToScVal(1_000n, { type: "i128" }), + expires_at: nativeToScVal(9999999n, { type: "u64" }), + ledger_sequence: nativeToScVal(42, { type: "u32" }), + schema_version: nativeToScVal(2, { type: "u32" }), + timestamp: nativeToScVal(1700000000n, { type: "u64" }), + token: addressVal(TOKEN), + }), + ); + + const result = detector.detectFieldDrift(eventName, map); + expect(result.driftType).toBeNull(); + expect(result.missingFields).toHaveLength(0); + }); + + it("returns FIELD_MISMATCH when token is absent", () => { + const map = detector.decodeToMap( + mapVal({ + amount_due: nativeToScVal(1_000n, { type: "i128" }), + amount_paid: nativeToScVal(1_000n, { type: "i128" }), + expires_at: nativeToScVal(9999999n, { type: "u64" }), + timestamp: nativeToScVal(1700000000n, { type: "u64" }), + // token is missing + }), + ); + + const result = detector.detectFieldDrift(eventName, map); + expect(result.driftType).toBe("FIELD_MISMATCH"); + expect(result.missingFields).toContain("token"); + }); + + it("does NOT flag schema_version absence as drift (optional field)", () => { + const map = detector.decodeToMap( + mapVal({ + amount_due: nativeToScVal(1_000n, { type: "i128" }), + amount_paid: nativeToScVal(1_000n, { type: "i128" }), + expires_at: nativeToScVal(9999999n, { type: "u64" }), + timestamp: nativeToScVal(1700000000n, { type: "u64" }), + token: addressVal(TOKEN), + // schema_version intentionally absent (v1 legacy) + }), + ); + + const result = detector.detectFieldDrift(eventName, map); + expect(result.driftType).toBeNull(); + expect(result.missingFields).toHaveLength(0); + }); + + it("reports unexpected extra fields without triggering FIELD_MISMATCH", () => { + const map = detector.decodeToMap( + mapVal({ + amount_due: nativeToScVal(1_000n, { type: "i128" }), + amount_paid: nativeToScVal(1_000n, { type: "i128" }), + expires_at: nativeToScVal(9999999n, { type: "u64" }), + timestamp: nativeToScVal(1700000000n, { type: "u64" }), + token: addressVal(TOKEN), + new_field_from_future_contract: nativeToScVal("future", { type: "string" }), + }), + ); + + const result = detector.detectFieldDrift(eventName, map); + expect(result.driftType).toBeNull(); // not a hard rejection + expect(result.unexpectedFields).toContain("new_field_from_future_contract"); + }); + + it("missingFields are sorted alphabetically", () => { + const map = detector.decodeToMap( + mapVal({ + // Only timestamp present; amount_due, amount_paid, expires_at, token missing + timestamp: nativeToScVal(1700000000n, { type: "u64" }), + }), + ); + + const result = detector.detectFieldDrift(eventName, map); + expect(result.driftType).toBe("FIELD_MISMATCH"); + expect(result.missingFields).toEqual([...result.missingFields].sort()); + }); + }); + + describe("detectFieldDrift – unknown event name", () => { + it("returns no drift for unknown event names (handled elsewhere)", () => { + const result = detector.detectFieldDrift( + "UnknownEvent" as SorobanEventType, + {}, + ); + expect(result.driftType).toBeNull(); + expect(result.missingFields).toHaveLength(0); + }); + }); + + describe("decodeToMap", () => { + it("returns an empty map for void ScVal", () => { + const result = detector.decodeToMap(xdr.ScVal.scvVoid()); + expect(result).toEqual({}); + }); + + it("correctly decodes key-value map entries", () => { + const data = mapVal({ + foo: nativeToScVal(42n, { type: "i128" }), + }); + const result = detector.decodeToMap(data); + expect(Object.keys(result)).toContain("foo"); + }); + }); + + describe("extractSafePayloadSnapshot", () => { + it("returns a string-keyed snapshot without throwing", () => { + const data = mapVal({ + amount: nativeToScVal(12345n, { type: "i128" }), + token: addressVal(TOKEN), + }); + const snapshot = detector.extractSafePayloadSnapshot(data); + expect(snapshot).toHaveProperty("amount"); + expect(snapshot).toHaveProperty("token"); + expect(typeof snapshot.amount).toBe("string"); + }); + }); +}); diff --git a/app/backend/src/ingestion/__tests__/schema-observability.unit.spec.ts b/app/backend/src/ingestion/__tests__/schema-observability.unit.spec.ts new file mode 100644 index 000000000..cc7eb4c07 --- /dev/null +++ b/app/backend/src/ingestion/__tests__/schema-observability.unit.spec.ts @@ -0,0 +1,177 @@ +import { SchemaObservabilityService } from "../schema-observability.service"; +import type { RawHorizonContractEvent } from "../soroban-event.parser"; + +function makeRaw( + overrides: Partial = {}, +): RawHorizonContractEvent { + return { + id: "1", + paging_token: "100-1", + transaction_hash: "txhash", + ledger: 100, + created_at: "2025-01-01T00:00:00Z", + contract_id: "CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAWHF", + type: "contract", + topic: [], + value: { xdr: "" }, + ...overrides, + }; +} + +/** Minimal MetricsService stub */ +function makeMetricsStub() { + return { + recordUnknownEvent: jest.fn(), + recordFieldMismatch: jest.fn(), + recordParserRejection: jest.fn(), + recordUnexpectedFields: jest.fn(), + recordUnknownSchemaVersion: jest.fn(), + recordError: jest.fn(), + }; +} + +/** Minimal SentryService stub */ +function makeSentryStub() { + return { + captureMessage: jest.fn(), + captureException: jest.fn(), + }; +} + +describe("SchemaObservabilityService", () => { + let service: SchemaObservabilityService; + let metrics: ReturnType; + let sentry: ReturnType; + + beforeEach(() => { + metrics = makeMetricsStub(); + sentry = makeSentryStub(); + service = new SchemaObservabilityService( + metrics as never, + sentry as never, + ); + }); + + describe("recordUnknownEvent", () => { + it("calls metrics.recordUnknownEvent and metrics.recordParserRejection", () => { + const raw = makeRaw(); + service.recordUnknownEvent(raw, "SomeWeirdEvent"); + expect(metrics.recordUnknownEvent).toHaveBeenCalledWith( + raw.contract_id, + "SomeWeirdEvent", + ); + expect(metrics.recordParserRejection).toHaveBeenCalledWith( + "SomeWeirdEvent", + "UNKNOWN_EVENT_NAME", + ); + }); + + it("increments windowRejectionCount", () => { + service.recordUnknownEvent(makeRaw(), "SomeWeirdEvent"); + const summary = service.getHealthSummary(); + expect(summary.windowRejectionCount).toBe(1); + expect(summary.rejectionsByDriftType["UNKNOWN_EVENT_NAME"]).toBe(1); + }); + + it("captures to Sentry", () => { + service.recordUnknownEvent(makeRaw(), "SomeWeirdEvent"); + expect(sentry.captureMessage).toHaveBeenCalledWith( + expect.stringContaining("UNKNOWN_EVENT_NAME"), + "warning", + expect.objectContaining({ driftType: "UNKNOWN_EVENT_NAME" }), + ); + }); + }); + + describe("recordFieldMismatch", () => { + it("calls metrics.recordFieldMismatch with sorted missing fields", () => { + service.recordFieldMismatch("EscrowDeposited", 2, makeRaw(), ["token", "amount_due"], []); + expect(metrics.recordFieldMismatch).toHaveBeenCalledWith( + "EscrowDeposited", + 2, + ["token", "amount_due"], + ); + }); + + it("adds to recentRejections ring-buffer", () => { + service.recordFieldMismatch("EscrowDeposited", 2, makeRaw({ paging_token: "999-1" }), ["token"], []); + const summary = service.getHealthSummary(); + const rejection = summary.recentRejections.find( + (r) => r.pagingToken === "999-1", + ); + expect(rejection).toBeDefined(); + expect(rejection?.driftType).toBe("FIELD_MISMATCH"); + expect(rejection?.missingFields).toContain("token"); + }); + }); + + describe("recordUnexpectedFields", () => { + it("calls metrics.recordUnexpectedFields but does NOT add to window (non-fatal)", () => { + service.recordUnexpectedFields("EscrowDeposited", 2, makeRaw(), ["future_field"]); + expect(metrics.recordUnexpectedFields).toHaveBeenCalledWith("EscrowDeposited", 2); + // Unexpected fields do not count toward rejection window + const summary = service.getHealthSummary(); + expect(summary.windowRejectionCount).toBe(0); + }); + }); + + describe("alert threshold", () => { + it("sets alertFiring=true when windowRejectionCount >= ALERT_THRESHOLD", () => { + for (let i = 0; i < SchemaObservabilityService.ALERT_THRESHOLD; i++) { + service.recordUnknownEvent(makeRaw({ paging_token: `${i}-1` }), "Evt"); + } + const summary = service.getHealthSummary(); + expect(summary.alertFiring).toBe(true); + expect(summary.status).toBe("degraded"); + }); + + it("fires a Sentry error capture exactly when threshold is crossed", () => { + for (let i = 0; i < SchemaObservabilityService.ALERT_THRESHOLD; i++) { + service.recordUnknownEvent(makeRaw({ paging_token: `${i}-1` }), "Evt"); + } + // The "error" level capture happens on the threshold-crossing call + const errorCalls = (sentry.captureMessage as jest.Mock).mock.calls.filter( + ([, level]) => level === "error", + ); + expect(errorCalls.length).toBeGreaterThanOrEqual(1); + }); + + it("status is healthy below threshold", () => { + service.recordUnknownEvent(makeRaw(), "Evt"); + const summary = service.getHealthSummary(); + expect(summary.status).toBe("healthy"); + expect(summary.alertFiring).toBe(false); + }); + }); + + describe("ring-buffer cap", () => { + it("keeps at most MAX_DIAGNOSTIC_BUFFER entries", () => { + const { MAX_DIAGNOSTIC_BUFFER } = jest.requireActual("../parser-diagnostics.types") as { + MAX_DIAGNOSTIC_BUFFER: number; + }; + for (let i = 0; i < MAX_DIAGNOSTIC_BUFFER + 10; i++) { + service.recordUnknownEvent(makeRaw({ paging_token: `${i}-1` }), "Evt"); + } + const summary = service.getHealthSummary(); + expect(summary.recentRejections.length).toBeLessThanOrEqual( + MAX_DIAGNOSTIC_BUFFER, + ); + }); + }); + + describe("getHealthSummary", () => { + it("returns healthy state with zero rejections", () => { + const summary = service.getHealthSummary(); + expect(summary.status).toBe("healthy"); + expect(summary.alertFiring).toBe(false); + expect(summary.windowRejectionCount).toBe(0); + expect(summary.recentRejections).toHaveLength(0); + }); + + it("exposes windowMs and alertThreshold for transparency", () => { + const summary = service.getHealthSummary(); + expect(summary.windowMs).toBe(SchemaObservabilityService.WINDOW_MS); + expect(summary.alertThreshold).toBe(SchemaObservabilityService.ALERT_THRESHOLD); + }); + }); +}); diff --git a/app/backend/src/ingestion/__tests__/soroban-event-indexer.service.unit.spec.ts b/app/backend/src/ingestion/__tests__/soroban-event-indexer.service.unit.spec.ts index 4e89a553e..d0580f245 100644 --- a/app/backend/src/ingestion/__tests__/soroban-event-indexer.service.unit.spec.ts +++ b/app/backend/src/ingestion/__tests__/soroban-event-indexer.service.unit.spec.ts @@ -81,7 +81,16 @@ describe("SorobanEventIndexerService - Resiliency & Hardening", () => { service = new SorobanEventIndexerService( mocks.config, mocks.checkpointRepo, mocks.escrowRepo, mocks.privacyRepo, mocks.adminRepo, mocks.stealthRepo, - mocks.metrics, mocks.eventEmitter + mocks.metrics, mocks.eventEmitter, + { + recordUnknownEvent: jest.fn(), + recordFieldMismatch: jest.fn(), + recordUnexpectedFields: jest.fn(), + recordUnsupportedVersion: jest.fn(), + recordIncompatibleVersion: jest.fn(), + recordParseError: jest.fn(), + getHealthSummary: jest.fn(), + } as unknown as import("../schema-observability.service").SchemaObservabilityService ); }); diff --git a/app/backend/src/ingestion/__tests__/soroban-event.parser.observability.unit.spec.ts b/app/backend/src/ingestion/__tests__/soroban-event.parser.observability.unit.spec.ts new file mode 100644 index 000000000..8896a6abf --- /dev/null +++ b/app/backend/src/ingestion/__tests__/soroban-event.parser.observability.unit.spec.ts @@ -0,0 +1,234 @@ +/** + * Tests for the observability callbacks added to SorobanEventParser + * as part of the schema-drift detection feature. + */ +import { xdr, nativeToScVal } from "@stellar/stellar-sdk"; +import { + SorobanEventParser, + RawHorizonContractEvent, + ParserObservabilityCallbacks, +} from "../soroban-event.parser"; +import { + RustAcademy_EVENT_TOPICS, + RustAcademy_EVENT_SCHEMA_VERSION, +} from "../event-schema"; + +function symVal(s: string): xdr.ScVal { + return xdr.ScVal.scvSymbol(s); +} +function addressVal(pk: string): xdr.ScVal { + return nativeToScVal(pk); +} +function bytesVal(hex: string): xdr.ScVal { + return xdr.ScVal.scvBytes(Buffer.from(hex, "hex")); +} +function mapVal(entries: Record): xdr.ScVal { + const mapEntries = Object.entries(entries).map( + ([k, v]) => new xdr.ScMapEntry({ key: xdr.ScVal.scvSymbol(k), val: v }), + ); + return xdr.ScVal.scvMap(mapEntries); +} +function makeRaw( + topics: xdr.ScVal[], + data: xdr.ScVal, + overrides: Partial = {}, +): RawHorizonContractEvent { + return { + id: "1", + paging_token: "100-1", + transaction_hash: "txhash", + ledger: 100, + created_at: "2025-01-01T00:00:00Z", + contract_id: "CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAWHF", + type: "contract", + topic: topics.map((v) => v.toXDR("base64")), + value: { xdr: data.toXDR("base64") }, + ...overrides, + }; +} + +const OWNER = "GDQERHRWJYV7JHRP5V7DWJVI6Y5ABZP3YRH7DKYJRBEGJQKE6IQEOSY2"; +const TOKEN = "CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC"; +const COMMITMENT_HEX = "deadbeef".repeat(8); + +describe("SorobanEventParser – observability callbacks", () => { + describe("onUnknownEvent callback", () => { + it("fires when event name is not in schema registry", () => { + const onUnknownEvent = jest.fn(); + const parser = new SorobanEventParser(undefined, { onUnknownEvent }); + + const raw = makeRaw([symVal("NonExistentEvent")], xdr.ScVal.scvVoid()); + const result = parser.parse(raw); + + expect(result).toBeNull(); + expect(onUnknownEvent).toHaveBeenCalledWith( + expect.objectContaining({ paging_token: "100-1" }), + "NonExistentEvent", + ); + }); + + it("does NOT fire for a known canonical topic+event pair", () => { + const onUnknownEvent = jest.fn(); + const parser = new SorobanEventParser(undefined, { onUnknownEvent }); + + const topics = [ + symVal(RustAcademy_EVENT_TOPICS.escrow), + symVal("EscrowDeposited"), + bytesVal(COMMITMENT_HEX), + addressVal(OWNER), + ]; + const data = mapVal({ + amount_due: nativeToScVal(1_000n, { type: "i128" }), + amount_paid: nativeToScVal(1_000n, { type: "i128" }), + expires_at: nativeToScVal(9999999n, { type: "u64" }), + ledger_sequence: nativeToScVal(100, { type: "u32" }), + schema_version: nativeToScVal(RustAcademy_EVENT_SCHEMA_VERSION, { type: "u32" }), + timestamp: nativeToScVal(1700000000n, { type: "u64" }), + token: addressVal(TOKEN), + }); + + parser.parse(makeRaw(topics, data, { ledger: 100 })); + expect(onUnknownEvent).not.toHaveBeenCalled(); + }); + }); + + describe("onFieldMismatch callback", () => { + it("fires when a required payload key is absent", () => { + const onFieldMismatch = jest.fn(); + const parser = new SorobanEventParser(undefined, { onFieldMismatch }); + + const topics = [ + symVal(RustAcademy_EVENT_TOPICS.escrow), + symVal("EscrowDeposited"), + bytesVal(COMMITMENT_HEX), + addressVal(OWNER), + ]; + // token is intentionally missing + const data = mapVal({ + amount_due: nativeToScVal(1_000n, { type: "i128" }), + amount_paid: nativeToScVal(1_000n, { type: "i128" }), + expires_at: nativeToScVal(9999999n, { type: "u64" }), + schema_version: nativeToScVal(RustAcademy_EVENT_SCHEMA_VERSION, { type: "u32" }), + timestamp: nativeToScVal(1700000000n, { type: "u64" }), + // token absent + }); + + const result = parser.parse(makeRaw(topics, data)); + // Event should be rejected due to field mismatch + expect(result).toBeNull(); + expect(onFieldMismatch).toHaveBeenCalledWith( + "EscrowDeposited", + RustAcademy_EVENT_SCHEMA_VERSION, + expect.objectContaining({ paging_token: "100-1" }), + expect.arrayContaining(["token"]), + expect.any(Array), + ); + }); + }); + + describe("onUnexpectedFields callback", () => { + it("fires when extra fields are present but event is still ingested", () => { + const onUnexpectedFields = jest.fn(); + const parser = new SorobanEventParser(undefined, { onUnexpectedFields }); + + const topics = [ + symVal(RustAcademy_EVENT_TOPICS.escrow), + symVal("EscrowDeposited"), + bytesVal(COMMITMENT_HEX), + addressVal(OWNER), + ]; + const data = mapVal({ + amount_due: nativeToScVal(1_000n, { type: "i128" }), + amount_paid: nativeToScVal(1_000n, { type: "i128" }), + expires_at: nativeToScVal(9999999n, { type: "u64" }), + ledger_sequence: nativeToScVal(100, { type: "u32" }), + schema_version: nativeToScVal(RustAcademy_EVENT_SCHEMA_VERSION, { type: "u32" }), + timestamp: nativeToScVal(1700000000n, { type: "u64" }), + token: addressVal(TOKEN), + future_extra_field: nativeToScVal("some_value", { type: "string" }), + }); + + const result = parser.parse(makeRaw(topics, data, { ledger: 100 })); + // Event is still parsed successfully + expect(result).not.toBeNull(); + expect(result?.eventType).toBe("EscrowDeposited"); + // But the unexpected-fields callback fires + expect(onUnexpectedFields).toHaveBeenCalledWith( + "EscrowDeposited", + RustAcademy_EVENT_SCHEMA_VERSION, + expect.objectContaining({ paging_token: "100-1" }), + expect.arrayContaining(["future_extra_field"]), + ); + }); + }); + + describe("onIncompatibleVersion callback", () => { + it("fires when schema_version is in range but not in compatibleVersions list", () => { + const onIncompatibleVersion = jest.fn(); + const parser = new SorobanEventParser(undefined, { onIncompatibleVersion }); + + // ContractPaused only supports [2], so version 1 is incompatible + const topics = [ + symVal(RustAcademy_EVENT_TOPICS.admin), + symVal("ContractPaused"), + addressVal(OWNER), + ]; + const data = mapVal({ + paused: nativeToScVal(true), + schema_version: nativeToScVal(1, { type: "u32" }), + timestamp: nativeToScVal(1700000000n, { type: "u64" }), + }); + + const result = parser.parse(makeRaw(topics, data)); + expect(result).toBeNull(); + expect(onIncompatibleVersion).toHaveBeenCalledWith( + "ContractPaused", + 1, + expect.objectContaining({ paging_token: "100-1" }), + ); + }); + }); + + describe("onParseError callback", () => { + it("fires when XDR decode fails", () => { + const onParseError = jest.fn(); + const parser = new SorobanEventParser(undefined, { onParseError }); + + const raw = makeRaw([], xdr.ScVal.scvVoid(), { + topic: ["!!not-valid-base64!!"], + }); + const result = parser.parse(raw); + + expect(result).toBeNull(); + expect(onParseError).toHaveBeenCalledWith( + expect.objectContaining({ paging_token: "100-1" }), + expect.any(String), + ); + }); + }); + + describe("backward-compat: legacy onUnknownSchemaVersion handler still works", () => { + it("calls the legacy positional handler for schema_version > MAX_SUPPORTED", () => { + const legacyHandler = jest.fn(); + const parser = new SorobanEventParser(legacyHandler); + + const topics = [ + symVal(RustAcademy_EVENT_TOPICS.escrow), + symVal("EscrowDeposited"), + bytesVal(COMMITMENT_HEX), + addressVal(OWNER), + ]; + const data = mapVal({ + schema_version: nativeToScVal(999, { type: "u32" }), + amount_due: nativeToScVal(1n, { type: "i128" }), + amount_paid: nativeToScVal(1n, { type: "i128" }), + expires_at: nativeToScVal(1n, { type: "u64" }), + timestamp: nativeToScVal(1n, { type: "u64" }), + token: addressVal(TOKEN), + }); + + parser.parse(makeRaw(topics, data, { paging_token: "777-1" })); + expect(legacyHandler).toHaveBeenCalledWith("EscrowDeposited", 999, "777-1"); + }); + }); +}); diff --git a/app/backend/src/ingestion/ingestion.module.ts b/app/backend/src/ingestion/ingestion.module.ts index f1a5bb3ef..64c407b26 100644 --- a/app/backend/src/ingestion/ingestion.module.ts +++ b/app/backend/src/ingestion/ingestion.module.ts @@ -3,6 +3,7 @@ import { Module, forwardRef } from "@nestjs/common"; import { SupabaseModule } from "../supabase/supabase.module"; import { JobQueueModule } from "../job-queue/job-queue.module"; import { MetricsModule } from "../metrics/metrics.module"; +import { SentryModule } from "../sentry/sentry.module"; import { CursorRepository } from "./cursor.repository"; import { EscrowEventRepository } from "./escrow-event.repository"; import { PrivacyEventRepository } from "./privacy-event.repository"; @@ -14,14 +15,17 @@ import { StellarIngestionService } from "./stellar-ingestion.service"; import { SorobanEventIndexerService } from "./soroban-event-indexer.service"; import { SorobanIndexerController } from "./soroban-indexer.controller"; import { IngestionBootstrapService } from "./ingestion-bootstrap.service"; +import { SchemaObservabilityService } from "./schema-observability.service"; +import { ParserHealthController } from "./parser-health.controller"; @Module({ imports: [ SupabaseModule, forwardRef(() => JobQueueModule), MetricsModule, + SentryModule, ], - controllers: [SorobanIndexerController], + controllers: [SorobanIndexerController, ParserHealthController], providers: [ CursorRepository, EscrowEventRepository, @@ -33,6 +37,7 @@ import { IngestionBootstrapService } from "./ingestion-bootstrap.service"; StellarIngestionService, SorobanEventIndexerService, IngestionBootstrapService, + SchemaObservabilityService, ], exports: [ StellarIngestionService, @@ -40,6 +45,7 @@ import { IngestionBootstrapService } from "./ingestion-bootstrap.service"; SorobanEventParser, CursorRepository, EscrowEventRepository, + SchemaObservabilityService, ], }) export class IngestionModule {} diff --git a/app/backend/src/ingestion/parser-diagnostics.types.ts b/app/backend/src/ingestion/parser-diagnostics.types.ts new file mode 100644 index 000000000..685c5dad3 --- /dev/null +++ b/app/backend/src/ingestion/parser-diagnostics.types.ts @@ -0,0 +1,48 @@ +/** + * Diagnostic payload captured when the Soroban event parser rejects an event + * due to schema drift (unknown event name, field mismatch, or unsupported + * schema version). + * + * All values are safe for logging/Sentry — no private keys, wallet seed + * phrases, or personally-identifying data are included. + */ +export interface ParserRejectionDiagnostic { + /** ISO timestamp at the moment the rejection was recorded. */ + readonly detectedAt: string; + + /** Category of drift that caused the rejection. */ + readonly driftType: DriftType; + + /** Contract address the event arrived from. */ + readonly contractId: string; + + /** Event name symbol decoded from topic[1] (or topic[0] for legacy events). */ + readonly eventName: string; + + /** Schema version field extracted from the event payload (0 if absent). */ + readonly schemaVersion: number; + + /** Horizon paging token for the rejected event (stable cross-run reference). */ + readonly pagingToken: string; + + /** Transaction hash of the rejected event. */ + readonly txHash: string; + + /** + * For FIELD_MISMATCH: sorted list of required payload keys that were absent. + * For UNEXPECTED_FIELDS: sorted list of extra keys found in the payload. + * Empty for other drift types. + */ + readonly missingFields: readonly string[]; + readonly unexpectedFields: readonly string[]; +} + +export type DriftType = + | "UNKNOWN_EVENT_NAME" // topic symbol is not in the known schema registry + | "FIELD_MISMATCH" // required payload key is absent + | "UNSUPPORTED_VERSION" // schema_version > MAX_SUPPORTED_SCHEMA_VERSION + | "INCOMPATIBLE_VERSION" // schema_version not in compatibleVersions list + | "PARSE_ERROR"; // XDR decode or structural error + +/** Ring-buffer of recent rejection diagnostics kept in memory. */ +export const MAX_DIAGNOSTIC_BUFFER = 100; diff --git a/app/backend/src/ingestion/parser-health.controller.ts b/app/backend/src/ingestion/parser-health.controller.ts new file mode 100644 index 000000000..dd9a4f72a --- /dev/null +++ b/app/backend/src/ingestion/parser-health.controller.ts @@ -0,0 +1,53 @@ +import { Controller, Get } from "@nestjs/common"; +import { ApiOperation, ApiResponse, ApiTags } from "@nestjs/swagger"; +import { SchemaObservabilityService, ParserHealthSummary } from "./schema-observability.service"; + +/** + * Developer-facing endpoint for parser health and schema-drift diagnostics. + * + * Returns a real-time snapshot of: + * - Whether the alert threshold has been crossed in the current window. + * - Per-drift-type rejection counts in the sliding window. + * - A ring-buffer of recent rejection diagnostics (last 100 events). + * + * This endpoint is intentionally unauthenticated so it surfaces quickly in + * incident dashboards and does not leak sensitive data (all fields are safe). + */ +@ApiTags("indexer") +@Controller("indexer") +export class ParserHealthController { + constructor( + private readonly schemaObservability: SchemaObservabilityService, + ) {} + + @Get("parser-health") + @ApiOperation({ + summary: "Parser schema-drift health status", + description: + "Returns real-time schema-drift diagnostics for the Soroban event parser. " + + "Includes sliding-window rejection counts, alert state, and recent rejection details. " + + "No sensitive data is exposed.", + }) + @ApiResponse({ + status: 200, + description: "Parser health snapshot", + schema: { + type: "object", + properties: { + status: { type: "string", enum: ["healthy", "degraded"] }, + alertFiring: { type: "boolean" }, + windowMs: { type: "number", example: 60000 }, + alertThreshold: { type: "number", example: 10 }, + windowRejectionCount: { type: "number" }, + rejectionsByDriftType: { + type: "object", + additionalProperties: { type: "number" }, + }, + recentRejections: { type: "array", items: { type: "object" } }, + }, + }, + }) + getParserHealth(): ParserHealthSummary { + return this.schemaObservability.getHealthSummary(); + } +} diff --git a/app/backend/src/ingestion/schema-drift-detector.ts b/app/backend/src/ingestion/schema-drift-detector.ts new file mode 100644 index 000000000..3cacf5d72 --- /dev/null +++ b/app/backend/src/ingestion/schema-drift-detector.ts @@ -0,0 +1,129 @@ +import { xdr, scValToNative } from "@stellar/stellar-sdk"; +import { + RustAcademy_EVENT_SCHEMA_CONTRACTS, +} from "./event-schema"; +import type { SorobanEventType } from "./types/contract-event.types"; +import type { DriftType } from "./parser-diagnostics.types"; + +export interface FieldDriftResult { + driftType: DriftType | null; + missingFields: string[]; + unexpectedFields: string[]; +} + +/** + * SchemaDriftDetector compares a live event's decoded payload map against + * the canonical schema metadata in RustAcademy_EVENT_SCHEMA_CONTRACTS. + * + * It operates purely on already-decoded XDR data so it never throws and + * never touches private key material. + */ +export class SchemaDriftDetector { + /** + * Validate the set of keys present in the event's data map against the + * expected payload keys for the given event type. + * + * Returns a FieldDriftResult whose `driftType` is: + * - `FIELD_MISMATCH` — one or more required keys are absent + * - `null` — no drift detected (unexpected extra keys are + * reported but do NOT trigger a rejection — they are + * forward-compat additions from a newer contract) + * + * Extra keys are reported separately in `unexpectedFields` so callers can + * emit an informational metric without blocking ingestion. + */ + detectFieldDrift( + eventName: SorobanEventType, + dataMap: Record, + ): FieldDriftResult { + const contract = + RustAcademy_EVENT_SCHEMA_CONTRACTS[ + eventName as keyof typeof RustAcademy_EVENT_SCHEMA_CONTRACTS + ]; + + if (!contract) { + // Unknown event — handled by a different code path in the parser. + return { driftType: null, missingFields: [], unexpectedFields: [] }; + } + + const expectedKeys = new Set(contract.payloadKeys as readonly string[]); + const actualKeys = new Set(Object.keys(dataMap)); + + const missingFields: string[] = []; + for (const key of expectedKeys) { + // schema_version and ledger_sequence are optional in v1 legacy events; + // treat their absence as acceptable so we don't reject every legacy event. + if (key === "schema_version" || key === "ledger_sequence") continue; + if (!actualKeys.has(key)) { + missingFields.push(key); + } + } + + const unexpectedFields: string[] = []; + for (const key of actualKeys) { + if (!expectedKeys.has(key)) { + unexpectedFields.push(key); + } + } + + missingFields.sort(); + unexpectedFields.sort(); + + return { + driftType: missingFields.length > 0 ? "FIELD_MISMATCH" : null, + missingFields, + unexpectedFields, + }; + } + + /** + * Decode a Soroban map ScVal into a plain Record keyed by symbol string. + * Returns an empty object on any decode error so callers can treat it as + * an empty payload rather than throwing. + */ + decodeToMap(data: xdr.ScVal): Record { + try { + const result: Record = {}; + const mapEntries = data.map(); + for (const entry of mapEntries) { + const key = entry.key().sym().toString(); + result[key] = entry.val(); + } + return result; + } catch { + return {}; + } + } + + /** + * Extract a raw payload snapshot safe for diagnostic logging. + * Limits to the first 50 keys and truncates string values > 120 chars. + */ + extractSafePayloadSnapshot( + data: xdr.ScVal, + ): Record { + try { + const map = this.decodeToMap(data); + const snapshot: Record = {}; + let count = 0; + + for (const [key, val] of Object.entries(map)) { + if (count++ >= 50) break; + try { + const native = scValToNative(val); + const str = + typeof native === "bigint" + ? native.toString() + : JSON.stringify(native); + snapshot[key] = str.length > 120 ? str.slice(0, 120) + "…" : str; + } catch { + snapshot[key] = ""; + } + } + + return snapshot; + } catch { + return {}; + } + } +} diff --git a/app/backend/src/ingestion/schema-observability.service.ts b/app/backend/src/ingestion/schema-observability.service.ts new file mode 100644 index 000000000..ea5f63912 --- /dev/null +++ b/app/backend/src/ingestion/schema-observability.service.ts @@ -0,0 +1,356 @@ +import { Injectable, Logger } from "@nestjs/common"; +import { MetricsService } from "../metrics/metrics.service"; +import { SentryService } from "../sentry/sentry.service"; +import type { + ParserRejectionDiagnostic, + DriftType, +} from "./parser-diagnostics.types"; +import { MAX_DIAGNOSTIC_BUFFER } from "./parser-diagnostics.types"; +import type { SorobanEventType } from "./types/contract-event.types"; +import type { RawHorizonContractEvent } from "./soroban-event.parser"; + +/** + * SchemaObservabilityService is the single integration point for all + * contract-event schema-drift signals. + * + * Responsibilities: + * - Record Prometheus metrics for each drift category. + * - Capture safe diagnostic payloads to Sentry (no raw private key material). + * - Maintain an in-memory ring-buffer of recent rejections for the + * developer-facing /indexer/parser-health endpoint. + * - Expose a per-window rejection count so alert thresholds can be evaluated. + */ +@Injectable() +export class SchemaObservabilityService { + private readonly logger = new Logger(SchemaObservabilityService.name); + + /** Ring-buffer of recent rejection diagnostics (capped at MAX_DIAGNOSTIC_BUFFER). */ + private readonly recentRejections: ParserRejectionDiagnostic[] = []; + + /** + * Sliding-window counters: each entry is { timestamp, driftType }. + * We keep only entries within the configured window for threshold checks. + */ + private readonly windowEntries: Array<{ + timestamp: number; + driftType: DriftType; + }> = []; + + /** Alert fires when rejection count exceeds this in WINDOW_MS. */ + static readonly ALERT_THRESHOLD = 10; + static readonly WINDOW_MS = 60_000; // 1 minute + + constructor( + private readonly metrics: MetricsService, + private readonly sentry: SentryService, + ) {} + + // ── Public surface ────────────────────────────────────────────────────────── + + /** + * Called when the event topic symbol is not in the known schema registry. + * This is classified differently from a parse error — the contract may be + * emitting an event type that this indexer version does not know about yet. + */ + recordUnknownEvent( + raw: RawHorizonContractEvent, + rawEventName: string, + ): void { + const diagnostic = this.buildDiagnostic( + "UNKNOWN_EVENT_NAME", + rawEventName, + 0, + raw, + [], + [], + ); + + this.metrics.recordUnknownEvent(raw.contract_id, rawEventName); + this.metrics.recordParserRejection(rawEventName, "UNKNOWN_EVENT_NAME"); + this.pushDiagnostic(diagnostic); + this.pushWindowEntry("UNKNOWN_EVENT_NAME"); + + this.logger.warn( + `[schema-drift] UNKNOWN_EVENT_NAME contractId=${raw.contract_id} ` + + `eventName=${rawEventName} pagingToken=${raw.paging_token}`, + ); + + this.maybeCaptureToSentry(diagnostic); + this.maybeFireThresholdAlert(); + } + + /** + * Called when required payload keys are absent from a known event type. + * The event is rejected to protect analytics from stale/incomplete records. + */ + recordFieldMismatch( + eventName: SorobanEventType, + schemaVersion: number, + raw: RawHorizonContractEvent, + missingFields: string[], + unexpectedFields: string[], + ): void { + const diagnostic = this.buildDiagnostic( + "FIELD_MISMATCH", + eventName, + schemaVersion, + raw, + missingFields, + unexpectedFields, + ); + + this.metrics.recordFieldMismatch(eventName, schemaVersion, missingFields); + this.metrics.recordParserRejection(eventName, "FIELD_MISMATCH"); + this.pushDiagnostic(diagnostic); + this.pushWindowEntry("FIELD_MISMATCH"); + + this.logger.warn( + `[schema-drift] FIELD_MISMATCH eventName=${eventName} ` + + `schemaVersion=${schemaVersion} missing=[${missingFields.join(",")}] ` + + `pagingToken=${raw.paging_token}`, + ); + + this.maybeCaptureToSentry(diagnostic); + this.maybeFireThresholdAlert(); + } + + /** + * Called when extra, unexpected fields are present (forward-compat additions). + * The event is still ingested but the anomaly is counted for monitoring. + */ + recordUnexpectedFields( + eventName: SorobanEventType, + schemaVersion: number, + raw: RawHorizonContractEvent, + unexpectedFields: string[], + ): void { + this.metrics.recordUnexpectedFields(eventName, schemaVersion); + + this.logger.debug( + `[schema-drift] UNEXPECTED_FIELDS eventName=${eventName} ` + + `extra=[${unexpectedFields.join(",")}] pagingToken=${raw.paging_token}`, + ); + } + + /** + * Called when schema_version > MAX_SUPPORTED_SCHEMA_VERSION. + */ + recordUnsupportedVersion( + eventName: string, + schemaVersion: number, + raw: RawHorizonContractEvent, + ): void { + const diagnostic = this.buildDiagnostic( + "UNSUPPORTED_VERSION", + eventName, + schemaVersion, + raw, + [], + [], + ); + + this.metrics.recordUnknownSchemaVersion(eventName, schemaVersion); + this.metrics.recordParserRejection(eventName, "UNSUPPORTED_VERSION"); + this.pushDiagnostic(diagnostic); + this.pushWindowEntry("UNSUPPORTED_VERSION"); + + this.logger.warn( + `[schema-drift] UNSUPPORTED_VERSION eventName=${eventName} ` + + `schemaVersion=${schemaVersion} pagingToken=${raw.paging_token}`, + ); + + this.maybeCaptureToSentry(diagnostic); + this.maybeFireThresholdAlert(); + } + + /** + * Called when schema_version is in range but not in compatibleVersions list. + */ + recordIncompatibleVersion( + eventName: string, + schemaVersion: number, + raw: RawHorizonContractEvent, + ): void { + const diagnostic = this.buildDiagnostic( + "INCOMPATIBLE_VERSION", + eventName, + schemaVersion, + raw, + [], + [], + ); + + this.metrics.recordParserRejection(eventName, "INCOMPATIBLE_VERSION"); + this.pushDiagnostic(diagnostic); + this.pushWindowEntry("INCOMPATIBLE_VERSION"); + + this.logger.warn( + `[schema-drift] INCOMPATIBLE_VERSION eventName=${eventName} ` + + `schemaVersion=${schemaVersion} pagingToken=${raw.paging_token}`, + ); + + this.maybeCaptureToSentry(diagnostic); + this.maybeFireThresholdAlert(); + } + + /** + * Called when XDR decode or any structural parse error occurs. + */ + recordParseError( + raw: RawHorizonContractEvent, + errorMessage: string, + ): void { + const diagnostic = this.buildDiagnostic( + "PARSE_ERROR", + "unknown", + 0, + raw, + [], + [], + ); + + this.metrics.recordParserRejection("unknown", "PARSE_ERROR"); + this.metrics.recordError("SorobanParser", "PARSE_ERROR"); + this.pushDiagnostic(diagnostic); + this.pushWindowEntry("PARSE_ERROR"); + + this.logger.warn( + `[schema-drift] PARSE_ERROR pagingToken=${raw.paging_token} error=${errorMessage}`, + ); + + this.maybeFireThresholdAlert(); + } + + // ── Developer-facing health summary ──────────────────────────────────────── + + /** + * Returns a summary suitable for the /indexer/parser-health endpoint. + * Includes recent rejection diagnostics and the windowed alert state. + */ + getHealthSummary(): ParserHealthSummary { + this.pruneWindow(); + const windowRejectCount = this.windowEntries.length; + const alertFiring = windowRejectCount >= SchemaObservabilityService.ALERT_THRESHOLD; + + const byType = this.windowEntries.reduce>( + (acc, e) => { + acc[e.driftType] = (acc[e.driftType] ?? 0) + 1; + return acc; + }, + {}, + ); + + return { + status: alertFiring ? "degraded" : "healthy", + alertFiring, + windowMs: SchemaObservabilityService.WINDOW_MS, + alertThreshold: SchemaObservabilityService.ALERT_THRESHOLD, + windowRejectionCount: windowRejectCount, + rejectionsByDriftType: byType, + recentRejections: [...this.recentRejections], + }; + } + + // ── Private helpers ───────────────────────────────────────────────────────── + + private buildDiagnostic( + driftType: DriftType, + eventName: string, + schemaVersion: number, + raw: RawHorizonContractEvent, + missingFields: string[], + unexpectedFields: string[], + ): ParserRejectionDiagnostic { + return { + detectedAt: new Date().toISOString(), + driftType, + contractId: raw.contract_id, + eventName, + schemaVersion, + pagingToken: raw.paging_token, + txHash: raw.transaction_hash, + missingFields, + unexpectedFields, + }; + } + + private pushDiagnostic(d: ParserRejectionDiagnostic): void { + this.recentRejections.push(d); + // Trim to ring-buffer size + if (this.recentRejections.length > MAX_DIAGNOSTIC_BUFFER) { + this.recentRejections.shift(); + } + } + + private pushWindowEntry(driftType: DriftType): void { + this.pruneWindow(); + this.windowEntries.push({ timestamp: Date.now(), driftType }); + } + + private pruneWindow(): void { + const cutoff = Date.now() - SchemaObservabilityService.WINDOW_MS; + while (this.windowEntries.length > 0 && this.windowEntries[0].timestamp < cutoff) { + this.windowEntries.shift(); + } + } + + private maybeCaptureToSentry(d: ParserRejectionDiagnostic): void { + try { + this.sentry.captureMessage( + `[schema-drift] ${d.driftType}: ${d.eventName}`, + "warning", + { + driftType: d.driftType, + contractId: d.contractId, + eventName: d.eventName, + schemaVersion: d.schemaVersion, + pagingToken: d.pagingToken, + txHash: d.txHash, + missingFields: d.missingFields, + unexpectedFields: d.unexpectedFields, + }, + ); + } catch { + // Sentry capture is non-fatal + } + } + + private maybeFireThresholdAlert(): void { + this.pruneWindow(); + const count = this.windowEntries.length; + if (count === SchemaObservabilityService.ALERT_THRESHOLD) { + // Log exactly once when the threshold is first crossed in this window + this.logger.error( + `[schema-drift-alert] THRESHOLD EXCEEDED: ${count} rejections in the last ` + + `${SchemaObservabilityService.WINDOW_MS / 1000}s. ` + + `Investigate ingestion pipeline for contract schema changes.`, + ); + + try { + this.sentry.captureMessage( + `[schema-drift-alert] ${count} parser rejections in ${SchemaObservabilityService.WINDOW_MS / 1000}s`, + "error", + { + windowRejectionCount: count, + alertThreshold: SchemaObservabilityService.ALERT_THRESHOLD, + windowMs: SchemaObservabilityService.WINDOW_MS, + }, + ); + } catch { + // Sentry capture is non-fatal + } + } + } +} + +// ── Public DTO for /indexer/parser-health ──────────────────────────────────── + +export interface ParserHealthSummary { + status: "healthy" | "degraded"; + alertFiring: boolean; + windowMs: number; + alertThreshold: number; + windowRejectionCount: number; + rejectionsByDriftType: Record; + recentRejections: ParserRejectionDiagnostic[]; +} diff --git a/app/backend/src/ingestion/soroban-event-indexer.service.ts b/app/backend/src/ingestion/soroban-event-indexer.service.ts index 29cdde9a1..f98318013 100644 --- a/app/backend/src/ingestion/soroban-event-indexer.service.ts +++ b/app/backend/src/ingestion/soroban-event-indexer.service.ts @@ -13,6 +13,7 @@ import { EscrowEventRepository } from "./escrow-event.repository"; import { PrivacyEventRepository } from "./privacy-event.repository"; import { AdminEventRepository } from "./admin-event.repository"; import { StealthEventRepository } from "./stealth-event.repository"; +import { SchemaObservabilityService } from "./schema-observability.service"; import type { RustAcademyContractEvent } from "./types/contract-event.types"; const PAGE_LIMIT = 200; @@ -48,15 +49,58 @@ export class SorobanEventIndexerService { private readonly stealthRepo: StealthEventRepository, private readonly metrics: MetricsService, private readonly eventEmitter: EventEmitter2, + private readonly schemaObservability: SchemaObservabilityService, ) { this.horizonUrl = HORIZON_BASE_URLS[this.config.network]; - this.parser = new SorobanEventParser((eventName, version, pagingToken) => { - this.logger.warn( - `Unknown schema_version=${version} for event ${eventName} paging_token=${pagingToken}`, - ); - this.metrics.recordUnknownSchemaVersion(eventName, version); - }); + this.parser = new SorobanEventParser( + (eventName, version, pagingToken) => { + this.logger.warn( + `Unknown schema_version=${version} for event ${eventName} paging_token=${pagingToken}`, + ); + this.metrics.recordUnknownSchemaVersion(eventName, version); + }, + { + onUnknownSchemaVersion: (eventName, version, pagingToken) => { + // Build a minimal synthetic raw for diagnostics + this.schemaObservability.recordUnsupportedVersion( + eventName, + version, + { paging_token: pagingToken } as RawHorizonContractEvent, + ); + }, + onUnknownEvent: (raw, rawEventName) => { + this.schemaObservability.recordUnknownEvent(raw, rawEventName); + }, + onFieldMismatch: (eventName, schemaVersion, raw, missingFields, unexpectedFields) => { + this.schemaObservability.recordFieldMismatch( + eventName, + schemaVersion, + raw, + missingFields, + unexpectedFields, + ); + }, + onUnexpectedFields: (eventName, schemaVersion, raw, unexpectedFields) => { + this.schemaObservability.recordUnexpectedFields( + eventName, + schemaVersion, + raw, + unexpectedFields, + ); + }, + onIncompatibleVersion: (eventName, schemaVersion, raw) => { + this.schemaObservability.recordIncompatibleVersion( + eventName, + schemaVersion, + raw, + ); + }, + onParseError: (raw, errorMessage) => { + this.schemaObservability.recordParseError(raw, errorMessage); + }, + }, + ); } async indexLedgerRange( diff --git a/app/backend/src/ingestion/soroban-event.parser.ts b/app/backend/src/ingestion/soroban-event.parser.ts index 7f55f0d64..5aa332462 100644 --- a/app/backend/src/ingestion/soroban-event.parser.ts +++ b/app/backend/src/ingestion/soroban-event.parser.ts @@ -19,6 +19,7 @@ import { RustAcademy_EVENT_TOPICS, type RustAcademyEventTopic, } from "./event-schema"; +import { SchemaDriftDetector } from "./schema-drift-detector"; /** Maximum schema version this indexer understands. */ export const MAX_SUPPORTED_SCHEMA_VERSION = 2; @@ -29,6 +30,51 @@ export type UnknownSchemaVersionHandler = ( pagingToken: string, ) => void; +/** Called when the event topic symbol is not in the known schema registry. */ +export type UnknownEventHandler = ( + raw: RawHorizonContractEvent, + rawEventName: string, +) => void; + +/** Called when required payload keys are absent from a known event type. */ +export type FieldMismatchHandler = ( + eventName: SorobanEventType, + schemaVersion: number, + raw: RawHorizonContractEvent, + missingFields: string[], + unexpectedFields: string[], +) => void; + +/** Called when extra, unexpected fields are present (forward-compat, non-fatal). */ +export type UnexpectedFieldsHandler = ( + eventName: SorobanEventType, + schemaVersion: number, + raw: RawHorizonContractEvent, + unexpectedFields: string[], +) => void; + +/** Called for schema_version not in compatibleVersions (in-range but unsupported). */ +export type IncompatibleVersionHandler = ( + eventName: string, + schemaVersion: number, + raw: RawHorizonContractEvent, +) => void; + +/** Called when XDR decode or structural parse error occurs. */ +export type ParseErrorHandler = ( + raw: RawHorizonContractEvent, + errorMessage: string, +) => void; + +export interface ParserObservabilityCallbacks { + onUnknownSchemaVersion?: UnknownSchemaVersionHandler; + onUnknownEvent?: UnknownEventHandler; + onFieldMismatch?: FieldMismatchHandler; + onUnexpectedFields?: UnexpectedFieldsHandler; + onIncompatibleVersion?: IncompatibleVersionHandler; + onParseError?: ParseErrorHandler; +} + /** * Raw Horizon contract event record shape (subset we need). */ @@ -65,9 +111,11 @@ interface TopicLayout { */ export class SorobanEventParser { private readonly logger = new Logger(SorobanEventParser.name); + private readonly driftDetector = new SchemaDriftDetector(); constructor( private readonly onUnknownSchemaVersion?: UnknownSchemaVersionHandler, + private readonly callbacks?: ParserObservabilityCallbacks, ) {} /** @@ -83,7 +131,14 @@ export class SorobanEventParser { if (topics.length === 0) return null; const layout = this.resolveTopicLayout(topics); - if (!layout) return null; + if (!layout) { + // Try to extract a raw event name for the unknown-event callback + const rawEventName = this.tryDecodeFirstSymbol(topics); + if (rawEventName) { + this.callbacks?.onUnknownEvent?.(raw, rawEventName); + } + return null; + } const schemaVersion = this.extractSchemaVersionFromData(dataVal); if (schemaVersion > MAX_SUPPORTED_SCHEMA_VERSION) { @@ -96,6 +151,11 @@ export class SorobanEventParser { schemaVersion, raw.paging_token, ); + this.callbacks?.onUnknownSchemaVersion?.( + layout.eventName, + schemaVersion, + raw.paging_token, + ); return null; } @@ -103,9 +163,46 @@ export class SorobanEventParser { this.logger.warn( `Unsupported ${layout.eventName} schema version ${schemaVersion}`, ); + this.callbacks?.onIncompatibleVersion?.( + layout.eventName, + schemaVersion, + raw, + ); + return null; + } + + // ── Field-drift detection ───────────────────────────────────────────── + const dataMap = this.driftDetector.decodeToMap(dataVal); + const driftResult = this.driftDetector.detectFieldDrift( + layout.eventName, + dataMap, + ); + + if (driftResult.driftType === "FIELD_MISMATCH") { + this.logger.warn( + `Field mismatch for ${layout.eventName} paging_token=${raw.paging_token}: ` + + `missing=[${driftResult.missingFields.join(",")}]`, + ); + this.callbacks?.onFieldMismatch?.( + layout.eventName, + schemaVersion, + raw, + driftResult.missingFields, + driftResult.unexpectedFields, + ); return null; } + if (driftResult.unexpectedFields.length > 0) { + this.callbacks?.onUnexpectedFields?.( + layout.eventName, + schemaVersion, + raw, + driftResult.unexpectedFields, + ); + } + // ───────────────────────────────────────────────────────────────────── + const contractLedgerSequence = this.extractLedgerSequenceFromData(dataVal); if ( contractLedgerSequence !== undefined && @@ -200,6 +297,7 @@ export class SorobanEventParser { this.logger.warn( `Failed to parse contract event ${raw.paging_token}: ${(err as Error).message}`, ); + this.callbacks?.onParseError?.(raw, (err as Error).message); return null; } } @@ -425,6 +523,15 @@ export class SorobanEventParser { } } + /** + * Attempt to decode the first topic as a symbol without throwing. + * Used for best-effort unknown-event diagnostics. + */ + private tryDecodeFirstSymbol(topics: xdr.ScVal[]): string | null { + if (topics.length === 0) return null; + return this.decodeSymbol(topics[0]); + } + private resolveTopicLayout(topics: xdr.ScVal[]): TopicLayout | null { const first = this.decodeSymbol(topics[0]); if (!first) return null; diff --git a/app/backend/src/metrics/metrics.service.ts b/app/backend/src/metrics/metrics.service.ts index 634051a39..99ec21f95 100644 --- a/app/backend/src/metrics/metrics.service.ts +++ b/app/backend/src/metrics/metrics.service.ts @@ -16,6 +16,11 @@ export class MetricsService implements OnModuleInit { private sorobanRpcFailoverTotal: client.Counter; private sorobanRpcActiveEndpoint: client.Gauge; private sorobanIndexerUnknownSchemaVersion: client.Counter; + // Schema-drift observability (Issue: contract event schema drift) + private sorobanParserUnknownEventTotal: client.Counter; + private sorobanParserFieldMismatchTotal: client.Counter; + private sorobanParserRejectionTotal: client.Counter; + private sorobanParserUnexpectedFieldsTotal: client.Counter; private parityCheckResults: client.Gauge; private shadowTrafficRequests: client.Counter; private indexerLagLedgers: client.Gauge; @@ -103,6 +108,39 @@ export class MetricsService implements OnModuleInit { labelNames: ["event_name", "schema_version"], }); + // ── Schema-drift counters ────────────────────────────────────────────── + + this.sorobanParserUnknownEventTotal = new client.Counter({ + name: "soroban_parser_unknown_event_total", + help: "Contract events rejected because the event name is not in the known schema registry", + labelNames: ["contract_id", "raw_event_name"], + }); + + this.sorobanParserFieldMismatchTotal = new client.Counter({ + name: "soroban_parser_field_mismatch_total", + help: "Contract events where one or more required payload keys were absent", + labelNames: ["event_name", "schema_version", "missing_fields"], + }); + + this.sorobanParserRejectionTotal = new client.Counter({ + name: "soroban_parser_rejection_total", + help: "Total contract event parse rejections classified by drift type", + labelNames: ["event_name", "drift_type"], + }); + + this.sorobanParserUnexpectedFieldsTotal = new client.Counter({ + name: "soroban_parser_unexpected_fields_total", + help: "Events that carry payload keys not in the expected schema (forward-compat additions)", + labelNames: ["event_name", "schema_version"], + }); + + this.register.registerMetric(this.sorobanParserUnknownEventTotal); + this.register.registerMetric(this.sorobanParserFieldMismatchTotal); + this.register.registerMetric(this.sorobanParserRejectionTotal); + this.register.registerMetric(this.sorobanParserUnexpectedFieldsTotal); + + // ── End schema-drift counters ───────────────────────────────────────── + this.parityCheckResults = new client.Gauge({ name: "environment_parity_check_results", help: "Environment parity check results by status", @@ -354,4 +392,60 @@ export class MetricsService implements OnModuleInit { this.indexerLagGuardStatus.set(status); } catch (error) {} } + + // ── Schema-drift observability ──────────────────────────────────────────── + + /** + * Increment the counter for events rejected because their topic symbol is + * not in the schema registry. + */ + recordUnknownEvent(contractId: string, rawEventName: string) { + if (!this.initialized || !this.sorobanParserUnknownEventTotal) return; + try { + this.sorobanParserUnknownEventTotal.labels(contractId, rawEventName).inc(); + } catch (error) {} + } + + /** + * Increment the counter when required payload keys are absent for a known + * event type. `missingFields` is a sorted comma-separated key list used as + * a label so the alert can surface exactly which fields drifted. + */ + recordFieldMismatch( + eventName: string, + schemaVersion: number, + missingFields: string[], + ) { + if (!this.initialized || !this.sorobanParserFieldMismatchTotal) return; + try { + this.sorobanParserFieldMismatchTotal + .labels(eventName, String(schemaVersion), missingFields.join(",")) + .inc(); + } catch (error) {} + } + + /** + * Increment the unified rejection counter tagged by drift type. + * This is the primary metric for alerting thresholds. + */ + recordParserRejection(eventName: string, driftType: string) { + if (!this.initialized || !this.sorobanParserRejectionTotal) return; + try { + this.sorobanParserRejectionTotal.labels(eventName, driftType).inc(); + } catch (error) {} + } + + /** + * Increment the counter for events that carry unexpected extra keys. + * These events are still ingested; this is an informational forward-compat + * warning rather than a hard rejection. + */ + recordUnexpectedFields(eventName: string, schemaVersion: number) { + if (!this.initialized || !this.sorobanParserUnexpectedFieldsTotal) return; + try { + this.sorobanParserUnexpectedFieldsTotal + .labels(eventName, String(schemaVersion)) + .inc(); + } catch (error) {} + } } From bc7b68813f068ea9a4f9854d5037125c6bb8635e Mon Sep 17 00:00:00 2001 From: Emmzyemms Date: Tue, 23 Jun 2026 10:57:52 +0100 Subject: [PATCH 2/2] Fixe: fixed the CI error! --- .../__tests__/soroban-event.parser.observability.unit.spec.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/app/backend/src/ingestion/__tests__/soroban-event.parser.observability.unit.spec.ts b/app/backend/src/ingestion/__tests__/soroban-event.parser.observability.unit.spec.ts index 8896a6abf..e07df8574 100644 --- a/app/backend/src/ingestion/__tests__/soroban-event.parser.observability.unit.spec.ts +++ b/app/backend/src/ingestion/__tests__/soroban-event.parser.observability.unit.spec.ts @@ -6,7 +6,6 @@ import { xdr, nativeToScVal } from "@stellar/stellar-sdk"; import { SorobanEventParser, RawHorizonContractEvent, - ParserObservabilityCallbacks, } from "../soroban-event.parser"; import { RustAcademy_EVENT_TOPICS,