Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 143 additions & 0 deletions app/backend/src/ingestion/__tests__/schema-drift-detector.unit.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import { xdr, nativeToScVal } from "@stellar/stellar-sdk";
import { SchemaDriftDetector } from "../schema-drift-detector";
import type { SorobanEventType } from "../types/contract-event.types";

function mapVal(entries: Record<string, xdr.ScVal>): xdr.ScVal {
const mapEntries = Object.entries(entries).map(
([k, v]) => new xdr.ScMapEntry({ key: xdr.ScVal.scvSymbol(k), val: v }),
);
return xdr.ScVal.scvMap(mapEntries);
}

const TOKEN = "CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC";
function addressVal(pubkey: string): xdr.ScVal {
return nativeToScVal(pubkey);
}

describe("SchemaDriftDetector", () => {
const detector = new SchemaDriftDetector();

describe("detectFieldDrift – EscrowDeposited", () => {
const eventName: SorobanEventType = "EscrowDeposited";

it("returns no drift for a complete v2 payload", () => {
const map = detector.decodeToMap(
mapVal({
amount_due: nativeToScVal(1_000n, { type: "i128" }),
amount_paid: nativeToScVal(1_000n, { type: "i128" }),
expires_at: nativeToScVal(9999999n, { type: "u64" }),
ledger_sequence: nativeToScVal(42, { type: "u32" }),
schema_version: nativeToScVal(2, { type: "u32" }),
timestamp: nativeToScVal(1700000000n, { type: "u64" }),
token: addressVal(TOKEN),
}),
);

const result = detector.detectFieldDrift(eventName, map);
expect(result.driftType).toBeNull();
expect(result.missingFields).toHaveLength(0);
});

it("returns FIELD_MISMATCH when token is absent", () => {
const map = detector.decodeToMap(
mapVal({
amount_due: nativeToScVal(1_000n, { type: "i128" }),
amount_paid: nativeToScVal(1_000n, { type: "i128" }),
expires_at: nativeToScVal(9999999n, { type: "u64" }),
timestamp: nativeToScVal(1700000000n, { type: "u64" }),
// token is missing
}),
);

const result = detector.detectFieldDrift(eventName, map);
expect(result.driftType).toBe("FIELD_MISMATCH");
expect(result.missingFields).toContain("token");
});

it("does NOT flag schema_version absence as drift (optional field)", () => {
const map = detector.decodeToMap(
mapVal({
amount_due: nativeToScVal(1_000n, { type: "i128" }),
amount_paid: nativeToScVal(1_000n, { type: "i128" }),
expires_at: nativeToScVal(9999999n, { type: "u64" }),
timestamp: nativeToScVal(1700000000n, { type: "u64" }),
token: addressVal(TOKEN),
// schema_version intentionally absent (v1 legacy)
}),
);

const result = detector.detectFieldDrift(eventName, map);
expect(result.driftType).toBeNull();
expect(result.missingFields).toHaveLength(0);
});

it("reports unexpected extra fields without triggering FIELD_MISMATCH", () => {
const map = detector.decodeToMap(
mapVal({
amount_due: nativeToScVal(1_000n, { type: "i128" }),
amount_paid: nativeToScVal(1_000n, { type: "i128" }),
expires_at: nativeToScVal(9999999n, { type: "u64" }),
timestamp: nativeToScVal(1700000000n, { type: "u64" }),
token: addressVal(TOKEN),
new_field_from_future_contract: nativeToScVal("future", { type: "string" }),
}),
);

const result = detector.detectFieldDrift(eventName, map);
expect(result.driftType).toBeNull(); // not a hard rejection
expect(result.unexpectedFields).toContain("new_field_from_future_contract");
});

it("missingFields are sorted alphabetically", () => {
const map = detector.decodeToMap(
mapVal({
// Only timestamp present; amount_due, amount_paid, expires_at, token missing
timestamp: nativeToScVal(1700000000n, { type: "u64" }),
}),
);

const result = detector.detectFieldDrift(eventName, map);
expect(result.driftType).toBe("FIELD_MISMATCH");
expect(result.missingFields).toEqual([...result.missingFields].sort());
});
});

describe("detectFieldDrift – unknown event name", () => {
it("returns no drift for unknown event names (handled elsewhere)", () => {
const result = detector.detectFieldDrift(
"UnknownEvent" as SorobanEventType,
{},
);
expect(result.driftType).toBeNull();
expect(result.missingFields).toHaveLength(0);
});
});

describe("decodeToMap", () => {
it("returns an empty map for void ScVal", () => {
const result = detector.decodeToMap(xdr.ScVal.scvVoid());
expect(result).toEqual({});
});

it("correctly decodes key-value map entries", () => {
const data = mapVal({
foo: nativeToScVal(42n, { type: "i128" }),
});
const result = detector.decodeToMap(data);
expect(Object.keys(result)).toContain("foo");
});
});

describe("extractSafePayloadSnapshot", () => {
it("returns a string-keyed snapshot without throwing", () => {
const data = mapVal({
amount: nativeToScVal(12345n, { type: "i128" }),
token: addressVal(TOKEN),
});
const snapshot = detector.extractSafePayloadSnapshot(data);
expect(snapshot).toHaveProperty("amount");
expect(snapshot).toHaveProperty("token");
expect(typeof snapshot.amount).toBe("string");
});
});
});
177 changes: 177 additions & 0 deletions app/backend/src/ingestion/__tests__/schema-observability.unit.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
import { SchemaObservabilityService } from "../schema-observability.service";
import type { RawHorizonContractEvent } from "../soroban-event.parser";

