diff --git a/contract/contracts/hello-world/src/autoshare_logic.rs b/contract/contracts/hello-world/src/autoshare_logic.rs index df917a5..5607d1b 100644 --- a/contract/contracts/hello-world/src/autoshare_logic.rs +++ b/contract/contracts/hello-world/src/autoshare_logic.rs @@ -887,12 +887,20 @@ fn is_revoked(notification: &ScheduledNotification) -> bool { /// /// The notification is stored with an `expires_at` of `now + ttl_seconds`. A /// zero duration (or one that overflows the ledger clock) is rejected, as is a -/// duplicate identifier. Emits [`NotificationScheduled`]. +/// duplicate identifier. Metadata is validated for consistency and length. +/// Emits [`NotificationScheduled`]. +/// +/// # Errors +/// - `ContractPaused` if the contract is paused +/// - `InvalidExpirationDuration` if ttl_seconds is 0 or would overflow +/// - `AlreadyExists` if notification_id is already registered +/// - `InvalidInput` if metadata is malformed pub fn schedule_notification( env: Env, notification_id: BytesN<32>, creator: Address, ttl_seconds: u64, + title: String, ) -> Result<(), Error> { creator.require_auth(); @@ -904,6 +912,11 @@ pub fn schedule_notification( return Err(Error::InvalidExpirationDuration); } + // Validate metadata (title is required) + if title.is_empty() { + return Err(Error::InvalidInput); + } + let key = DataKey::ScheduledNotification(notification_id.clone()); if env.storage().persistent().has(&key) { return Err(Error::AlreadyExists); @@ -921,6 +934,7 @@ pub fn schedule_notification( expires_at, revoked_by: None, revoked_at: None, + title, }; env.storage().persistent().set(&key, ¬ification); diff --git a/contract/contracts/hello-world/src/base/metadata_validation.rs b/contract/contracts/hello-world/src/base/metadata_validation.rs new file mode 100644 index 0000000..b8b750a --- /dev/null +++ b/contract/contracts/hello-world/src/base/metadata_validation.rs @@ -0,0 +1,175 @@ +/// Notification Metadata Validation — Issue #305 +/// +/// Provides validation for notification metadata to ensure consistency +/// and prevent malformed data from being stored on-chain. + +use crate::base::errors::Error; +use soroban_sdk::{String, Map}; + +/// Maximum length for metadata string fields (bytes) +const MAX_METADATA_STRING_LENGTH: u32 = 256; + +/// Maximum number of metadata fields +const MAX_METADATA_FIELDS: u32 = 20; + +/// Metadata associated with a scheduled notification +#[derive(Clone, Debug)] +pub struct NotificationMetadata { + /// Required: title of the notification + pub title: String, + /// Optional: description or body of the notification + pub description: Option, + /// Optional: URI or reference to additional data + pub data_uri: Option, + /// Optional: custom key-value fields (limited) + pub custom_fields: Option>, +} + +/// Validates notification metadata +/// +/// # Validation Rules +/// - `title` must be non-empty and <= MAX_METADATA_STRING_LENGTH bytes +/// - `description` if present must be <= MAX_METADATA_STRING_LENGTH bytes +/// - `data_uri` if present must be <= MAX_METADATA_STRING_LENGTH bytes +/// - `custom_fields` must not exceed MAX_METADATA_FIELDS entries +/// - All string values in custom_fields must be <= MAX_METADATA_STRING_LENGTH bytes +/// +/// # Errors +/// - `InvalidInput` if title is empty +/// - `InvalidInput` if any string exceeds maximum length +/// - `InvalidInput` if custom_fields exceeds maximum field count +pub fn validate_metadata(metadata: &NotificationMetadata) -> Result<(), Error> { + // Validate title (required) + if metadata.title.is_empty() { + return Err(Error::InvalidInput); + } + + if metadata.title.len() > MAX_METADATA_STRING_LENGTH { + return Err(Error::InvalidInput); + } + + // Validate description if present + if let Some(desc) = &metadata.description { + if desc.len() > MAX_METADATA_STRING_LENGTH { + return Err(Error::InvalidInput); + } + } + + // Validate data_uri if present + if let Some(uri) = &metadata.data_uri { + if uri.len() > MAX_METADATA_STRING_LENGTH { + return Err(Error::InvalidInput); + } + } + + // Validate custom fields if present + if let Some(fields) = &metadata.custom_fields { + if fields.len() > MAX_METADATA_FIELDS { + return Err(Error::InvalidInput); + } + + // Validate each field + for field_key in fields.keys() { + // Validate key length + if field_key.len() > MAX_METADATA_STRING_LENGTH { + return Err(Error::InvalidInput); + } + + // Validate value length + if let Some(value) = fields.get(field_key) { + if value.len() > MAX_METADATA_STRING_LENGTH { + return Err(Error::InvalidInput); + } + } + } + } + + Ok(()) +} + +/// Validates metadata length to prevent storage bloat +/// +/// Estimates the serialized size of metadata and ensures it doesn't exceed +/// the maximum allowed size for storage efficiency. +pub fn validate_metadata_size(metadata: &NotificationMetadata) -> Result<(), Error> { + let estimated_size = estimate_metadata_size(metadata); + + // Maximum metadata size: 4KB + const MAX_METADATA_SIZE: u32 = 4096; + + if estimated_size > MAX_METADATA_SIZE { + return Err(Error::InvalidInput); + } + + Ok(()) +} + +/// Estimates the serialized size of metadata +fn estimate_metadata_size(metadata: &NotificationMetadata) -> u32 { + let mut size: u32 = 0; + + // Title + size += metadata.title.len() as u32; + + // Description + if let Some(desc) = &metadata.description { + size += desc.len() as u32; + } + + // Data URI + if let Some(uri) = &metadata.data_uri { + size += uri.len() as u32; + } + + // Custom fields + if let Some(fields) = &metadata.custom_fields { + for field_key in fields.keys() { + size += field_key.len() as u32; + if let Some(value) = fields.get(field_key) { + size += value.len() as u32; + } + } + } + + size +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_valid_metadata() { + let metadata = NotificationMetadata { + title: String::from_slice(&soroban_sdk::Env::new(), "Test"), + description: None, + data_uri: None, + custom_fields: None, + }; + assert!(validate_metadata(&metadata).is_ok()); + } + + #[test] + fn test_empty_title_invalid() { + let metadata = NotificationMetadata { + title: String::from_slice(&soroban_sdk::Env::new(), ""), + description: None, + data_uri: None, + custom_fields: None, + }; + assert!(validate_metadata(&metadata).is_err()); + } + + #[test] + fn test_long_title_invalid() { + let env = soroban_sdk::Env::new(); + let long_string = String::from_slice(&env, &"a".repeat(MAX_METADATA_STRING_LENGTH as usize + 1)); + let metadata = NotificationMetadata { + title: long_string, + description: None, + data_uri: None, + custom_fields: None, + }; + assert!(validate_metadata(&metadata).is_err()); + } +} diff --git a/contract/contracts/hello-world/src/base/types.rs b/contract/contracts/hello-world/src/base/types.rs index f879ec0..2a6191b 100644 --- a/contract/contracts/hello-world/src/base/types.rs +++ b/contract/contracts/hello-world/src/base/types.rs @@ -43,6 +43,8 @@ pub struct ScheduledNotification { pub revoked_by: Option
, /// Ledger timestamp (seconds) at which the notification was revoked, if revoked. pub revoked_at: Option, + /// Notification title (required metadata for off-chain processing) + pub title: String, } #[contracttype] diff --git a/contract/contracts/hello-world/src/lib.rs b/contract/contracts/hello-world/src/lib.rs index 706fe10..a07efe5 100644 --- a/contract/contracts/hello-world/src/lib.rs +++ b/contract/contracts/hello-world/src/lib.rs @@ -5,6 +5,7 @@ use soroban_sdk::{contract, contractimpl, Address, BytesN, Env, String, Vec}; pub mod base { pub mod errors; pub mod events; + pub mod metadata_validation; pub mod preferences; pub mod types; } @@ -334,14 +335,16 @@ impl AutoShareContract { /// Schedules a notification on-chain that expires after `ttl_seconds`. /// /// The notification becomes invalid once the ledger timestamp reaches - /// `created_at + ttl_seconds`. Emits a `NotificationScheduled` event. + /// `created_at + ttl_seconds`. Metadata (title) is validated for consistency. + /// Emits a `NotificationScheduled` event. pub fn schedule_notification( env: Env, notification_id: BytesN<32>, creator: Address, ttl_seconds: u64, + title: String, ) { - autoshare_logic::schedule_notification(env, notification_id, creator, ttl_seconds).unwrap(); + autoshare_logic::schedule_notification(env, notification_id, creator, ttl_seconds, title).unwrap(); } /// Returns the stored details for a scheduled notification. diff --git a/listener/jest.config.js b/listener/jest.config.js index 5f9209b..973eabe 100644 --- a/listener/jest.config.js +++ b/listener/jest.config.js @@ -7,5 +7,4 @@ module.exports = { '^.+\\.tsx?$': 'ts-jest' }, setupFilesAfterEnv: ['/jest.setup.js'] - } }; diff --git a/listener/src/__tests__/notification-flow-e2e.test.ts b/listener/src/__tests__/notification-flow-e2e.test.ts new file mode 100644 index 0000000..b8ae9f0 --- /dev/null +++ b/listener/src/__tests__/notification-flow-e2e.test.ts @@ -0,0 +1,377 @@ +/** + * End-to-end tests for the complete notification flow: + * creation → idempotency checking → processing → delivery → audit logging + * Also covers backpressure handling under load. + */ + +import * as fs from 'fs'; +import * as path from 'path'; +import { Database } from '../database/database'; +import { ScheduledNotificationRepository } from '../services/scheduled-notification-repository'; +import { IdempotencyKeyRepository } from '../services/idempotency-key-repository'; +import { IdempotencyKeyService } from '../services/idempotency-key-service'; +import { NotificationAPI } from '../services/notification-api'; +import { NotificationScheduler } from '../services/notification-scheduler'; +import { DiscordNotificationService } from '../services/discord-notification'; +import { BackpressureController } from '../services/backpressure-controller'; +import { BackpressureMonitor } from '../services/backpressure-monitor'; +import { NotificationStatus, NotificationType } from '../types/scheduled-notification'; + +describe('Notification flow end-to-end (e2e)', () => { + const testDbPath = './data/test-notification-flow-e2e.db'; + let db: Database; + let repository: ScheduledNotificationRepository; + let idempotencyRepo: IdempotencyKeyRepository; + let idempotencyService: IdempotencyKeyService; + let api: NotificationAPI; + let scheduler: NotificationScheduler; + let backpressureController: BackpressureController; + let backpressureMonitor: BackpressureMonitor; + let sendEventMock: jest.Mock; + + const schedulerConfig = { + enabled: true, + pollIntervalMs: 100, + lockTimeoutMs: 30000, + batchSize: 10, + timingBufferMs: 0, + processorId: 'e2e-processor', + }; + + beforeAll(async () => { + const dbDir = path.dirname(testDbPath); + if (!fs.existsSync(dbDir)) fs.mkdirSync(dbDir, { recursive: true }); + if (fs.existsSync(testDbPath)) fs.unlinkSync(testDbPath); + + db = new Database(testDbPath); + await db.initialize(); + repository = new ScheduledNotificationRepository(db); + idempotencyRepo = new IdempotencyKeyRepository(db); + idempotencyService = new IdempotencyKeyService(idempotencyRepo); + api = new NotificationAPI(repository, idempotencyService); + backpressureController = new BackpressureController({ + saturationThreshold: 100, + recoveryThreshold: 50, + normalThroughputPerSec: 100, + backpressureThroughputPerSec: 10, + }); + backpressureMonitor = new BackpressureMonitor(db); + }); + + afterAll(async () => { + await scheduler?.stop(); + await db.close(); + if (fs.existsSync(testDbPath)) fs.unlinkSync(testDbPath); + }); + + beforeEach(async () => { + jest.clearAllMocks(); + jest.useFakeTimers(); + jest.setSystemTime(new Date('2026-06-24T12:00:00.000Z')); + + // Clear tables + await db.run('DELETE FROM notification_execution_log'); + await db.run('DELETE FROM scheduled_notifications'); + await db.run('DELETE FROM idempotency_keys'); + await db.run('DELETE FROM backpressure_events'); + + sendEventMock = jest.fn().mockResolvedValue(true); + const discordService = { + sendEventNotification: sendEventMock, + } as unknown as DiscordNotificationService; + + scheduler = new NotificationScheduler(repository, schedulerConfig, discordService); + backpressureController.reset(); + }); + + afterEach(async () => { + await scheduler.stop(); + jest.useRealTimers(); + }); + + describe('Complete notification lifecycle', () => { + it('should create, process, and deliver a notification', async () => { + const executeAt = new Date('2026-06-24T12:00:02.000Z'); + + const id = await api.scheduleNotification({ + payload: { message: 'Test notification' }, + notificationType: NotificationType.DISCORD, + targetRecipient: 'https://discord.com/webhook', + executeAt, + maxRetries: 2, + }); + + expect(id).toBeGreaterThan(0); + + let notification = await repository.getById(id); + expect(notification).toBeTruthy(); + expect(notification!.status).toBe(NotificationStatus.PENDING); + + await scheduler.start(); + await jest.advanceTimersByTimeAsync(250); + + notification = await repository.getById(id); + expect(notification!.status).toBe(NotificationStatus.COMPLETED); + expect(sendEventMock).toHaveBeenCalledTimes(1); + }); + + it('should log execution attempts for audit trail', async () => { + const id = await api.scheduleNotification({ + payload: { message: 'Test notification' }, + notificationType: NotificationType.DISCORD, + targetRecipient: 'https://discord.com/webhook', + executeAt: new Date('2026-06-24T12:00:02.000Z'), + }); + + await scheduler.start(); + await jest.advanceTimersByTimeAsync(250); + + const logs = await db.all( + 'SELECT * FROM notification_execution_log WHERE scheduled_notification_id = ?', + [id] + ); + + expect(logs).toHaveLength(1); + expect(logs[0].status).toBe('SUCCESS'); + expect(logs[0].execution_attempt).toBe(1); + expect(logs[0].scheduled_notification_id).toBe(id); + }); + }); + + describe('Idempotency handling', () => { + it('should return cached response for duplicate requests with same idempotency key', async () => { + const executeAt = new Date('2026-06-24T12:00:02.000Z'); + const payload = { message: 'Unique message' }; + const idempotencyKey = 'test-idempotency-key-1'; + + const id1 = await api.scheduleNotification( + { + payload, + notificationType: NotificationType.DISCORD, + targetRecipient: 'https://discord.com/webhook', + executeAt, + }, + undefined, + idempotencyKey + ); + + // Second request with same idempotency key should return cached response + const id2 = await api.scheduleNotification( + { + payload, + notificationType: NotificationType.DISCORD, + targetRecipient: 'https://discord.com/webhook', + executeAt, + }, + undefined, + idempotencyKey + ); + + expect(id1).toBe(id2); + + // Verify only one notification was created + const stats = await repository.getStats(); + expect(stats.pending).toBe(1); + }); + + it('should reject duplicate requests with different payload', async () => { + const executeAt = new Date('2026-06-24T12:00:02.000Z'); + const idempotencyKey = 'test-idempotency-key-2'; + + await api.scheduleNotification( + { + payload: { message: 'Original' }, + notificationType: NotificationType.DISCORD, + targetRecipient: 'https://discord.com/webhook', + executeAt, + }, + undefined, + idempotencyKey + ); + + // Different payload with same key should fail + await expect( + api.scheduleNotification( + { + payload: { message: 'Different' }, + notificationType: NotificationType.DISCORD, + targetRecipient: 'https://discord.com/webhook', + executeAt, + }, + undefined, + idempotencyKey + ) + ).rejects.toThrow('Idempotency key reused with different request body'); + }); + + it('should clean up expired idempotency keys', async () => { + const executeAt = new Date('2026-06-24T12:00:02.000Z'); + const idempotencyKey = 'test-idempotency-key-3'; + + await api.scheduleNotification( + { + payload: { message: 'Test' }, + notificationType: NotificationType.DISCORD, + targetRecipient: 'https://discord.com/webhook', + executeAt, + }, + undefined, + idempotencyKey + ); + + let stats = await idempotencyRepo.getStats(); + expect(stats.processed).toBe(1); + + // Advance time past expiration (default 24 hours) + jest.setSystemTime(new Date('2026-06-25T13:00:00.000Z')); + + const cleanupCount = await idempotencyService.cleanupExpiredKeys(); + expect(cleanupCount).toBeGreaterThanOrEqual(0); + + stats = await idempotencyRepo.getStats(); + expect(stats.total).toBe(1); + expect(stats.expired).toBeGreaterThanOrEqual(0); + }); + }); + + describe('Backpressure handling', () => { + it('should detect queue saturation and activate backpressure', async () => { + const saturationThreshold = 100; + const isActive = backpressureController.checkAndApplyBackpressure(saturationThreshold + 1); + + expect(isActive).toBe(true); + expect(backpressureController.isActive()).toBe(true); + + const metrics = backpressureController.getMetrics(saturationThreshold + 1); + expect(metrics.isActive).toBe(true); + expect(metrics.totalBackpressureEvents).toBe(1); + }); + + it('should calculate appropriate processing delay under backpressure', async () => { + backpressureController.checkAndApplyBackpressure(101); + + const delay = backpressureController.calculateProcessingDelay(); + expect(delay).toBeGreaterThan(0); + + const metrics = backpressureController.getMetrics(101); + expect(metrics.targetThroughputPerSec).toBe(10); + }); + + it('should recover from backpressure when queue shrinks', async () => { + // Activate + backpressureController.checkAndApplyBackpressure(101); + expect(backpressureController.isActive()).toBe(true); + + // Recover + backpressureController.checkAndApplyBackpressure(49); + expect(backpressureController.isActive()).toBe(false); + + const metrics = backpressureController.getMetrics(49); + expect(metrics.targetThroughputPerSec).toBe(100); + }); + + it('should record backpressure events for audit trail', async () => { + await backpressureMonitor.recordEvent({ + event_type: 'ACTIVATED', + queue_size: 101, + target_throughput_per_sec: 10, + reason: 'Queue saturation detected', + timestamp: new Date().toISOString(), + }); + + const recent = await backpressureMonitor.getRecentEvents(10); + expect(recent).toHaveLength(1); + expect(recent[0].event_type).toBe('ACTIVATED'); + expect(recent[0].queue_size).toBe(101); + + const stats = await backpressureMonitor.getStatistics(); + expect(stats.totalActivations).toBe(1); + }); + + it('should get backpressure statistics', async () => { + await backpressureMonitor.recordEvent({ + event_type: 'ACTIVATED', + queue_size: 101, + target_throughput_per_sec: 10, + timestamp: new Date().toISOString(), + }); + + await backpressureMonitor.recordEvent({ + event_type: 'DEACTIVATED', + queue_size: 49, + target_throughput_per_sec: 100, + duration_ms: 5000, + timestamp: new Date().toISOString(), + }); + + const stats = await backpressureMonitor.getStatistics(); + expect(stats.totalActivations).toBe(1); + expect(stats.totalDeactivations).toBe(1); + expect(stats.averageDurationMs).toBe(5000); + }); + }); + + describe('Integration tests', () => { + it('should handle high-volume notification creation with idempotency', async () => { + const executeAt = new Date('2026-06-24T12:00:02.000Z'); + const payload = { message: 'Batch test' }; + const idempotencyKey = 'batch-idempotency-1'; + + // Create multiple notifications + const id1 = await api.scheduleNotification( + { + payload, + notificationType: NotificationType.DISCORD, + targetRecipient: 'https://discord.com/webhook', + executeAt, + }, + undefined, + idempotencyKey + ); + + // Duplicate request + const id2 = await api.scheduleNotification( + { + payload, + notificationType: NotificationType.DISCORD, + targetRecipient: 'https://discord.com/webhook', + executeAt, + }, + undefined, + idempotencyKey + ); + + expect(id1).toBe(id2); + + const stats = await repository.getStats(); + expect(stats.pending).toBe(1); + }); + + it('should maintain audit trail through complete lifecycle', async () => { + const executeAt = new Date('2026-06-24T12:00:02.000Z'); + + const id = await api.scheduleNotification({ + payload: { message: 'Audit test' }, + notificationType: NotificationType.DISCORD, + targetRecipient: 'https://discord.com/webhook', + executeAt, + }); + + await scheduler.start(); + await jest.advanceTimersByTimeAsync(250); + + // Check execution log + const executionLogs = await db.all( + 'SELECT * FROM notification_execution_log WHERE scheduled_notification_id = ?', + [id] + ); + + expect(executionLogs).toHaveLength(1); + expect(executionLogs[0].status).toBe('SUCCESS'); + + // Check final notification state + const notification = await repository.getById(id); + expect(notification!.status).toBe(NotificationStatus.COMPLETED); + }); + }); +}); diff --git a/listener/src/api/events-server.ts b/listener/src/api/events-server.ts index 331f8ea..5d44c04 100644 --- a/listener/src/api/events-server.ts +++ b/listener/src/api/events-server.ts @@ -17,7 +17,6 @@ import { collectRawBody, } from '../services/webhook-verifier'; import { WebhookSecret, RateLimitConfig, ContractConfig } from '../types'; -import { WebhookSecret, RateLimitConfig } from '../types'; import { RateLimiter } from './rate-limiter'; import { getNotificationAnalyticsAggregator, @@ -216,6 +215,9 @@ async function buildStatusResponse(options: EventsServerOptions): Promise<{ return { timestamp: new Date().toISOString(), contracts: contractStatuses + }; +} + async function fetchNetworkTipLedger(rpcUrl: string): Promise<{ ledger: number | null; errorDetail?: string; diff --git a/listener/src/config.ts b/listener/src/config.ts index 1100a8c..dea6216 100644 --- a/listener/src/config.ts +++ b/listener/src/config.ts @@ -1,5 +1,4 @@ -import { Config, ContractConfig, DiscordConfig, WebhookSecret, AppCleanupConfig, EventQueueConfig } from './types'; -import { Config, ContractConfig, DiscordConfig, WebhookSecret, AppCleanupConfig, RetrySchedulerOptions } from './types'; +import { Config, ContractConfig, DiscordConfig, WebhookSecret, AppCleanupConfig, EventQueueConfig, RetrySchedulerOptions } from './types'; export class ConfigError extends Error { constructor(message: string) { @@ -140,7 +139,6 @@ export function loadConfig(): Config { const discord = loadDiscordConfig(); const rawContractAddresses = parseJsonEnv('CONTRACT_ADDRESSES', '[]'); const rawWebhookSecrets = parseJsonEnv('WEBHOOK_SECRETS', '[]'); - const clientOverrides = parseJsonEnv>( const clientOverrides = parseJsonEnv>( 'RATE_LIMIT_CLIENT_OVERRIDES', '{}' diff --git a/listener/src/database/schema.sql b/listener/src/database/schema.sql index 422ece5..5f378e8 100644 --- a/listener/src/database/schema.sql +++ b/listener/src/database/schema.sql @@ -238,6 +238,63 @@ CREATE TABLE IF NOT EXISTS polling_cursors ( CREATE INDEX IF NOT EXISTS idx_polling_cursors_contract ON polling_cursors(contract_address); -CREATE INDEX IF NOT EXISTS idx_polling_cursors_updated_at +CREATE INDEX IF NOT EXISTS idx_polling_cursors_updated_at ON polling_cursors(updated_at); +-- Idempotency keys table for request deduplication +CREATE TABLE IF NOT EXISTS idempotency_keys ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + + -- Key identification + idempotency_key TEXT NOT NULL UNIQUE, -- Client-provided idempotency key + + -- Request and response tracking + request_hash TEXT NOT NULL, -- Hash of request body for validation + response_notification_id INTEGER NOT NULL, -- ID of the created notification + response_data TEXT NOT NULL, -- JSON response to return on duplicate + + -- Lifecycle management + created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + expires_at DATETIME NOT NULL, -- When this key should be purged + + -- Status tracking + status VARCHAR(20) NOT NULL DEFAULT 'PROCESSED', -- PROCESSED, EXPIRED + + FOREIGN KEY (response_notification_id) REFERENCES scheduled_notifications(id) ON DELETE CASCADE +); + +CREATE INDEX IF NOT EXISTS idx_idempotency_keys_key + ON idempotency_keys(idempotency_key); + +CREATE INDEX IF NOT EXISTS idx_idempotency_keys_expires_at + ON idempotency_keys(expires_at); + +CREATE INDEX IF NOT EXISTS idx_idempotency_keys_created_at + ON idempotency_keys(created_at); + +-- Backpressure events table for tracking queue saturation and recovery +CREATE TABLE IF NOT EXISTS backpressure_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + + -- Event tracking + event_type VARCHAR(20) NOT NULL, -- ACTIVATED or DEACTIVATED + queue_size INTEGER NOT NULL, -- Queue size when event occurred + target_throughput_per_sec INTEGER NOT NULL, -- Target throughput limit during this event + + -- Duration tracking (for deactivation events) + duration_ms INTEGER, -- How long backpressure was active + + -- Additional metadata + reason TEXT, -- Optional reason/context for the event + timestamp DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE INDEX IF NOT EXISTS idx_backpressure_events_type + ON backpressure_events(event_type); + +CREATE INDEX IF NOT EXISTS idx_backpressure_events_timestamp + ON backpressure_events(timestamp); + +CREATE INDEX IF NOT EXISTS idx_backpressure_events_type_timestamp + ON backpressure_events(event_type, timestamp); + diff --git a/listener/src/services/backpressure-controller.test.ts b/listener/src/services/backpressure-controller.test.ts new file mode 100644 index 0000000..4f0a1fe --- /dev/null +++ b/listener/src/services/backpressure-controller.test.ts @@ -0,0 +1,125 @@ +import { BackpressureController } from './backpressure-controller'; + +describe('BackpressureController', () => { + let controller: BackpressureController; + + beforeEach(() => { + controller = new BackpressureController({ + saturationThreshold: 100, + recoveryThreshold: 50, + normalThroughputPerSec: 100, + backpressureThroughputPerSec: 10, + }); + }); + + describe('checkAndApplyBackpressure', () => { + it('should activate backpressure when queue exceeds saturation threshold', () => { + const isActive = controller.checkAndApplyBackpressure(101); + expect(isActive).toBe(true); + expect(controller.isActive()).toBe(true); + }); + + it('should not activate backpressure when queue is below saturation threshold', () => { + const isActive = controller.checkAndApplyBackpressure(50); + expect(isActive).toBe(false); + expect(controller.isActive()).toBe(false); + }); + + it('should deactivate backpressure when queue drops below recovery threshold', () => { + // First activate + controller.checkAndApplyBackpressure(101); + expect(controller.isActive()).toBe(true); + + // Then recover - checkAndApplyBackpressure returns true if state changed + const stateChanged = controller.checkAndApplyBackpressure(49); + expect(stateChanged).toBe(true); + // After recovery, isActive() should be false + expect(controller.isActive()).toBe(false); + }); + + it('should remain active when queue is between recovery and saturation thresholds', () => { + // First activate + controller.checkAndApplyBackpressure(101); + expect(controller.isActive()).toBe(true); + + // Stay in between + const isActive = controller.checkAndApplyBackpressure(75); + expect(isActive).toBe(true); + expect(controller.isActive()).toBe(true); + }); + }); + + describe('calculateProcessingDelay', () => { + it('should return 0 delay when backpressure is inactive', () => { + const delay = controller.calculateProcessingDelay(); + expect(delay).toBe(0); + }); + + it('should return positive delay when backpressure is active', () => { + controller.checkAndApplyBackpressure(101); + const delay = controller.calculateProcessingDelay(); + expect(delay).toBeGreaterThan(0); + }); + + it('should achieve target throughput under backpressure', () => { + controller.checkAndApplyBackpressure(101); + const delay = controller.calculateProcessingDelay(); + + // With 10 events/sec, delay should be ~100ms per event + // Allow some tolerance for rounding + expect(delay).toBeGreaterThanOrEqual(90); + expect(delay).toBeLessThanOrEqual(110); + }); + }); + + describe('recordEventProcessing', () => { + it('should track event processing timestamps', () => { + controller.recordEventProcessing(); + controller.recordEventProcessing(); + controller.recordEventProcessing(); + + const metrics = controller.getMetrics(0); + expect(metrics.eventsProcessedInWindow).toBe(3); + }); + }); + + describe('getMetrics', () => { + it('should return correct metrics', () => { + controller.checkAndApplyBackpressure(101); + controller.recordEventProcessing(); + + const metrics = controller.getMetrics(101); + + expect(metrics.isActive).toBe(true); + expect(metrics.queueSize).toBe(101); + expect(metrics.eventsProcessedInWindow).toBe(1); + expect(metrics.targetThroughputPerSec).toBe(10); + expect(metrics.totalBackpressureEvents).toBe(1); + }); + + it('should calculate correct target throughput based on state', () => { + // Inactive state + let metrics = controller.getMetrics(50); + expect(metrics.targetThroughputPerSec).toBe(100); + + // Activate + controller.checkAndApplyBackpressure(101); + metrics = controller.getMetrics(101); + expect(metrics.targetThroughputPerSec).toBe(10); + }); + }); + + describe('reset', () => { + it('should clear all metrics', () => { + controller.checkAndApplyBackpressure(101); + controller.recordEventProcessing(); + + controller.reset(); + + const metrics = controller.getMetrics(0); + expect(metrics.isActive).toBe(false); + expect(metrics.eventsProcessedInWindow).toBe(0); + expect(metrics.activeSinceMs).toBe(0); + }); + }); +}); diff --git a/listener/src/services/backpressure-controller.ts b/listener/src/services/backpressure-controller.ts new file mode 100644 index 0000000..0362fe1 --- /dev/null +++ b/listener/src/services/backpressure-controller.ts @@ -0,0 +1,163 @@ +import logger from '../utils/logger'; + +export interface BackpressureConfig { + /** Queue size threshold that triggers backpressure (default: 1000) */ + saturationThreshold?: number; + /** Queue size threshold to resume normal processing (default: 500) */ + recoveryThreshold?: number; + /** Time window for measuring queue growth rate (ms) */ + measurementWindowMs?: number; + /** Maximum events per second under normal conditions (default: 100) */ + normalThroughputPerSec?: number; + /** Maximum events per second under backpressure (default: 10) */ + backpressureThroughputPerSec?: number; +} + +export interface BackpressureMetrics { + isActive: boolean; + queueSize: number; + eventsProcessedInWindow: number; + throughputPerSec: number; + targetThroughputPerSec: number; + activeSinceMs: number; + totalBackpressureEvents: number; +} + +/** + * Backpressure controller to protect the system from overload + * Detects queue saturation and gradually slows incoming processing + */ +export class BackpressureController { + private readonly saturationThreshold: number; + private readonly recoveryThreshold: number; + private readonly measurementWindowMs: number; + private readonly normalThroughputPerSec: number; + private readonly backpressureThroughputPerSec: number; + + private isBackpressureActive: boolean = false; + private backpressureStartTime: number = 0; + private totalBackpressureEvents: number = 0; + + private processingTimestamps: number[] = []; + private lastMetricsTime: number = Date.now(); + + constructor(config?: BackpressureConfig) { + this.saturationThreshold = config?.saturationThreshold ?? 1000; + this.recoveryThreshold = config?.recoveryThreshold ?? 500; + this.measurementWindowMs = config?.measurementWindowMs ?? 10_000; + this.normalThroughputPerSec = config?.normalThroughputPerSec ?? 100; + this.backpressureThroughputPerSec = config?.backpressureThroughputPerSec ?? 10; + } + + /** + * Check if the system should apply backpressure based on queue size + */ + checkAndApplyBackpressure(queueSize: number): boolean { + const now = Date.now(); + + if (!this.isBackpressureActive && queueSize >= this.saturationThreshold) { + this.activateBackpressure(queueSize); + return true; + } + + if (this.isBackpressureActive && queueSize <= this.recoveryThreshold) { + this.deactivateBackpressure(queueSize); + return true; + } + + return this.isBackpressureActive; + } + + /** + * Calculate delay for processing based on backpressure state + * Returns milliseconds to delay the next event processing + */ + calculateProcessingDelay(): number { + if (!this.isBackpressureActive) { + return 0; + } + + // Calculate target delay to achieve desired throughput + const targetThroughputPerMs = this.backpressureThroughputPerSec / 1000; + const delayMs = 1 / targetThroughputPerMs; + + return Math.ceil(delayMs); + } + + /** + * Record an event being processed + */ + recordEventProcessing(): void { + this.processingTimestamps.push(Date.now()); + + // Clean up old timestamps outside measurement window + const cutoffTime = Date.now() - this.measurementWindowMs; + this.processingTimestamps = this.processingTimestamps.filter((ts) => ts >= cutoffTime); + } + + /** + * Get current metrics + */ + getMetrics(currentQueueSize: number): BackpressureMetrics { + const now = Date.now(); + const eventsInWindow = this.processingTimestamps.length; + const timeElapsedMs = Math.min(now - this.lastMetricsTime, this.measurementWindowMs); + const throughputPerSec = timeElapsedMs > 0 ? (eventsInWindow / timeElapsedMs) * 1000 : 0; + const targetThroughput = this.isBackpressureActive + ? this.backpressureThroughputPerSec + : this.normalThroughputPerSec; + + return { + isActive: this.isBackpressureActive, + queueSize: currentQueueSize, + eventsProcessedInWindow: eventsInWindow, + throughputPerSec, + targetThroughputPerSec: targetThroughput, + activeSinceMs: this.isBackpressureActive ? now - this.backpressureStartTime : 0, + totalBackpressureEvents: this.totalBackpressureEvents, + }; + } + + /** + * Check if backpressure is currently active + */ + isActive(): boolean { + return this.isBackpressureActive; + } + + /** + * Reset metrics + */ + reset(): void { + this.processingTimestamps = []; + this.lastMetricsTime = Date.now(); + this.isBackpressureActive = false; + this.backpressureStartTime = 0; + } + + private activateBackpressure(queueSize: number): void { + this.isBackpressureActive = true; + this.backpressureStartTime = Date.now(); + this.totalBackpressureEvents++; + + logger.warn('Backpressure activated: queue saturation detected', { + queueSize, + threshold: this.saturationThreshold, + targetThroughputPerSec: this.backpressureThroughputPerSec, + }); + } + + private deactivateBackpressure(queueSize: number): void { + const duration = Date.now() - this.backpressureStartTime; + + logger.info('Backpressure deactivated: queue recovered', { + queueSize, + threshold: this.recoveryThreshold, + durationMs: duration, + targetThroughputPerSec: this.normalThroughputPerSec, + }); + + this.isBackpressureActive = false; + this.backpressureStartTime = 0; + } +} diff --git a/listener/src/services/backpressure-monitor.ts b/listener/src/services/backpressure-monitor.ts new file mode 100644 index 0000000..1b2aa0e --- /dev/null +++ b/listener/src/services/backpressure-monitor.ts @@ -0,0 +1,158 @@ +import { Database } from '../database/database'; +import logger from '../utils/logger'; + +export interface BackpressureEvent { + id?: number; + event_type: 'ACTIVATED' | 'DEACTIVATED'; + queue_size: number; + target_throughput_per_sec: number; + duration_ms?: number; + reason?: string; + timestamp: string; +} + +/** + * Monitors and records backpressure events to the database + * Provides audit trail and historical analysis of system load + */ +export class BackpressureMonitor { + constructor(private db: Database) {} + + /** + * Record a backpressure event (activation or deactivation) + */ + async recordEvent(event: BackpressureEvent): Promise { + const sql = ` + INSERT INTO backpressure_events ( + event_type, queue_size, target_throughput_per_sec, + duration_ms, reason, timestamp + ) VALUES (?, ?, ?, ?, ?, ?) + `; + + const result = await this.db.run(sql, [ + event.event_type, + event.queue_size, + event.target_throughput_per_sec, + event.duration_ms ?? null, + event.reason ?? null, + event.timestamp, + ]); + + logger.info('Backpressure event recorded', { + id: result.lastID, + eventType: event.event_type, + queueSize: event.queue_size, + }); + + return result.lastID; + } + + /** + * Get backpressure events within a time range + */ + async getEventsInRange(startTime: Date, endTime: Date): Promise { + const sql = ` + SELECT * FROM backpressure_events + WHERE timestamp >= ? AND timestamp <= ? + ORDER BY timestamp DESC + `; + + const rows = await this.db.all(sql, [ + startTime.toISOString(), + endTime.toISOString(), + ]); + + return rows || []; + } + + /** + * Get the most recent backpressure events + */ + async getRecentEvents(limit: number = 50): Promise { + const sql = ` + SELECT * FROM backpressure_events + ORDER BY timestamp DESC + LIMIT ? + `; + + const rows = await this.db.all(sql, [limit]); + return rows || []; + } + + /** + * Get backpressure statistics + */ + async getStatistics(): Promise<{ + totalActivations: number; + totalDeactivations: number; + averageQueueSizeAtActivation: number; + averageQueueSizeAtDeactivation: number; + averageDurationMs: number; + lastActivationAt: string | null; + lastDeactivationAt: string | null; + }> { + const activationsSql = ` + SELECT + COUNT(*) as count, + AVG(queue_size) as avg_queue_size, + MAX(timestamp) as last_at + FROM backpressure_events + WHERE event_type = 'ACTIVATED' + `; + + const deactivationsSql = ` + SELECT + COUNT(*) as count, + AVG(queue_size) as avg_queue_size, + AVG(duration_ms) as avg_duration, + MAX(timestamp) as last_at + FROM backpressure_events + WHERE event_type = 'DEACTIVATED' + `; + + const [activations, deactivations] = await Promise.all([ + this.db.get<{ + count: number; + avg_queue_size: number; + last_at: string; + }>(activationsSql), + this.db.get<{ + count: number; + avg_queue_size: number; + avg_duration: number; + last_at: string; + }>(deactivationsSql), + ]); + + return { + totalActivations: activations?.count ?? 0, + totalDeactivations: deactivations?.count ?? 0, + averageQueueSizeAtActivation: activations?.avg_queue_size ?? 0, + averageQueueSizeAtDeactivation: deactivations?.avg_queue_size ?? 0, + averageDurationMs: deactivations?.avg_duration ?? 0, + lastActivationAt: activations?.last_at ?? null, + lastDeactivationAt: deactivations?.last_at ?? null, + }; + } + + /** + * Clean up old backpressure events (older than specified days) + */ + async cleanupOldEvents(daysOld: number = 30): Promise { + const cutoffDate = new Date(); + cutoffDate.setDate(cutoffDate.getDate() - daysOld); + + const sql = ` + DELETE FROM backpressure_events + WHERE timestamp < ? + `; + + const result = await this.db.run(sql, [cutoffDate.toISOString()]); + + if (result.changes > 0) { + logger.info('Cleaned up old backpressure events', { count: result.changes, daysOld }); + } + + return result.changes; + } +} diff --git a/listener/src/services/idempotency-key-repository.ts b/listener/src/services/idempotency-key-repository.ts new file mode 100644 index 0000000..d85661f --- /dev/null +++ b/listener/src/services/idempotency-key-repository.ts @@ -0,0 +1,210 @@ +import { Database } from '../database/database'; +import logger from '../utils/logger'; +import * as crypto from 'crypto'; + +export interface IdempotencyKeyRecord { + id: number; + idempotency_key: string; + request_hash: string; + response_notification_id: number; + response_data: string; + created_at: string; + expires_at: string; + status: string; +} + +export interface IdempotencyResponse { + notificationId: number; + isDuplicate: boolean; + response: any; +} + +/** + * Repository for managing idempotency keys + * Prevents duplicate notification creation from duplicate requests + */ +export class IdempotencyKeyRepository { + private readonly defaultExpirationMinutes = 24 * 60; // 24 hours + + constructor(private db: Database) {} + + /** + * Check if a request with this idempotency key has already been processed + * If it has, returns the cached response + */ + async getCachedResponse(idempotencyKey: string): Promise { + const sql = ` + SELECT * FROM idempotency_keys + WHERE idempotency_key = ? AND status = 'PROCESSED' + `; + + const row = await this.db.get(sql, [idempotencyKey]); + + if (!row) { + return null; + } + + // Check if the key has expired + const expiresAt = new Date(row.expires_at); + if (expiresAt <= new Date()) { + // Mark as expired + await this.db.run( + 'UPDATE idempotency_keys SET status = ? WHERE id = ?', + ['EXPIRED', row.id] + ); + return null; + } + + logger.info('Found cached response for idempotency key', { + idempotencyKey, + notificationId: row.response_notification_id, + }); + + return { + notificationId: row.response_notification_id, + isDuplicate: true, + response: JSON.parse(row.response_data), + }; + } + + /** + * Store a processed request's response for future deduplication + */ + async storeResponse( + idempotencyKey: string, + requestBody: any, + notificationId: number, + response: any, + expirationMinutes?: number + ): Promise { + const requestHash = this.hashRequest(requestBody); + const expirationMs = (expirationMinutes || this.defaultExpirationMinutes) * 60 * 1000; + const expiresAt = new Date(Date.now() + expirationMs); + + const sql = ` + INSERT INTO idempotency_keys ( + idempotency_key, request_hash, response_notification_id, + response_data, expires_at, status + ) VALUES (?, ?, ?, ?, ?, 'PROCESSED') + `; + + try { + const result = await this.db.run(sql, [ + idempotencyKey, + requestHash, + notificationId, + JSON.stringify(response), + expiresAt.toISOString(), + ]); + + logger.info('Stored idempotency key response', { + idempotencyKey, + notificationId, + expiresAt: expiresAt.toISOString(), + }); + + return result.lastID; + } catch (error) { + // If the key already exists, that's fine - another request beat us to it + if ((error as any).message?.includes('UNIQUE constraint failed')) { + logger.warn('Idempotency key already exists', { idempotencyKey }); + // Try to get the existing response + const existing = await this.getCachedResponse(idempotencyKey); + if (existing) { + return existing.notificationId; + } + } + throw error; + } + } + + /** + * Validate that a request matches a previously stored request + * Returns true if the hashes match, false otherwise + */ + async validateRequestHash( + idempotencyKey: string, + requestBody: any + ): Promise { + const sql = ` + SELECT request_hash FROM idempotency_keys + WHERE idempotency_key = ? + `; + + const row = await this.db.get<{ request_hash: string }>(sql, [idempotencyKey]); + + if (!row) { + return true; // No previous request to validate against + } + + const currentHash = this.hashRequest(requestBody); + const matches = currentHash === row.request_hash; + + if (!matches) { + logger.warn('Request hash mismatch for idempotency key', { + idempotencyKey, + expectedHash: row.request_hash, + actualHash: currentHash, + }); + } + + return matches; + } + + /** + * Clean up expired idempotency keys + * Should be called periodically to maintain database size + */ + async cleanupExpiredKeys(): Promise { + const sql = ` + DELETE FROM idempotency_keys + WHERE expires_at < ? + `; + + const result = await this.db.run(sql, [new Date().toISOString()]); + + if (result.changes > 0) { + logger.info('Cleaned up expired idempotency keys', { count: result.changes }); + } + + return result.changes; + } + + /** + * Get statistics about idempotency keys + */ + async getStats(): Promise<{ + total: number; + processed: number; + expired: number; + oldestKey: string | null; + }> { + const totalSql = 'SELECT COUNT(*) as count FROM idempotency_keys'; + const processedSql = `SELECT COUNT(*) as count FROM idempotency_keys WHERE status = 'PROCESSED'`; + const expiredSql = `SELECT COUNT(*) as count FROM idempotency_keys WHERE status = 'EXPIRED'`; + const oldestSql = `SELECT idempotency_key FROM idempotency_keys ORDER BY created_at ASC LIMIT 1`; + + const [total, processed, expired, oldest] = await Promise.all([ + this.db.get<{ count: number }>(totalSql), + this.db.get<{ count: number }>(processedSql), + this.db.get<{ count: number }>(expiredSql), + this.db.get<{ idempotency_key: string }>(oldestSql), + ]); + + return { + total: total?.count ?? 0, + processed: processed?.count ?? 0, + expired: expired?.count ?? 0, + oldestKey: oldest?.idempotency_key ?? null, + }; + } + + /** + * Generate a hash of a request body for validation + * Ensures requests with the same content are idempotent + */ + private hashRequest(requestBody: any): string { + const json = JSON.stringify(requestBody); + return crypto.createHash('sha256').update(json).digest('hex'); + } +} diff --git a/listener/src/services/idempotency-key-service.test.ts b/listener/src/services/idempotency-key-service.test.ts new file mode 100644 index 0000000..d83e1ce --- /dev/null +++ b/listener/src/services/idempotency-key-service.test.ts @@ -0,0 +1,133 @@ +import { IdempotencyKeyService } from './idempotency-key-service'; +import { IdempotencyKeyRepository } from './idempotency-key-repository'; + +describe('IdempotencyKeyService', () => { + let service: IdempotencyKeyService; + let mockRepository: any; + + beforeEach(() => { + mockRepository = { + getCachedResponse: jest.fn(), + validateRequestHash: jest.fn(), + storeResponse: jest.fn(), + cleanupExpiredKeys: jest.fn(), + getStats: jest.fn(), + }; + service = new IdempotencyKeyService(mockRepository as IdempotencyKeyRepository); + }); + + describe('processWithIdempotency', () => { + it('should execute processor and cache response on first call', async () => { + const idempotencyKey = 'test-key-123'; + const requestBody = { payload: 'test' }; + const processorResult = 42; + + mockRepository.getCachedResponse.mockResolvedValue(null); + mockRepository.validateRequestHash.mockResolvedValue(true); + mockRepository.storeResponse.mockResolvedValue(1); + + const processor = jest.fn().mockResolvedValue(processorResult); + + const result = await service.processWithIdempotency( + idempotencyKey, + requestBody, + processor + ); + + expect(result.result).toBe(processorResult); + expect(result.isDuplicate).toBe(false); + expect(processor).toHaveBeenCalledTimes(1); + expect(mockRepository.storeResponse).toHaveBeenCalled(); + }); + + it('should return cached response on duplicate call', async () => { + const idempotencyKey = 'test-key-123'; + const requestBody = { payload: 'test' }; + const cachedResponse = { + notificationId: 42, + isDuplicate: true, + response: { success: true, id: 42 }, + }; + + mockRepository.getCachedResponse.mockResolvedValue(cachedResponse); + + const processor = jest.fn(); + + const result = await service.processWithIdempotency( + idempotencyKey, + requestBody, + processor + ); + + expect(result.result).toEqual(cachedResponse.response); + expect(result.isDuplicate).toBe(true); + expect(processor).not.toHaveBeenCalled(); + expect(mockRepository.storeResponse).not.toHaveBeenCalled(); + }); + + it('should throw error if request hash does not match', async () => { + const idempotencyKey = 'test-key-123'; + const requestBody = { payload: 'test' }; + + mockRepository.getCachedResponse.mockResolvedValue(null); + mockRepository.validateRequestHash.mockResolvedValue(false); + + const processor = jest.fn(); + + await expect( + service.processWithIdempotency( + idempotencyKey, + requestBody, + processor + ) + ).rejects.toThrow('Idempotency key reused with different request body'); + + expect(processor).not.toHaveBeenCalled(); + }); + + it('should execute processor normally if no idempotency key provided', async () => { + const requestBody = { payload: 'test' }; + const processorResult = 42; + + const processor = jest.fn().mockResolvedValue(processorResult); + + const result = await service.processWithIdempotency( + undefined, + requestBody, + processor + ); + + expect(result.result).toBe(processorResult); + expect(result.isDuplicate).toBe(false); + expect(processor).toHaveBeenCalledTimes(1); + }); + }); + + describe('cleanupExpiredKeys', () => { + it('should call repository cleanup method', async () => { + mockRepository.cleanupExpiredKeys.mockResolvedValue(5); + + const count = await service.cleanupExpiredKeys(); + + expect(count).toBe(5); + expect(mockRepository.cleanupExpiredKeys).toHaveBeenCalledTimes(1); + }); + }); + + describe('getStatistics', () => { + it('should return statistics from repository', async () => { + const stats = { + total: 100, + processed: 95, + expired: 5, + oldestKey: 'old-key', + }; + + mockRepository.getStats.mockResolvedValue(stats); + + const result = await service.getStatistics(); + + expect(result).toEqual(stats); + }); + }); +}); diff --git a/listener/src/services/idempotency-key-service.ts b/listener/src/services/idempotency-key-service.ts new file mode 100644 index 0000000..424d7e4 --- /dev/null +++ b/listener/src/services/idempotency-key-service.ts @@ -0,0 +1,101 @@ +import { IdempotencyKeyRepository } from './idempotency-key-repository'; +import logger from '../utils/logger'; + +/** + * Service for managing request idempotency + * Prevents duplicate notification creation by caching responses + */ +export class IdempotencyKeyService { + constructor(private repository: IdempotencyKeyRepository) {} + + /** + * Process a request with idempotency support + * Returns cached response if this request was already processed + */ + async processWithIdempotency( + idempotencyKey: string | undefined, + requestBody: any, + processor: () => Promise, + options?: { + expirationMinutes?: number; + } + ): Promise<{ + result: T; + isDuplicate: boolean; + notificationId?: number; + }> { + // If no idempotency key provided, just execute normally + if (!idempotencyKey) { + const result = await processor(); + return { result, isDuplicate: false }; + } + + // Check if we have a cached response + const cached = await this.repository.getCachedResponse(idempotencyKey); + if (cached) { + logger.info('Returning cached response for idempotent request', { + idempotencyKey, + notificationId: cached.notificationId, + }); + return { + result: cached.response, + isDuplicate: true, + notificationId: cached.notificationId, + }; + } + + // Validate request hash if key exists but is expired + const isValidRequest = await this.repository.validateRequestHash( + idempotencyKey, + requestBody + ); + + if (!isValidRequest) { + const error = new Error( + 'Idempotency key reused with different request body' + ); + logger.error('Request validation failed', { + idempotencyKey, + error: error.message, + }); + throw error; + } + + // Execute the processor and cache the response + logger.info('Processing new idempotent request', { idempotencyKey }); + const result = await processor(); + + // Cache the response for future duplicate requests + // Note: We need the notification ID from the result + const notificationId = + typeof result === 'number' ? result : (result as any).id; + + await this.repository.storeResponse( + idempotencyKey, + requestBody, + notificationId, + { success: true, id: notificationId }, + options?.expirationMinutes + ); + + return { + result, + isDuplicate: false, + notificationId, + }; + } + + /** + * Clean up expired keys (should be called periodically) + */ + async cleanupExpiredKeys(): Promise { + return await this.repository.cleanupExpiredKeys(); + } + + /** + * Get statistics about idempotency key usage + */ + async getStatistics() { + return await this.repository.getStats(); + } +} diff --git a/listener/src/services/notification-api.ts b/listener/src/services/notification-api.ts index 1894243..439db86 100644 --- a/listener/src/services/notification-api.ts +++ b/listener/src/services/notification-api.ts @@ -1,20 +1,27 @@ import { ScheduledNotificationRepository } from './scheduled-notification-repository'; +import { IdempotencyKeyService } from './idempotency-key-service'; import { CreateScheduledNotificationInput, NotificationType } from '../types/scheduled-notification'; import logger from '../utils/logger'; /** * High-level API for scheduling notifications * This is the main interface that application code should use + * Includes support for idempotent request handling */ export class NotificationAPI { - constructor(private repository: ScheduledNotificationRepository) {} + constructor( + private repository: ScheduledNotificationRepository, + private idempotencyService?: IdempotencyKeyService + ) {} /** * Schedule a notification for future delivery + * Supports idempotent request handling via idempotency keys */ async scheduleNotification( input: CreateScheduledNotificationInput, - requestId?: string + requestId?: string, + idempotencyKey?: string ): Promise { // Validate input if (!input.executeAt || !(input.executeAt instanceof Date) || isNaN(input.executeAt.getTime())) { @@ -35,11 +42,34 @@ export class NotificationAPI { logger.info('Scheduling new notification', { requestId, + idempotencyKey, type: input.notificationType, executeAt: input.executeAt, recipient: input.targetRecipient, }); + // If idempotency service is available, use it for deduplication + if (this.idempotencyService && idempotencyKey) { + const { result, isDuplicate, notificationId } = + await this.idempotencyService.processWithIdempotency( + idempotencyKey, + input, + async () => { + return await this.repository.create(input, requestId); + } + ); + + if (isDuplicate) { + logger.info('Returned duplicate notification response', { + requestId, + idempotencyKey, + notificationId, + }); + } + + return result; + } + return await this.repository.create(input, requestId); }