function makeRaw(
overrides: Partial<RawHorizonContractEvent> = {},
): RawHorizonContractEvent {
return {
id: "1",
paging_token: "100-1",
transaction_hash: "txhash",
ledger: 100,
created_at: "2025-01-01T00:00:00Z",
contract_id: "CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAWHF",
type: "contract",
topic: [],
value: { xdr: "" },
...overrides,
};
}

/** Minimal MetricsService stub */
function makeMetricsStub() {
return {
recordUnknownEvent: jest.fn(),
recordFieldMismatch: jest.fn(),
recordParserRejection: jest.fn(),
recordUnexpectedFields: jest.fn(),
recordUnknownSchemaVersion: jest.fn(),
recordError: jest.fn(),
};
}

/** Minimal SentryService stub */
function makeSentryStub() {
return {
captureMessage: jest.fn(),
captureException: jest.fn(),
};
}

describe("SchemaObservabilityService", () => {
let service: SchemaObservabilityService;
let metrics: ReturnType<typeof makeMetricsStub>;
let sentry: ReturnType<typeof makeSentryStub>;

beforeEach(() => {
metrics = makeMetricsStub();
sentry = makeSentryStub();
service = new SchemaObservabilityService(
metrics as never,
sentry as never,
);
});

describe("recordUnknownEvent", () => {
it("calls metrics.recordUnknownEvent and metrics.recordParserRejection", () => {
const raw = makeRaw();
service.recordUnknownEvent(raw, "SomeWeirdEvent");
expect(metrics.recordUnknownEvent).toHaveBeenCalledWith(
raw.contract_id,
"SomeWeirdEvent",
);
expect(metrics.recordParserRejection).toHaveBeenCalledWith(
"SomeWeirdEvent",
"UNKNOWN_EVENT_NAME",
);
});

it("increments windowRejectionCount", () => {
service.recordUnknownEvent(makeRaw(), "SomeWeirdEvent");
const summary = service.getHealthSummary();
expect(summary.windowRejectionCount).toBe(1);
expect(summary.rejectionsByDriftType["UNKNOWN_EVENT_NAME"]).toBe(1);
});

it("captures to Sentry", () => {
service.recordUnknownEvent(makeRaw(), "SomeWeirdEvent");
expect(sentry.captureMessage).toHaveBeenCalledWith(
expect.stringContaining("UNKNOWN_EVENT_NAME"),
"warning",
expect.objectContaining({ driftType: "UNKNOWN_EVENT_NAME" }),
);
});
});

describe("recordFieldMismatch", () => {
it("calls metrics.recordFieldMismatch with sorted missing fields", () => {
service.recordFieldMismatch("EscrowDeposited", 2, makeRaw(), ["token", "amount_due"], []);
expect(metrics.recordFieldMismatch).toHaveBeenCalledWith(
"EscrowDeposited",
2,
["token", "amount_due"],
);
});

it("adds to recentRejections ring-buffer", () => {
service.recordFieldMismatch("EscrowDeposited", 2, makeRaw({ paging_token: "999-1" }), ["token"], []);
const summary = service.getHealthSummary();
const rejection = summary.recentRejections.find(
(r) => r.pagingToken === "999-1",
);
expect(rejection).toBeDefined();
expect(rejection?.driftType).toBe("FIELD_MISMATCH");
expect(rejection?.missingFields).toContain("token");
});
});

describe("recordUnexpectedFields", () => {
it("calls metrics.recordUnexpectedFields but does NOT add to window (non-fatal)", () => {
service.recordUnexpectedFields("EscrowDeposited", 2, makeRaw(), ["future_field"]);
expect(metrics.recordUnexpectedFields).toHaveBeenCalledWith("EscrowDeposited", 2);
// Unexpected fields do not count toward rejection window
const summary = service.getHealthSummary();
expect(summary.windowRejectionCount).toBe(0);
});
});

describe("alert threshold", () => {
it("sets alertFiring=true when windowRejectionCount >= ALERT_THRESHOLD", () => {
for (let i = 0; i < SchemaObservabilityService.ALERT_THRESHOLD; i++) {
service.recordUnknownEvent(makeRaw({ paging_token: `${i}-1` }), "Evt");
}
const summary = service.getHealthSummary();
expect(summary.alertFiring).toBe(true);
expect(summary.status).toBe("degraded");
});

it("fires a Sentry error capture exactly when threshold is crossed", () => {
for (let i = 0; i < SchemaObservabilityService.ALERT_THRESHOLD; i++) {
service.recordUnknownEvent(makeRaw({ paging_token: `${i}-1` }), "Evt");
}
// The "error" level capture happens on the threshold-crossing call
const errorCalls = (sentry.captureMessage as jest.Mock).mock.calls.filter(
([, level]) => level === "error",
);
expect(errorCalls.length).toBeGreaterThanOrEqual(1);
});

it("status is healthy below threshold", () => {
service.recordUnknownEvent(makeRaw(), "Evt");
const summary = service.getHealthSummary();
expect(summary.status).toBe("healthy");
expect(summary.alertFiring).toBe(false);
});
});

describe("ring-buffer cap", () => {
it("keeps at most MAX_DIAGNOSTIC_BUFFER entries", () => {
const { MAX_DIAGNOSTIC_BUFFER } = jest.requireActual("../parser-diagnostics.types") as {
MAX_DIAGNOSTIC_BUFFER: number;
};
for (let i = 0; i < MAX_DIAGNOSTIC_BUFFER + 10; i++) {
service.recordUnknownEvent(makeRaw({ paging_token: `${i}-1` }), "Evt");
}
const summary = service.getHealthSummary();
expect(summary.recentRejections.length).toBeLessThanOrEqual(
MAX_DIAGNOSTIC_BUFFER,
);
});
});

describe("getHealthSummary", () => {
it("returns healthy state with zero rejections", () => {
const summary = service.getHealthSummary();
expect(summary.status).toBe("healthy");
expect(summary.alertFiring).toBe(false);
expect(summary.windowRejectionCount).toBe(0);
expect(summary.recentRejections).toHaveLength(0);
});

it("exposes windowMs and alertThreshold for transparency", () => {
const summary = service.getHealthSummary();
expect(summary.windowMs).toBe(SchemaObservabilityService.WINDOW_MS);
expect(summary.alertThreshold).toBe(SchemaObservabilityService.ALERT_THRESHOLD);
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,16 @@ describe("SorobanEventIndexerService - Resiliency & Hardening", () => {
service = new SorobanEventIndexerService(
mocks.config, mocks.checkpointRepo, mocks.escrowRepo,
mocks.privacyRepo, mocks.adminRepo, mocks.stealthRepo,
mocks.metrics, mocks.eventEmitter
mocks.metrics, mocks.eventEmitter,
{
recordUnknownEvent: jest.fn(),
recordFieldMismatch: jest.fn(),
recordUnexpectedFields: jest.fn(),
recordUnsupportedVersion: jest.fn(),
recordIncompatibleVersion: jest.fn(),
recordParseError: jest.fn(),
getHealthSummary: jest.fn(),
} as unknown as import("../schema-observability.service").SchemaObservabilityService
);
});

Expand Down
Loading