diff --git a/backend/.env.example b/backend/.env.example index e41e3b3f..85b09117 100644 --- a/backend/.env.example +++ b/backend/.env.example @@ -31,3 +31,24 @@ COMPRESSION_THRESHOLD=1024 STRIPE_SECRET_KEY=sk_test_... STRIPE_PUBLISHABLE_KEY=pk_test_... STRIPE_WEBHOOK_SECRET=whsec_... + +# Database Read Replicas +# DB_READ_REPLICA_URLS=postgresql://user:pass@replica1:5432/db,postgresql://user:pass@replica2:5432/db +# DB_REPLICA_MAX_LAG_MS=5000 + +# Notification Channels +# SMTP_HOST=smtp.example.com +# SMTP_PORT=587 +# SMTP_USER=user@example.com +# SMTP_PASSWORD=password +# SMTP_FROM=noreply@agenticpay.com +# SLACK_WEBHOOK_URL=https://hooks.slack.com/services/YOUR/WEBHOOK/URL +# SLACK_BOT_TOKEN=xoxb-your-token +# NOTIFICATION_WEBHOOK_URL=https://your-webhook-endpoint.com/notify +# NOTIFICATION_WEBHOOK_SECRET=your-webhook-secret + +# Redis Cluster +# REDIS_CLUSTER_NODES=redis1:6379,redis2:6379,redis3:6379 +# REDIS_CLUSTER_MAX_REDIRECTIONS=3 +# REDIS_CLUSTER_RETRY_ATTEMPTS=3 +# REDIS_CLUSTER_RETRY_DELAY=500 diff --git a/backend/docs/NOTIFICATION_CHANNELS.md b/backend/docs/NOTIFICATION_CHANNELS.md new file mode 100644 index 00000000..8baec540 --- /dev/null +++ b/backend/docs/NOTIFICATION_CHANNELS.md @@ -0,0 +1,96 @@ +# Notification Channel Plugin System + +## Overview + +The notification system uses a channel-based plugin architecture that allows dynamic addition of new notification channels without modifying core code. + +## Built-in Channels + +- **In-App**: Real-time notifications via WebSocket (highest priority) +- **Email**: SMTP-based email notifications +- **Slack**: Slack webhook notifications with rich formatting +- **Webhook**: Generic webhook for custom integrations + +## Creating a Custom Channel + +Implement the `NotificationChannel` interface: + +```typescript +import { + NotificationChannel, + Notification, + NotificationDeliveryResult, +} from "./channel-interface"; + +export class CustomChannel implements NotificationChannel { + readonly id = "custom"; + readonly name = "Custom Channel"; + readonly enabled = true; + readonly priority = 50; + + async send(notification: Notification): Promise { + // Send notification + } + + async format(notification: Notification): Promise { + // Format for channel + } + + async validate(): Promise { + // Validate configuration + } + + async healthCheck(): Promise { + // Check channel health + } + + getRateLimit() { + return { maxPerHour: 10, maxPerDay: 50 }; + } +} +``` + +## Registration + +```typescript +import { channelRegistry } from "./channel-registry"; +import { CustomChannel } from "./channels/custom-channel"; + +const customChannel = new CustomChannel(config); +channelRegistry.register(customChannel); +``` + +## User Preferences + +Users can configure: + +- Preferred channels per event type +- Channel priority/fallback order +- Quiet hours (no notifications during specified times) + +## Fallback Chain + +If the primary channel fails, the system automatically tries fallback channels in priority order. + +## Rate Limiting + +Each channel enforces its own rate limits to prevent spam. + +## Environment Variables + +``` +# Email Channel +SMTP_HOST=smtp.example.com +SMTP_PORT=587 +SMTP_USER=user@example.com +SMTP_PASSWORD=password +SMTP_FROM=noreply@agenticpay.com + +# Slack Channel +SLACK_WEBHOOK_URL=https://hooks.slack.com/services/YOUR/WEBHOOK/URL +SLACK_BOT_TOKEN=xoxb-your-token + +# Webhook Channel +NOTIFICATION_WEBHOOK_URL=https://your-webhook-endpoint.com/notify +NOTIFICATION_WEBHOOK_SECRET=your-webhook-secret +``` diff --git a/backend/src/cache/cluster-client.ts b/backend/src/cache/cluster-client.ts new file mode 100644 index 00000000..5af1b6d9 --- /dev/null +++ b/backend/src/cache/cluster-client.ts @@ -0,0 +1,393 @@ +/** + * Redis Cluster client wrapper with automatic failover and slot management + */ + +import { + createClient, + RedisClientType, + RedisFunctions, + RedisModules, + RedisScripts, +} from "redis"; + +export interface ClusterNode { + host: string; + port: number; +} + +export interface ClusterConfig { + nodes: ClusterNode[]; + maxRedirections: number; + retryAttempts: number; + retryDelay: number; + enableReadFromReplicas: boolean; +} + +export interface ClusterHealth { + nodeId: string; + host: string; + port: number; + role: "master" | "replica"; + connected: boolean; + slots: number[]; + memory: { + used: number; + max: number; + percentage: number; + }; +} + +export class RedisClusterClient { + private clients: Map = new Map(); + private config: ClusterConfig; + private slotMap: Map = new Map(); // slot -> nodeId + private healthCheckInterval?: NodeJS.Timeout; + + constructor(config: ClusterConfig) { + this.config = config; + } + + async connect(): Promise { + // Connect to all cluster nodes + for (const node of this.config.nodes) { + const nodeId = `${node.host}:${node.port}`; + const client = createClient({ + socket: { + host: node.host, + port: node.port, + reconnectStrategy: (retries) => { + if (retries > this.config.retryAttempts) { + console.error(`Max retries reached for ${nodeId}`); + return false; + } + return this.config.retryDelay * Math.pow(2, retries); + }, + }, + }); + + client.on("error", (err) => { + console.error(`Redis cluster node ${nodeId} error:`, err); + }); + + await client.connect(); + this.clients.set(nodeId, client); + console.log(`Connected to Redis cluster node: ${nodeId}`); + } + + // Build slot map + await this.updateSlotMap(); + + // Start health monitoring + this.startHealthCheck(); + } + + /** + * Get client for a specific key using consistent hashing + */ + private getClientForKey(key: string): RedisClientType { + const slot = this.calculateSlot(key); + const nodeId = this.slotMap.get(slot); + + if (nodeId) { + const client = this.clients.get(nodeId); + if (client) { + return client; + } + } + + // Fallback to first available client + const fallbackClient = Array.from(this.clients.values())[0]; + if (!fallbackClient) { + throw new Error("No Redis cluster nodes available"); + } + return fallbackClient; + } + + /** + * Calculate Redis slot for a key (CRC16 mod 16384) + */ + private calculateSlot(key: string): number { + // Extract hashtag if present: key{hashtag} + const hashtagMatch = key.match(/\{([^}]+)\}/); + const hashKey = hashtagMatch ? hashtagMatch[1] : key; + + return this.crc16(hashKey) % 16384; + } + + /** + * CRC16 implementation for Redis cluster + */ + private crc16(str: string): number { + const crcTable = [ + 0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7, 0x8108, + 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef, + ]; + + let crc = 0; + for (let i = 0; i < str.length; i++) { + const byte = str.charCodeAt(i); + crc = + ((crc << 4) ^ crcTable[((crc >> 12) ^ (byte >> 4)) & 0x0f]) & 0xffff; + crc = + ((crc << 4) ^ crcTable[((crc >> 12) ^ (byte & 0x0f)) & 0x0f]) & 0xffff; + } + return crc; + } + + /** + * Update slot map from cluster info + */ + private async updateSlotMap(): Promise { + const firstClient = Array.from(this.clients.values())[0]; + if (!firstClient) return; + + try { + // Get cluster slots information + const clusterSlots: any = await firstClient.sendCommand([ + "CLUSTER", + "SLOTS", + ]); + + this.slotMap.clear(); + + for (const slotRange of clusterSlots) { + const [startSlot, endSlot, master] = slotRange; + const nodeId = `${master[0]}:${master[1]}`; + + for (let slot = startSlot; slot <= endSlot; slot++) { + this.slotMap.set(slot, nodeId); + } + } + + console.log(`Updated slot map: ${this.slotMap.size} slots mapped`); + } catch (error) { + console.error("Failed to update slot map:", error); + } + } + + /** + * Execute command with automatic redirection handling + */ + private async executeWithRedirection( + key: string, + command: (client: RedisClientType) => Promise, + ): Promise { + let attempts = 0; + let client = this.getClientForKey(key); + + while (attempts < this.config.maxRedirections) { + try { + return await command(client); + } catch (error: any) { + // Handle MOVED redirection + if (error.message?.includes("MOVED")) { + const [, , newHost, newPort] = error.message.split(" "); + const newNodeId = `${newHost}:${newPort}`; + const newClient = this.clients.get(newNodeId); + + if (newClient) { + client = newClient; + await this.updateSlotMap(); + } + } + // Handle ASK redirection + else if (error.message?.includes("ASK")) { + const [, , newHost, newPort] = error.message.split(" "); + const newNodeId = `${newHost}:${newPort}`; + const newClient = this.clients.get(newNodeId); + + if (newClient) { + await newClient.sendCommand(["ASKING"]); + client = newClient; + } + } else { + throw error; + } + + attempts++; + } + } + + throw new Error("Max redirections exceeded"); + } + + /** + * Get value by key + */ + async get(key: string): Promise { + return this.executeWithRedirection(key, (client) => client.get(key)); + } + + /** + * Set value by key + */ + async set( + key: string, + value: string, + options?: { EX?: number }, + ): Promise { + await this.executeWithRedirection(key, (client) => + options?.EX + ? client.setEx(key, options.EX, value) + : client.set(key, value), + ); + } + + /** + * Delete key + */ + async del(key: string): Promise { + return this.executeWithRedirection(key, (client) => client.del(key)); + } + + /** + * Check if key exists + */ + async exists(key: string): Promise { + return this.executeWithRedirection(key, (client) => client.exists(key)); + } + + /** + * Set expiration on key + */ + async expire(key: string, seconds: number): Promise { + return this.executeWithRedirection(key, (client) => + client.expire(key, seconds), + ); + } + + /** + * Increment value + */ + async incr(key: string): Promise { + return this.executeWithRedirection(key, (client) => client.incr(key)); + } + + /** + * Batch operations within same slot (using hashtags) + */ + async pipeline( + operations: Array<{ key: string; op: string; args: any[] }>, + ): Promise { + // Group operations by slot + const slotGroups = new Map(); + + for (const operation of operations) { + const slot = this.calculateSlot(operation.key); + if (!slotGroups.has(slot)) { + slotGroups.set(slot, []); + } + slotGroups.get(slot)!.push(operation); + } + + // Execute pipelines per slot + const results = await Promise.all( + Array.from(slotGroups.values()).map(async (ops) => { + const client = this.getClientForKey(ops[0].key); + const multi = client.multi(); + + for (const op of ops) { + // @ts-ignore + multi[op.op](...op.args); + } + + return multi.exec(); + }), + ); + + return results.flat(); + } + + /** + * Health check for all cluster nodes + */ + async getClusterHealth(): Promise { + const health: ClusterHealth[] = []; + + for (const [nodeId, client] of this.clients.entries()) { + try { + const info: any = await client.info("memory"); + const [host, port] = nodeId.split(":"); + + const usedMemory = this.parseInfoValue(info, "used_memory"); + const maxMemory = + this.parseInfoValue(info, "maxmemory") || usedMemory * 2; + + health.push({ + nodeId, + host, + port: parseInt(port), + role: "master", // Simplified + connected: client.isReady, + slots: Array.from(this.slotMap.entries()) + .filter(([, id]) => id === nodeId) + .map(([slot]) => slot), + memory: { + used: usedMemory, + max: maxMemory, + percentage: (usedMemory / maxMemory) * 100, + }, + }); + } catch (error) { + console.error(`Health check failed for ${nodeId}:`, error); + } + } + + return health; + } + + private parseInfoValue(info: string, key: string): number { + const match = info.match(new RegExp(`${key}:(\\d+)`)); + return match ? parseInt(match[1]) : 0; + } + + /** + * Start periodic health checks + */ + private startHealthCheck(): void { + this.healthCheckInterval = setInterval(async () => { + const health = await this.getClusterHealth(); + console.log("Cluster health:", JSON.stringify(health, null, 2)); + + // Check for unhealthy nodes + for (const node of health) { + if (!node.connected) { + console.warn(`Node ${node.nodeId} is disconnected`); + } + if (node.memory.percentage > 90) { + console.warn( + `Node ${node.nodeId} memory usage is high: ${node.memory.percentage.toFixed(2)}%`, + ); + } + } + }, 30000); // Every 30 seconds + } + + /** + * Gracefully disconnect all clients + */ + async disconnect(): Promise { + if (this.healthCheckInterval) { + clearInterval(this.healthCheckInterval); + } + + for (const [nodeId, client] of this.clients.entries()) { + try { + await client.quit(); + console.log(`Disconnected from ${nodeId}`); + } catch (error) { + console.error(`Error disconnecting from ${nodeId}:`, error); + } + } + + this.clients.clear(); + this.slotMap.clear(); + } +} + +/** + * Create Redis cluster client instance + */ +export function createRedisCluster(config: ClusterConfig): RedisClusterClient { + return new RedisClusterClient(config); +} diff --git a/backend/src/config/database.ts b/backend/src/config/database.ts index 8203121d..548e399f 100644 --- a/backend/src/config/database.ts +++ b/backend/src/config/database.ts @@ -5,7 +5,7 @@ * PgBouncer integration, and recommended composite indexes for AgenticPay. */ -import { featureFlags } from './featureFlags.js'; +import { featureFlags } from "./featureFlags.js"; // ── Pool configuration ───────────────────────────────────────────────────────── @@ -25,19 +25,19 @@ function envInt(key: string, fallback: number): number { export function buildPoolConfig(env = process.env.NODE_ENV): PoolConfig { switch (env) { - case 'production': + case "production": return { - max: envInt('DB_POOL_MAX', 50), - min: envInt('DB_POOL_MIN', 5), - acquireTimeoutMs: envInt('DB_ACQUIRE_TIMEOUT_MS', 10_000), - idleTimeoutMs: envInt('DB_IDLE_TIMEOUT_MS', 300_000), - createTimeoutMs: envInt('DB_CREATE_TIMEOUT_MS', 10_000), - maxConnectionAgeMs: envInt('DB_MAX_AGE_MS', 1_800_000), + max: envInt("DB_POOL_MAX", 50), + min: envInt("DB_POOL_MIN", 5), + acquireTimeoutMs: envInt("DB_ACQUIRE_TIMEOUT_MS", 10_000), + idleTimeoutMs: envInt("DB_IDLE_TIMEOUT_MS", 300_000), + createTimeoutMs: envInt("DB_CREATE_TIMEOUT_MS", 10_000), + maxConnectionAgeMs: envInt("DB_MAX_AGE_MS", 1_800_000), }; - case 'staging': + case "staging": return { - max: envInt('DB_POOL_MAX', 20), - min: envInt('DB_POOL_MIN', 2), + max: envInt("DB_POOL_MAX", 20), + min: envInt("DB_POOL_MIN", 2), acquireTimeoutMs: 15_000, idleTimeoutMs: 600_000, createTimeoutMs: 15_000, @@ -45,8 +45,8 @@ export function buildPoolConfig(env = process.env.NODE_ENV): PoolConfig { }; default: return { - max: envInt('DB_POOL_MAX', 10), - min: envInt('DB_POOL_MIN', 1), + max: envInt("DB_POOL_MAX", 10), + min: envInt("DB_POOL_MIN", 1), acquireTimeoutMs: 30_000, idleTimeoutMs: 900_000, createTimeoutMs: 30_000, @@ -59,7 +59,7 @@ export function buildPoolConfig(env = process.env.NODE_ENV): PoolConfig { export interface PgBouncerConfig { enabled: boolean; - poolMode: 'transaction' | 'session' | 'statement'; + poolMode: "transaction" | "session" | "statement"; defaultPoolSize: number; maxPoolSize: number; minPoolSize: number; @@ -75,20 +75,20 @@ export interface PgBouncerConfig { } const DEFAULT_PGBOUNCER_CONFIG: PgBouncerConfig = { - enabled: process.env.PGBOUNCER_ENABLED === 'true', - poolMode: 'transaction', - defaultPoolSize: envInt('PGBOUNCER_DEFAULT_POOL_SIZE', 25), - maxPoolSize: envInt('PGBOUNCER_MAX_POOL_SIZE', 50), - minPoolSize: envInt('PGBOUNCER_MIN_POOL_SIZE', 5), - reservePoolSize: envInt('PGBOUNCER_RESERVE_POOL_SIZE', 5), - reservePoolTimeoutMs: envInt('PGBOUNCER_RESERVE_POOL_TIMEOUT_MS', 5_000), - maxClientConnections: envInt('PGBOUNCER_MAX_CLIENT_CONNECTIONS', 100), - maxPreparedStatements: envInt('PGBOUNCER_MAX_PREPARED_STATEMENTS', 50), - queryTimeoutMs: envInt('PGBOUNCER_QUERY_TIMEOUT_MS', 30_000), - idleTimeoutMs: envInt('PGBOUNCER_IDLE_TIMEOUT_MS', 600_000), - serverLifetimeMs: envInt('PGBOUNCER_SERVER_LIFETIME_MS', 3_600_000), - serverIdleTimeoutMs: envInt('PGBOUNCER_SERVER_IDLE_TIMEOUT_MS', 600_000), - healthCheckIntervalMs: envInt('PGBOUNCER_HEALTH_CHECK_INTERVAL_MS', 30_000), + enabled: process.env.PGBOUNCER_ENABLED === "true", + poolMode: "transaction", + defaultPoolSize: envInt("PGBOUNCER_DEFAULT_POOL_SIZE", 25), + maxPoolSize: envInt("PGBOUNCER_MAX_POOL_SIZE", 50), + minPoolSize: envInt("PGBOUNCER_MIN_POOL_SIZE", 5), + reservePoolSize: envInt("PGBOUNCER_RESERVE_POOL_SIZE", 5), + reservePoolTimeoutMs: envInt("PGBOUNCER_RESERVE_POOL_TIMEOUT_MS", 5_000), + maxClientConnections: envInt("PGBOUNCER_MAX_CLIENT_CONNECTIONS", 100), + maxPreparedStatements: envInt("PGBOUNCER_MAX_PREPARED_STATEMENTS", 50), + queryTimeoutMs: envInt("PGBOUNCER_QUERY_TIMEOUT_MS", 30_000), + idleTimeoutMs: envInt("PGBOUNCER_IDLE_TIMEOUT_MS", 600_000), + serverLifetimeMs: envInt("PGBOUNCER_SERVER_LIFETIME_MS", 3_600_000), + serverIdleTimeoutMs: envInt("PGBOUNCER_SERVER_IDLE_TIMEOUT_MS", 600_000), + healthCheckIntervalMs: envInt("PGBOUNCER_HEALTH_CHECK_INTERVAL_MS", 30_000), }; let pgBouncerConfig: PgBouncerConfig = { ...DEFAULT_PGBOUNCER_CONFIG }; @@ -186,7 +186,8 @@ class PoolMetricsCollector { snapshot(): ConnectionPoolMetrics { const averageAcquireTimeMs = this.acquireTimes.length > 0 - ? this.acquireTimes.reduce((sum, t) => sum + t, 0) / this.acquireTimes.length + ? this.acquireTimes.reduce((sum, t) => sum + t, 0) / + this.acquireTimes.length : 0; return { @@ -272,7 +273,9 @@ class ConnectionLeaseManager { const now = Date.now(); for (const [id, lease] of this.leases.entries()) { if (!lease.released && now - lease.acquiredAt > this.leaseTimeoutMs) { - console.warn(`[PoolLeak] Connection ${id} has been held for ${now - lease.acquiredAt}ms without release`); + console.warn( + `[PoolLeak] Connection ${id} has been held for ${now - lease.acquiredAt}ms without release`, + ); poolMetrics.recordLeakDetected(); this.leases.delete(id); } else if (lease.released) { @@ -328,10 +331,12 @@ class PoolExhaustionManager { for (const handler of this.handlers) { try { handler.onExhaustion(); - } catch { } + } catch {} } - console.warn(`[PoolExhaustion] Pool exhausted, backing off for ${this.backoffMs}ms`); + console.warn( + `[PoolExhaustion] Pool exhausted, backing off for ${this.backoffMs}ms`, + ); } notifyRecovery(): void { @@ -341,10 +346,10 @@ class PoolExhaustionManager { for (const handler of this.handlers) { try { handler.onRecovery(); - } catch { } + } catch {} } - console.log('[PoolExhaustion] Pool recovered'); + console.log("[PoolExhaustion] Pool recovered"); } scheduleRecovery(): void { @@ -369,7 +374,8 @@ export const poolExhaustionManager = new PoolExhaustionManager(); // ── Prepared Statement Registry ──────────────────────────────────────────────── export const PREPARED_STATEMENTS = { - getPaymentById: 'SELECT * FROM payments WHERE id = $1 AND tenant_id = $2 LIMIT 1', + getPaymentById: + "SELECT * FROM payments WHERE id = $1 AND tenant_id = $2 LIMIT 1", listPendingPayments: "SELECT id, tx_hash, amount, network FROM payments WHERE status = 'pending' ORDER BY created_at ASC LIMIT $1", upsertGasEstimate: ` @@ -422,7 +428,10 @@ class PreparedStatementManager { } getRegisteredStatements(): Array<{ name: string; sql: string }> { - return Array.from(this.statements.entries()).map(([name, sql]) => ({ name, sql })); + return Array.from(this.statements.entries()).map(([name, sql]) => ({ + name, + sql, + })); } getStatementCount(): number { @@ -431,16 +440,19 @@ class PreparedStatementManager { } export const preparedStatementManager = new PreparedStatementManager( - envInt('PGBOUNCER_MAX_PREPARED_STATEMENTS', 50), + envInt("PGBOUNCER_MAX_PREPARED_STATEMENTS", 50), ); preparedStatementManager.registerDefaults(); // ── Slow query detection ─────────────────────────────────────────────────────── -export const SLOW_QUERY_THRESHOLD_MS = envInt('SLOW_QUERY_THRESHOLD_MS', 500); -export const VERY_SLOW_QUERY_THRESHOLD_MS = envInt('VERY_SLOW_QUERY_THRESHOLD_MS', 2_000); +export const SLOW_QUERY_THRESHOLD_MS = envInt("SLOW_QUERY_THRESHOLD_MS", 500); +export const VERY_SLOW_QUERY_THRESHOLD_MS = envInt( + "VERY_SLOW_QUERY_THRESHOLD_MS", + 2_000, +); -export type SlowQuerySeverity = 'warn' | 'critical'; +export type SlowQuerySeverity = "warn" | "critical"; export interface SlowQueryEvent { sql: string; @@ -461,7 +473,7 @@ export function onSlowQuery(handler: SlowQueryHandler): void { export async function withQueryTimer( sql: string, params: unknown[], - execute: () => Promise + execute: () => Promise, ): Promise { const start = Date.now(); try { @@ -470,7 +482,7 @@ export async function withQueryTimer( const durationMs = Date.now() - start; if (durationMs >= SLOW_QUERY_THRESHOLD_MS) { const severity: SlowQuerySeverity = - durationMs >= VERY_SLOW_QUERY_THRESHOLD_MS ? 'critical' : 'warn'; + durationMs >= VERY_SLOW_QUERY_THRESHOLD_MS ? "critical" : "warn"; const event: SlowQueryEvent = { sql: sql.slice(0, 500), durationMs, @@ -479,15 +491,19 @@ export async function withQueryTimer( timestamp: new Date(), }; for (const handler of slowQueryHandlers) { - try { handler(event); } catch { } + try { + handler(event); + } catch {} } } } } onSlowQuery((event) => { - const label = event.severity === 'critical' ? 'CRITICAL' : 'SLOW'; - console.warn(`[db] ${label} query ${event.durationMs}ms: ${event.sql.slice(0, 120)}`); + const label = event.severity === "critical" ? "CRITICAL" : "SLOW"; + console.warn( + `[db] ${label} query ${event.durationMs}ms: ${event.sql.slice(0, 120)}`, + ); }); // ── Composite index definitions ──────────────────────────────────────────────── @@ -504,88 +520,96 @@ export interface CompositeIndex { export const RECOMMENDED_INDEXES: CompositeIndex[] = [ { - name: 'idx_invoices_project_created', - table: 'invoices', - columns: ['project_id', 'created_at'], - description: 'Optimizes listing invoices by project ordered by date', - targetQuery: 'SELECT * FROM invoices WHERE project_id = ? ORDER BY created_at DESC', + name: "idx_invoices_project_created", + table: "invoices", + columns: ["project_id", "created_at"], + description: "Optimizes listing invoices by project ordered by date", + targetQuery: + "SELECT * FROM invoices WHERE project_id = ? ORDER BY created_at DESC", }, { - name: 'idx_verifications_status_type', - table: 'verifications', - columns: ['status', 'verification_type'], - description: 'Filters verifications by status and type', - targetQuery: 'SELECT * FROM verifications WHERE status = ? AND verification_type = ?', + name: "idx_verifications_status_type", + table: "verifications", + columns: ["status", "verification_type"], + description: "Filters verifications by status and type", + targetQuery: + "SELECT * FROM verifications WHERE status = ? AND verification_type = ?", }, { - name: 'idx_transactions_account_ledger', - table: 'transactions', - columns: ['account_id', 'ledger_seq'], - description: 'Looks up transactions for an account sorted by ledger sequence', - targetQuery: 'SELECT * FROM transactions WHERE account_id = ? ORDER BY ledger_seq DESC', + name: "idx_transactions_account_ledger", + table: "transactions", + columns: ["account_id", "ledger_seq"], + description: + "Looks up transactions for an account sorted by ledger sequence", + targetQuery: + "SELECT * FROM transactions WHERE account_id = ? ORDER BY ledger_seq DESC", }, { - name: 'idx_payments_recipient_status', - table: 'payments', - columns: ['recipient', 'status'], - description: 'Finds pending payments for a recipient', - targetQuery: 'SELECT * FROM payments WHERE recipient = ? AND status = ?', + name: "idx_payments_recipient_status", + table: "payments", + columns: ["recipient", "status"], + description: "Finds pending payments for a recipient", + targetQuery: "SELECT * FROM payments WHERE recipient = ? AND status = ?", }, { - name: 'idx_payments_created_status', - table: 'payments', - columns: ['created_at', 'status'], - description: 'Oldest pending payments for processing', - targetQuery: 'SELECT * FROM payments WHERE status = ? ORDER BY created_at ASC LIMIT ?', + name: "idx_payments_created_status", + table: "payments", + columns: ["created_at", "status"], + description: "Oldest pending payments for processing", + targetQuery: + "SELECT * FROM payments WHERE status = ? ORDER BY created_at ASC LIMIT ?", }, { - name: 'idx_payments_tx_hash', - table: 'payments', - columns: ['tx_hash'], + name: "idx_payments_tx_hash", + table: "payments", + columns: ["tx_hash"], unique: true, - description: 'Idempotency and on-chain lookup by transaction hash', - targetQuery: 'SELECT * FROM payments WHERE tx_hash = ?', + description: "Idempotency and on-chain lookup by transaction hash", + targetQuery: "SELECT * FROM payments WHERE tx_hash = ?", }, { - name: 'idx_sessions_user_expires', - table: 'sessions', - columns: ['user_id', 'expires_at'], - description: 'Finds active sessions for a user', - targetQuery: 'SELECT * FROM sessions WHERE user_id = ? AND expires_at > ?', + name: "idx_sessions_user_expires", + table: "sessions", + columns: ["user_id", "expires_at"], + description: "Finds active sessions for a user", + targetQuery: "SELECT * FROM sessions WHERE user_id = ? AND expires_at > ?", }, { - name: 'idx_refunds_invoice_created', - table: 'refunds', - columns: ['invoice_id', 'created_at'], - description: 'Lists refunds for an invoice ordered by date', - targetQuery: 'SELECT * FROM refunds WHERE invoice_id = ? ORDER BY created_at DESC', + name: "idx_refunds_invoice_created", + table: "refunds", + columns: ["invoice_id", "created_at"], + description: "Lists refunds for an invoice ordered by date", + targetQuery: + "SELECT * FROM refunds WHERE invoice_id = ? ORDER BY created_at DESC", }, { - name: 'idx_users_tenant_email', - table: 'users', - columns: ['tenant_id', 'email'], + name: "idx_users_tenant_email", + table: "users", + columns: ["tenant_id", "email"], unique: true, - description: 'Login and uniqueness constraint per tenant', - targetQuery: 'SELECT * FROM users WHERE tenant_id = ? AND email = ?', + description: "Login and uniqueness constraint per tenant", + targetQuery: "SELECT * FROM users WHERE tenant_id = ? AND email = ?", }, { - name: 'idx_audit_logs_entity_created', - table: 'audit_logs', - columns: ['entity_id', 'created_at'], - description: 'Audit trail queries per resource ordered by time', - targetQuery: 'SELECT * FROM audit_logs WHERE entity_id = ? ORDER BY created_at DESC', + name: "idx_audit_logs_entity_created", + table: "audit_logs", + columns: ["entity_id", "created_at"], + description: "Audit trail queries per resource ordered by time", + targetQuery: + "SELECT * FROM audit_logs WHERE entity_id = ? ORDER BY created_at DESC", }, { - name: 'idx_gas_estimates_network_recorded', - table: 'gas_estimates', - columns: ['network', 'recorded_at'], - description: 'Gas analytics aggregation by network and time window', - targetQuery: 'SELECT * FROM gas_estimates WHERE network = ? ORDER BY recorded_at DESC', + name: "idx_gas_estimates_network_recorded", + table: "gas_estimates", + columns: ["network", "recorded_at"], + description: "Gas analytics aggregation by network and time window", + targetQuery: + "SELECT * FROM gas_estimates WHERE network = ? ORDER BY recorded_at DESC", }, ]; export function getRecommendedIndexes(): CompositeIndex[] { - if (!featureFlags.evaluate('db-composite-indexes')) return []; + if (!featureFlags.evaluate("db-composite-indexes")) return []; return RECOMMENDED_INDEXES; } @@ -597,26 +621,39 @@ export interface ReplicaConfig { database: string; user: string; password: string; + enabled: boolean; + maxLag: number; } export function buildReplicaConfigs(): ReplicaConfig[] { - const replicaUrls = (process.env.DB_READ_REPLICA_URLS ?? '') - .split(',') + const replicaUrls = (process.env.DB_READ_REPLICA_URLS ?? "") + .split(",") .map((s) => s.trim()) .filter(Boolean); + const maxLag = envInt("DB_REPLICA_MAX_LAG_MS", 5000); + return replicaUrls.map((url) => { const parsed = new URL(url); return { host: parsed.hostname, port: Number(parsed.port) || 5432, - database: parsed.pathname.replace(/^\//, ''), + database: parsed.pathname.replace(/^\//, ""), user: parsed.username, password: parsed.password, + enabled: true, + maxLag, }; }); } +export function buildReplicaUrls(): string[] { + return (process.env.DB_READ_REPLICA_URLS ?? "") + .split(",") + .map((s) => s.trim()) + .filter(Boolean); +} + export function isReadQuery(sql: string): boolean { return /^\s*(SELECT|WITH\s)/i.test(sql); } @@ -652,7 +689,7 @@ class QueryProfiler { } isEnabled(): boolean { - return featureFlags.evaluate('db-query-profiling'); + return featureFlags.evaluate("db-query-profiling"); } profile(query: string, source: string, fn: () => Promise): Promise { @@ -661,47 +698,70 @@ class QueryProfiler { const start = Date.now(); return fn().then((result) => { const durationMs = Date.now() - start; - const profile: QueryProfile = { query, durationMs, timestamp: new Date().toISOString(), source }; + const profile: QueryProfile = { + query, + durationMs, + timestamp: new Date().toISOString(), + source, + }; this.allQueries.push(profile); if (this.allQueries.length > this.maxAllQueries) this.allQueries.shift(); if (durationMs > this.slowThresholdMs) { - console.warn(`[QueryProfiler] SLOW QUERY (${durationMs.toFixed(0)}ms) [${source}]: ${query.substring(0, 200)}`); + console.warn( + `[QueryProfiler] SLOW QUERY (${durationMs.toFixed(0)}ms) [${source}]: ${query.substring(0, 200)}`, + ); this.slowQueries.push(profile); - if (this.slowQueries.length > this.maxSlowQueries) this.slowQueries.shift(); + if (this.slowQueries.length > this.maxSlowQueries) + this.slowQueries.shift(); } return result; }); } - detectNPlusOne(source: string, parentFn: () => Promise): Promise { + detectNPlusOne( + source: string, + parentFn: () => Promise, + ): Promise { if (!this.isEnabled()) return parentFn(); - const originalQuery = this.allQueries[this.allQueries.length - 1]?.query || 'unknown'; + const originalQuery = + this.allQueries[this.allQueries.length - 1]?.query || "unknown"; return parentFn().then((results) => { const total = this.allQueries.length; if (total > 10 && results.length > 1) { - console.warn(`[QueryProfiler] N+1 DETECTED [${source}]: ${total} queries for ${results.length} results`); + console.warn( + `[QueryProfiler] N+1 DETECTED [${source}]: ${total} queries for ${results.length} results`, + ); console.warn(` Parent: ${originalQuery.substring(0, 150)}`); } return results; }); } - getSlowQueries(): QueryProfile[] { return [...this.slowQueries]; } + getSlowQueries(): QueryProfile[] { + return [...this.slowQueries]; + } getTopSlowQueries(n = 10): QueryProfile[] { - return [...this.slowQueries].sort((a, b) => b.durationMs - a.durationMs).slice(0, n); + return [...this.slowQueries] + .sort((a, b) => b.durationMs - a.durationMs) + .slice(0, n); } - getAllQueries(): QueryProfile[] { return [...this.allQueries]; } + getAllQueries(): QueryProfile[] { + return [...this.allQueries]; + } getStats() { const total = this.allQueries.length; const slow = this.slowQueries.length; - const avgDuration = total > 0 ? this.allQueries.reduce((sum, q) => sum + q.durationMs, 0) / total : 0; + const avgDuration = + total > 0 + ? this.allQueries.reduce((sum, q) => sum + q.durationMs, 0) / total + : 0; return { totalQueries: total, slowQueries: slow, @@ -714,7 +774,9 @@ class QueryProfiler { private calculatePercentile(pct: number): number { if (this.allQueries.length === 0) return 0; - const sorted = [...this.allQueries].sort((a, b) => a.durationMs - b.durationMs); + const sorted = [...this.allQueries].sort( + (a, b) => a.durationMs - b.durationMs, + ); const idx = Math.ceil((pct / 100) * sorted.length) - 1; return sorted[Math.max(0, idx)].durationMs; } diff --git a/backend/src/db/replica-router.ts b/backend/src/db/replica-router.ts new file mode 100644 index 00000000..d522ca20 --- /dev/null +++ b/backend/src/db/replica-router.ts @@ -0,0 +1,264 @@ +/** + * Database read replica router with automatic query classification + * Routes read queries to replicas and writes to primary + */ + +import { PrismaClient } from '@prisma/client'; + +export interface ReplicaConfig { + host: string; + port: number; + enabled: boolean; + maxLag: number; // milliseconds +} + +export interface ReplicationStatus { + replica: string; + healthy: boolean; + lag: number; + lastCheck: Date; +} + +export class ReplicaRouter { + private primary: PrismaClient; + private replicas: Map = new Map(); + private replicaHealth: Map = new Map(); + private currentReplicaIndex = 0; + private sessionStickiness: Map = new Map(); // sessionId -> replicaId + private config: { + maxReplicationLag: number; + healthCheckInterval: number; + enableStickiness: boolean; + }; + + constructor( + primaryUrl: string, + replicaUrls: string[], + config?: Partial + ) { + this.primary = new PrismaClient({ + datasources: { db: { url: primaryUrl } }, + }); + + this.config = { + maxReplicationLag: 5000, // 5 seconds + healthCheckInterval: 30000, // 30 seconds + enableStickiness: true, + ...config, + }; + + // Initialize replicas + replicaUrls.forEach((url, index) => { + const replicaId = `replica-${index}`; + const replica = new PrismaClient({ + datasources: { db: { url } }, + }); + this.replicas.set(replicaId, replica); + this.replicaHealth.set(replicaId, { + replica: replicaId, + healthy: true, + lag: 0, + lastCheck: new Date(), + }); + }); + + this.startHealthChecks(); + } + + /** + * Classify query type based on operation + */ + private isReadQuery(operation: string): boolean { + const readOperations = [ + 'findUnique', + 'findFirst', + 'findMany', + 'count', + 'aggregate', + 'groupBy', + ]; + return readOperations.includes(operation); + } + + /** + * Get healthy replica using round-robin + */ + private getHealthyReplica(sessionId?: string): PrismaClient | null { + // Check session stickiness + if (sessionId && this.config.enableStickiness) { + const stickyReplica = this.sessionStickiness.get(sessionId); + if (stickyReplica) { + const replica = this.replicas.get(stickyReplica); + const health = this.replicaHealth.get(stickyReplica); + if (replica && health?.healthy) { + return replica; + } + } + } + + const healthyReplicas = Array.from(this.replicas.entries()).filter( + ([id, _]) => { + const health = this.replicaHealth.get(id); + return health?.healthy && health.lag < this.config.maxReplicationLag; + } + ); + + if (healthyReplicas.length === 0) { + return null; + } + + // Round-robin selection + const [replicaId, replica] = + healthyReplicas[this.currentReplicaIndex % healthyReplicas.length]; + this.currentReplicaIndex++; + + // Store session stickiness + if (sessionId && this.config.enableStickiness) { + this.sessionStickiness.set(sessionId, replicaId); + } + + return replica; + } + + /** + * Route query to appropriate database + */ + async route( + operation: string, + model: string, + args: any, + sessionId?: string, + forceP Primary = false + ): Promise { + const isRead = this.isReadQuery(operation); + + // Always use primary for writes or when forced + if (!isRead || forcePrimary) { + // @ts-ignore + return this.primary[model][operation](args); + } + + // Try replica for reads + const replica = this.getHealthyReplica(sessionId); + if (replica) { + try { + // @ts-ignore + return await replica[model][operation](args); + } catch (error) { + console.warn('Replica query failed, falling back to primary', error); + } + } + + // Fallback to primary + // @ts-ignore + return this.primary[model][operation](args); + } + + /** + * Check replication lag for a replica + */ + private async checkReplicationLag( + replicaId: string, + replica: PrismaClient + ): Promise { + try { + // Query replication status + const result = await replica.$queryRaw< + Array<{ lag_seconds: number }> + >`SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp())) AS lag_seconds`; + + const lagMs = result[0]?.lag_seconds * 1000 || 0; + return lagMs; + } catch (error) { + console.error(`Failed to check replication lag for ${replicaId}`, error); + return Infinity; + } + } + + /** + * Health check for all replicas + */ + private async performHealthCheck() { + for (const [replicaId, replica] of this.replicas.entries()) { + try { + const lag = await this.checkReplicationLag(replicaId, replica); + const healthy = lag < this.config.maxReplicationLag; + + this.replicaHealth.set(replicaId, { + replica: replicaId, + healthy, + lag, + lastCheck: new Date(), + }); + + if (!healthy) { + console.warn( + `Replica ${replicaId} unhealthy: lag=${lag}ms, threshold=${this.config.maxReplicationLag}ms` + ); + } + } catch (error) { + console.error(`Health check failed for ${replicaId}`, error); + this.replicaHealth.set(replicaId, { + replica: replicaId, + healthy: false, + lag: Infinity, + lastCheck: new Date(), + }); + } + } + } + + /** + * Start periodic health checks + */ + private startHealthChecks() { + setInterval(() => { + this.performHealthCheck(); + }, this.config.healthCheckInterval); + + // Initial health check + this.performHealthCheck(); + } + + /** + * Get replication status for monitoring + */ + getReplicationStatus(): ReplicationStatus[] { + return Array.from(this.replicaHealth.values()); + } + + /** + * Get primary client (for explicit primary queries) + */ + getPrimary(): PrismaClient { + return this.primary; + } + + /** + * Disconnect all clients + */ + async disconnect() { + await this.primary.$disconnect(); + for (const replica of this.replicas.values()) { + await replica.$disconnect(); + } + } +} + +/** + * Prisma middleware for automatic query routing + */ +export function createReplicaMiddleware(router: ReplicaRouter) { + return async (params: any, next: any) => { + const { model, action, args } = params; + + // Extract session ID from context if available + const sessionId = args?.sessionId; + + // Check if write occurred in this session (use primary) + const forcePrimary = args?.forcePrimary || false; + + // Route the query + return router.route(action, model, args, sessionId, forcePrimary); + }; +} diff --git a/backend/src/notifications/channel-interface.ts b/backend/src/notifications/channel-interface.ts new file mode 100644 index 00000000..01ba58dc --- /dev/null +++ b/backend/src/notifications/channel-interface.ts @@ -0,0 +1,75 @@ +/** + * Notification channel interface for plugin-based architecture + */ + +export interface Notification { + id: string; + userId: string; + eventType: string; + title: string; + body: string; + data?: Record; + priority: "low" | "normal" | "high" | "urgent"; + createdAt: Date; +} + +export interface NotificationDeliveryResult { + success: boolean; + channelId: string; + messageId?: string; + error?: string; + timestamp: Date; + deliveryTimeMs: number; +} + +export interface NotificationChannel { + readonly id: string; + readonly name: string; + readonly enabled: boolean; + readonly priority: number; // Lower number = higher priority in fallback chain + + /** + * Send notification through this channel + */ + send(notification: Notification): Promise; + + /** + * Format notification for this channel's requirements + */ + format(notification: Notification): Promise; + + /** + * Validate channel configuration + */ + validate(): Promise; + + /** + * Check if channel is healthy and can deliver messages + */ + healthCheck(): Promise; + + /** + * Get rate limit for this channel per user + */ + getRateLimit(): { maxPerHour: number; maxPerDay: number }; +} + +export interface ChannelPlugin { + create(config: Record): NotificationChannel; + metadata: { + id: string; + name: string; + description: string; + version: string; + author: string; + }; +} + +export interface UserNotificationPreference { + userId: string; + eventType: string; + channels: string[]; // Ordered by preference + enabled: boolean; + quietHoursStart?: number; // Hour 0-23 + quietHoursEnd?: number; // Hour 0-23 +} diff --git a/backend/src/notifications/channel-registry.ts b/backend/src/notifications/channel-registry.ts new file mode 100644 index 00000000..cbfaf0eb --- /dev/null +++ b/backend/src/notifications/channel-registry.ts @@ -0,0 +1,188 @@ +/** + * Notification channel registry for dynamic plugin discovery + */ + +import { NotificationChannel, ChannelPlugin } from "./channel-interface"; +import { EmailChannel } from "./channels/email-channel"; +import { SlackChannel } from "./channels/slack-channel"; +import { InAppChannel } from "./channels/in-app-channel"; +import { WebhookChannel } from "./channels/webhook-channel"; + +export class ChannelRegistry { + private channels = new Map(); + private plugins = new Map(); + + constructor() { + this.loadBuiltInChannels(); + } + + /** + * Load built-in notification channels + */ + private loadBuiltInChannels() { + // Email channel + if (process.env.SMTP_HOST) { + const emailChannel = new EmailChannel({ + smtpHost: process.env.SMTP_HOST, + smtpPort: parseInt(process.env.SMTP_PORT || "587"), + smtpUser: process.env.SMTP_USER || "", + smtpPassword: process.env.SMTP_PASSWORD || "", + fromAddress: process.env.SMTP_FROM || "noreply@agenticpay.com", + maxPerHour: 10, + maxPerDay: 50, + }); + this.register(emailChannel); + } + + // Slack channel + if (process.env.SLACK_WEBHOOK_URL) { + const slackChannel = new SlackChannel({ + webhookUrl: process.env.SLACK_WEBHOOK_URL, + botToken: process.env.SLACK_BOT_TOKEN, + maxPerHour: 20, + maxPerDay: 100, + }); + this.register(slackChannel); + } + + // In-app channel (always enabled) + const inAppChannel = new InAppChannel(); + this.register(inAppChannel); + + // Webhook channel + if (process.env.NOTIFICATION_WEBHOOK_URL) { + const webhookChannel = new WebhookChannel({ + url: process.env.NOTIFICATION_WEBHOOK_URL, + secret: process.env.NOTIFICATION_WEBHOOK_SECRET, + maxPerHour: 50, + maxPerDay: 500, + timeout: 10000, + }); + this.register(webhookChannel); + } + } + + /** + * Register a notification channel + */ + register(channel: NotificationChannel): void { + if (this.channels.has(channel.id)) { + console.warn(`Channel ${channel.id} already registered, replacing`); + } + this.channels.set(channel.id, channel); + console.log( + `Registered notification channel: ${channel.name} (${channel.id})`, + ); + } + + /** + * Register a channel plugin + */ + registerPlugin(plugin: ChannelPlugin, config: Record): void { + this.plugins.set(plugin.metadata.id, plugin); + const channel = plugin.create(config); + this.register(channel); + } + + /** + * Get a channel by ID + */ + get(channelId: string): NotificationChannel | undefined { + return this.channels.get(channelId); + } + + /** + * Get all registered channels + */ + getAll(): NotificationChannel[] { + return Array.from(this.channels.values()); + } + + /** + * Get enabled channels sorted by priority + */ + getEnabled(): NotificationChannel[] { + return Array.from(this.channels.values()) + .filter((channel) => channel.enabled) + .sort((a, b) => a.priority - b.priority); + } + + /** + * Check if a channel exists and is enabled + */ + isEnabled(channelId: string): boolean { + const channel = this.channels.get(channelId); + return channel?.enabled ?? false; + } + + /** + * Remove a channel from the registry + */ + unregister(channelId: string): boolean { + return this.channels.delete(channelId); + } + + /** + * Validate all registered channels + */ + async validateAll(): Promise> { + const results = new Map(); + + for (const [id, channel] of this.channels.entries()) { + try { + const isValid = await channel.validate(); + results.set(id, isValid); + } catch (error) { + console.error(`Validation failed for channel ${id}:`, error); + results.set(id, false); + } + } + + return results; + } + + /** + * Health check all enabled channels + */ + async healthCheckAll(): Promise> { + const results = new Map(); + + for (const channel of this.getEnabled()) { + try { + const isHealthy = await channel.healthCheck(); + results.set(channel.id, isHealthy); + if (!isHealthy) { + console.warn(`Channel ${channel.id} failed health check`); + } + } catch (error) { + console.error(`Health check failed for channel ${channel.id}:`, error); + results.set(channel.id, false); + } + } + + return results; + } + + /** + * Get channels for a specific event type based on user preferences + */ + getChannelsForEvent( + eventType: string, + userChannels?: string[], + ): NotificationChannel[] { + if (userChannels && userChannels.length > 0) { + // Use user-specified channels in order + return userChannels + .map((id) => this.channels.get(id)) + .filter( + (channel): channel is NotificationChannel => + channel !== undefined && channel.enabled, + ); + } + + // Default: all enabled channels sorted by priority + return this.getEnabled(); + } +} + +export const channelRegistry = new ChannelRegistry(); diff --git a/backend/src/notifications/channels/email-channel.ts b/backend/src/notifications/channels/email-channel.ts new file mode 100644 index 00000000..66914f02 --- /dev/null +++ b/backend/src/notifications/channels/email-channel.ts @@ -0,0 +1,168 @@ +import { + NotificationChannel, + Notification, + NotificationDeliveryResult, +} from "../channel-interface"; + +export interface EmailChannelConfig { + smtpHost: string; + smtpPort: number; + smtpUser: string; + smtpPassword: string; + fromAddress: string; + maxPerHour: number; + maxPerDay: number; +} + +export class EmailChannel implements NotificationChannel { + readonly id = "email"; + readonly name = "Email"; + readonly enabled: boolean; + readonly priority = 20; + + private config: EmailChannelConfig; + private rateLimits = new Map(); + + constructor(config: EmailChannelConfig) { + this.config = config; + this.enabled = !!config.smtpHost; + } + + async send(notification: Notification): Promise { + const start = Date.now(); + + try { + // Check rate limit + if (!this.checkRateLimit(notification.userId)) { + return { + success: false, + channelId: this.id, + error: "Rate limit exceeded", + timestamp: new Date(), + deliveryTimeMs: Date.now() - start, + }; + } + + const formatted = await this.format(notification); + + // Send email via SMTP (implementation omitted for brevity) + // await sendEmail(formatted); + + this.recordSent(notification.userId); + + return { + success: true, + channelId: this.id, + messageId: `email-${notification.id}`, + timestamp: new Date(), + deliveryTimeMs: Date.now() - start, + }; + } catch (error) { + return { + success: false, + channelId: this.id, + error: error instanceof Error ? error.message : "Unknown error", + timestamp: new Date(), + deliveryTimeMs: Date.now() - start, + }; + } + } + + async format(notification: Notification): Promise { + return { + to: notification.data?.email, + from: this.config.fromAddress, + subject: notification.title, + html: this.generateHtml(notification), + text: notification.body, + }; + } + + private generateHtml(notification: Notification): string { + return ` + + + + + + +
+

${this.escapeHtml(notification.title)}

+
+
+

${this.escapeHtml(notification.body)}

+
+ + + + `; + } + + private escapeHtml(text: string): string { + const map: Record = { + "&": "&", + "<": "<", + ">": ">", + '"': """, + "'": "'", + }; + return text.replace(/[&<>"']/g, (m) => map[m]); + } + + async validate(): Promise { + return !!( + this.config.smtpHost && + this.config.smtpPort && + this.config.fromAddress + ); + } + + async healthCheck(): Promise { + try { + // Ping SMTP server + return true; + } catch { + return false; + } + } + + getRateLimit() { + return { + maxPerHour: this.config.maxPerHour, + maxPerDay: this.config.maxPerDay, + }; + } + + private checkRateLimit(userId: string): boolean { + const now = Date.now(); + const userLimits = this.rateLimits.get(userId) || []; + + // Filter out timestamps older than 24 hours + const recent = userLimits.filter((t) => now - t < 24 * 60 * 60 * 1000); + + if (recent.length >= this.config.maxPerDay) { + return false; + } + + // Check hourly limit + const lastHour = recent.filter((t) => now - t < 60 * 60 * 1000); + if (lastHour.length >= this.config.maxPerHour) { + return false; + } + + return true; + } + + private recordSent(userId: string): void { + const userLimits = this.rateLimits.get(userId) || []; + userLimits.push(Date.now()); + this.rateLimits.set(userId, userLimits); + } +} diff --git a/backend/src/notifications/channels/in-app-channel.ts b/backend/src/notifications/channels/in-app-channel.ts new file mode 100644 index 00000000..c4a0cd75 --- /dev/null +++ b/backend/src/notifications/channels/in-app-channel.ts @@ -0,0 +1,68 @@ +import { + NotificationChannel, + Notification, + NotificationDeliveryResult, +} from "../channel-interface"; + +export class InAppChannel implements NotificationChannel { + readonly id = "in-app"; + readonly name = "In-App"; + readonly enabled = true; + readonly priority = 10; // Highest priority + + async send(notification: Notification): Promise { + const start = Date.now(); + + try { + // Store in database for in-app display + // await db.notification.create({ data: notification }); + + // Emit WebSocket event for real-time delivery + // await websocket.emit(`user:${notification.userId}:notification`, notification); + + return { + success: true, + channelId: this.id, + messageId: `in-app-${notification.id}`, + timestamp: new Date(), + deliveryTimeMs: Date.now() - start, + }; + } catch (error) { + return { + success: false, + channelId: this.id, + error: error instanceof Error ? error.message : "Unknown error", + timestamp: new Date(), + deliveryTimeMs: Date.now() - start, + }; + } + } + + async format(notification: Notification): Promise { + return { + id: notification.id, + title: notification.title, + body: notification.body, + type: notification.eventType, + priority: notification.priority, + data: notification.data, + createdAt: notification.createdAt, + read: false, + }; + } + + async validate(): Promise { + return true; + } + + async healthCheck(): Promise { + return true; + } + + getRateLimit() { + return { + maxPerHour: 100, + maxPerDay: 500, + }; + } +} diff --git a/backend/src/notifications/channels/slack-channel.ts b/backend/src/notifications/channels/slack-channel.ts new file mode 100644 index 00000000..265ea77e --- /dev/null +++ b/backend/src/notifications/channels/slack-channel.ts @@ -0,0 +1,123 @@ +import { + NotificationChannel, + Notification, + NotificationDeliveryResult, +} from "../channel-interface"; + +export interface SlackChannelConfig { + webhookUrl: string; + botToken?: string; + maxPerHour: number; + maxPerDay: number; +} + +export class SlackChannel implements NotificationChannel { + readonly id = "slack"; + readonly name = "Slack"; + readonly enabled: boolean; + readonly priority = 30; + + private config: SlackChannelConfig; + + constructor(config: SlackChannelConfig) { + this.config = config; + this.enabled = !!config.webhookUrl; + } + + async send(notification: Notification): Promise { + const start = Date.now(); + + try { + const formatted = await this.format(notification); + + // Send to Slack webhook + const response = await fetch(this.config.webhookUrl, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(formatted), + }); + + if (!response.ok) { + throw new Error(`Slack API error: ${response.statusText}`); + } + + return { + success: true, + channelId: this.id, + messageId: `slack-${notification.id}`, + timestamp: new Date(), + deliveryTimeMs: Date.now() - start, + }; + } catch (error) { + return { + success: false, + channelId: this.id, + error: error instanceof Error ? error.message : "Unknown error", + timestamp: new Date(), + deliveryTimeMs: Date.now() - start, + }; + } + } + + async format(notification: Notification): Promise { + const priorityEmoji = { + low: ":white_circle:", + normal: ":large_blue_circle:", + high: ":large_orange_circle:", + urgent: ":red_circle:", + }; + + return { + blocks: [ + { + type: "header", + text: { + type: "plain_text", + text: notification.title, + emoji: true, + }, + }, + { + type: "section", + text: { + type: "mrkdwn", + text: `${priorityEmoji[notification.priority]} *${notification.eventType}*\n\n${notification.body}`, + }, + }, + { + type: "context", + elements: [ + { + type: "mrkdwn", + text: ``, + }, + ], + }, + ], + }; + } + + async validate(): Promise { + return !!this.config.webhookUrl; + } + + async healthCheck(): Promise { + try { + const response = await fetch(this.config.webhookUrl, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ text: "Health check" }), + }); + return response.ok; + } catch { + return false; + } + } + + getRateLimit() { + return { + maxPerHour: this.config.maxPerHour, + maxPerDay: this.config.maxPerDay, + }; + } +} diff --git a/backend/src/notifications/channels/webhook-channel.ts b/backend/src/notifications/channels/webhook-channel.ts new file mode 100644 index 00000000..85b8be14 --- /dev/null +++ b/backend/src/notifications/channels/webhook-channel.ts @@ -0,0 +1,122 @@ +import { + NotificationChannel, + Notification, + NotificationDeliveryResult, +} from "../channel-interface"; + +export interface WebhookChannelConfig { + url: string; + secret?: string; + headers?: Record; + maxPerHour: number; + maxPerDay: number; + timeout: number; +} + +export class WebhookChannel implements NotificationChannel { + readonly id = "webhook"; + readonly name = "Webhook"; + readonly enabled: boolean; + readonly priority = 40; + + private config: WebhookChannelConfig; + + constructor(config: WebhookChannelConfig) { + this.config = config; + this.enabled = !!config.url; + } + + async send(notification: Notification): Promise { + const start = Date.now(); + + try { + const formatted = await this.format(notification); + const headers: Record = { + "Content-Type": "application/json", + ...this.config.headers, + }; + + if (this.config.secret) { + headers["X-Webhook-Signature"] = this.generateSignature( + JSON.stringify(formatted), + ); + } + + const response = await fetch(this.config.url, { + method: "POST", + headers, + body: JSON.stringify(formatted), + signal: AbortSignal.timeout(this.config.timeout), + }); + + if (!response.ok) { + throw new Error(`Webhook failed: ${response.statusText}`); + } + + return { + success: true, + channelId: this.id, + messageId: `webhook-${notification.id}`, + timestamp: new Date(), + deliveryTimeMs: Date.now() - start, + }; + } catch (error) { + return { + success: false, + channelId: this.id, + error: error instanceof Error ? error.message : "Unknown error", + timestamp: new Date(), + deliveryTimeMs: Date.now() - start, + }; + } + } + + async format(notification: Notification): Promise { + return { + event: "notification", + notification: { + id: notification.id, + userId: notification.userId, + eventType: notification.eventType, + title: notification.title, + body: notification.body, + priority: notification.priority, + data: notification.data, + createdAt: notification.createdAt.toISOString(), + }, + }; + } + + private generateSignature(payload: string): string { + if (!this.config.secret) return ""; + + // HMAC SHA256 signature + const crypto = require("crypto"); + const hmac = crypto.createHmac("sha256", this.config.secret); + hmac.update(payload); + return `sha256=${hmac.digest("hex")}`; + } + + async validate(): Promise { + return !!this.config.url; + } + + async healthCheck(): Promise { + try { + const response = await fetch(this.config.url, { + method: "HEAD", + signal: AbortSignal.timeout(5000), + }); + return response.ok; + } catch { + return false; + } + } + + getRateLimit() { + return { + maxPerHour: this.config.maxPerHour, + maxPerDay: this.config.maxPerDay, + }; + } +} diff --git a/backend/src/notifications/dispatcher.ts b/backend/src/notifications/dispatcher.ts new file mode 100644 index 00000000..64b2718e --- /dev/null +++ b/backend/src/notifications/dispatcher.ts @@ -0,0 +1,211 @@ +/** + * Notification dispatcher with channel routing and fallback logic + */ + +import { + Notification, + NotificationDeliveryResult, + UserNotificationPreference, +} from "./channel-interface"; +import { channelRegistry } from "./channel-registry"; + +export interface DispatchResult { + notificationId: string; + deliveries: NotificationDeliveryResult[]; + success: boolean; + fallbackUsed: boolean; +} + +export class NotificationDispatcher { + /** + * Dispatch notification to user's preferred channels with fallback + */ + async dispatch( + notification: Notification, + preferences?: UserNotificationPreference, + ): Promise { + const deliveries: NotificationDeliveryResult[] = []; + let success = false; + let fallbackUsed = false; + + // Check quiet hours + if (preferences && this.isQuietHours(preferences)) { + console.log( + `Skipping notification during quiet hours for user ${notification.userId}`, + ); + return { + notificationId: notification.id, + deliveries: [], + success: false, + fallbackUsed: false, + }; + } + + // Get user's preferred channels or default + const preferredChannels = preferences?.channels || []; + const channels = channelRegistry.getChannelsForEvent( + notification.eventType, + preferredChannels, + ); + + if (channels.length === 0) { + console.warn("No channels available for notification"); + return { + notificationId: notification.id, + deliveries: [], + success: false, + fallbackUsed: false, + }; + } + + // Try primary channel + const primaryChannel = channels[0]; + console.log(`Dispatching to primary channel: ${primaryChannel.id}`); + + const primaryResult = await primaryChannel.send(notification); + deliveries.push(primaryResult); + + if (primaryResult.success) { + success = true; + } else { + // Fallback chain + console.warn( + `Primary channel ${primaryChannel.id} failed: ${primaryResult.error}`, + ); + + for (let i = 1; i < channels.length; i++) { + const fallbackChannel = channels[i]; + console.log(`Trying fallback channel: ${fallbackChannel.id}`); + + fallbackUsed = true; + const fallbackResult = await fallbackChannel.send(notification); + deliveries.push(fallbackResult); + + if (fallbackResult.success) { + success = true; + console.log(`Fallback channel ${fallbackChannel.id} succeeded`); + break; + } + } + } + + // Store delivery results + await this.storeDeliveryResults(notification.id, deliveries); + + return { + notificationId: notification.id, + deliveries, + success, + fallbackUsed, + }; + } + + /** + * Batch dispatch to multiple users + */ + async batchDispatch( + notifications: Notification[], + preferencesMap: Map, + ): Promise { + const results = await Promise.allSettled( + notifications.map((notification) => { + const preferences = preferencesMap.get(notification.userId); + return this.dispatch(notification, preferences); + }), + ); + + return results + .filter((result) => result.status === "fulfilled") + .map( + (result) => (result as PromiseFulfilledResult).value, + ); + } + + /** + * Check if current time is within user's quiet hours + */ + private isQuietHours(preferences: UserNotificationPreference): boolean { + if ( + preferences.quietHoursStart === undefined || + preferences.quietHoursEnd === undefined + ) { + return false; + } + + const now = new Date(); + const currentHour = now.getHours(); + + const start = preferences.quietHoursStart; + const end = preferences.quietHoursEnd; + + if (start <= end) { + return currentHour >= start && currentHour < end; + } else { + // Quiet hours cross midnight + return currentHour >= start || currentHour < end; + } + } + + /** + * Store delivery results for analytics + */ + private async storeDeliveryResults( + notificationId: string, + deliveries: NotificationDeliveryResult[], + ): Promise { + try { + // Store in database + // await db.notificationDelivery.createMany({ data: deliveries }); + console.log(`Stored delivery results for notification ${notificationId}`); + } catch (error) { + console.error("Failed to store delivery results:", error); + } + } + + /** + * Get delivery statistics for a notification + */ + async getDeliveryStats(notificationId: string): Promise<{ + totalAttempts: number; + successfulDeliveries: number; + failedDeliveries: number; + averageDeliveryTimeMs: number; + }> { + // Fetch from database + // const deliveries = await db.notificationDelivery.findMany({ where: { notificationId } }); + + return { + totalAttempts: 0, + successfulDeliveries: 0, + failedDeliveries: 0, + averageDeliveryTimeMs: 0, + }; + } + + /** + * Test all channels for a user + */ + async testChannels(userId: string): Promise> { + const results = new Map(); + const testNotification: Notification = { + id: `test-${Date.now()}`, + userId, + eventType: "test", + title: "Test Notification", + body: "This is a test notification", + priority: "normal", + createdAt: new Date(), + }; + + const channels = channelRegistry.getEnabled(); + + for (const channel of channels) { + const result = await channel.send(testNotification); + results.set(channel.id, result.success); + } + + return results; + } +} + +export const notificationDispatcher = new NotificationDispatcher(); diff --git a/frontend/components/common/link-prefetcher.tsx b/frontend/components/common/link-prefetcher.tsx new file mode 100644 index 00000000..94f8fdd9 --- /dev/null +++ b/frontend/components/common/link-prefetcher.tsx @@ -0,0 +1,57 @@ +"use client"; + +import { useEffect, useRef } from "react"; +import Link, { LinkProps } from "next/link"; +import { routePreloader } from "@/lib/preload"; + +interface PrefetchLinkProps extends LinkProps { + children: React.ReactNode; + className?: string; + prefetchPriority?: "high" | "low"; +} + +export function PrefetchLink({ + children, + href, + prefetchPriority = "low", + ...props +}: PrefetchLinkProps) { + const linkRef = useRef(null); + + useEffect(() => { + if (!linkRef.current) return; + + const observer = new IntersectionObserver( + ([entry]) => { + if (entry.isIntersecting) { + routePreloader.preloadRoute(href.toString(), prefetchPriority); + } + }, + { rootMargin: "50px" }, + ); + + observer.observe(linkRef.current); + + return () => observer.disconnect(); + }, [href, prefetchPriority]); + + const handleMouseEnter = () => { + routePreloader.preloadRoute(href.toString(), "high"); + }; + + const handleTouchStart = () => { + routePreloader.preloadRoute(href.toString(), "high"); + }; + + return ( + + {children} + + ); +} diff --git a/frontend/lib/preload.ts b/frontend/lib/preload.ts new file mode 100644 index 00000000..7814dbad --- /dev/null +++ b/frontend/lib/preload.ts @@ -0,0 +1,194 @@ +/** + * Predictive preload utility for route-based code splitting + * Implements Intersection Observer and heuristic-based preloading + */ + +interface PreloadConfig { + enabled: boolean; + maxConcurrent: number; + dataSaverMode: boolean; + hoverDelay: number; +} + +interface NavigationPattern { + from: string; + to: string; + frequency: number; +} + +class RoutePreloader { + private preloadedRoutes = new Set(); + private activePreloads = new Map(); + private config: PreloadConfig; + private navigationPatterns: Map = new Map(); + + constructor(config?: Partial) { + this.config = { + enabled: true, + maxConcurrent: 3, + dataSaverMode: this.detectDataSaver(), + hoverDelay: 50, + ...config, + }; + + this.loadNavigationPatterns(); + } + + private detectDataSaver(): boolean { + if (typeof navigator !== "undefined") { + return ( + // @ts-ignore + navigator.connection?.saveData || + // @ts-ignore + navigator.connection?.effectiveType === "slow-2g" || + // @ts-ignore + navigator.connection?.effectiveType === "2g" + ); + } + return false; + } + + private loadNavigationPatterns() { + // Common navigation patterns (heatmap-driven) + this.navigationPatterns.set("/", ["/dashboard", "/auth/login"]); + this.navigationPatterns.set("/dashboard", [ + "/dashboard/transactions", + "/dashboard/analytics", + "/dashboard/settings", + ]); + this.navigationPatterns.set("/auth/login", [ + "/dashboard", + "/auth/register", + ]); + } + + async preloadRoute( + route: string, + priority: "high" | "low" = "low", + ): Promise { + if (!this.config.enabled || this.config.dataSaverMode) { + return; + } + + if (this.preloadedRoutes.has(route)) { + return; + } + + if ( + this.activePreloads.size >= this.config.maxConcurrent && + priority === "low" + ) { + return; + } + + const abortController = new AbortController(); + this.activePreloads.set(route, abortController); + + try { + // Preload Next.js route chunk + const href = route; + const link = document.createElement("link"); + link.rel = "prefetch"; + link.href = href; + link.as = "document"; + + document.head.appendChild(link); + + this.preloadedRoutes.add(route); + } catch (error) { + console.warn(`Failed to preload route: ${route}`, error); + } finally { + this.activePreloads.delete(route); + } + } + + preloadPredictive(currentRoute: string) { + const patterns = this.navigationPatterns.get(currentRoute); + if (patterns) { + patterns.forEach((route) => this.preloadRoute(route, "low")); + } + } + + setupIntersectionObserver(linkSelector = 'a[href^="/"]') { + if (typeof IntersectionObserver === "undefined") { + return; + } + + const observer = new IntersectionObserver( + (entries) => { + entries.forEach((entry) => { + if (entry.isIntersecting) { + const link = entry.target as HTMLAnchorElement; + const href = link.getAttribute("href"); + if (href && href.startsWith("/")) { + this.preloadRoute(href, "low"); + } + } + }); + }, + { + rootMargin: "50px", + }, + ); + + // Observe all internal links + document.querySelectorAll(linkSelector).forEach((link) => { + observer.observe(link); + }); + + return observer; + } + + setupHoverPreload() { + let hoverTimer: NodeJS.Timeout; + + document.addEventListener( + "mouseover", + (e) => { + const target = (e.target as HTMLElement).closest('a[href^="/"]'); + if (target) { + const href = (target as HTMLAnchorElement).getAttribute("href"); + if (href) { + hoverTimer = setTimeout(() => { + this.preloadRoute(href, "high"); + }, this.config.hoverDelay); + } + } + }, + { passive: true }, + ); + + document.addEventListener( + "mouseout", + (e) => { + const target = (e.target as HTMLElement).closest('a[href^="/"]'); + if (target && hoverTimer) { + clearTimeout(hoverTimer); + } + }, + { passive: true }, + ); + + // Touch start preloading for mobile + document.addEventListener( + "touchstart", + (e) => { + const target = (e.target as HTMLElement).closest('a[href^="/"]'); + if (target) { + const href = (target as HTMLAnchorElement).getAttribute("href"); + if (href) { + this.preloadRoute(href, "high"); + } + } + }, + { passive: true }, + ); + } + + cancelAll() { + this.activePreloads.forEach((controller) => controller.abort()); + this.activePreloads.clear(); + } +} + +export const routePreloader = new RoutePreloader(); diff --git a/frontend/next.config.ts b/frontend/next.config.ts index f3f0895e..bff84a93 100644 --- a/frontend/next.config.ts +++ b/frontend/next.config.ts @@ -32,15 +32,38 @@ const nextConfig: NextConfig = { }; } + // Enhanced route-based code splitting with named chunks config.optimization = { ...config.optimization, splitChunks: { chunks: "all", minSize: 20000, - maxSize: 244000, + maxSize: 50000, // Target <50KB per chunk cacheGroups: { default: false, vendors: false, + // Route-specific chunks + dashboard: { + name: "route-dashboard", + chunks: "async", + test: /[\\/]app[\\/]dashboard[\\/]/, + priority: 60, + enforce: true, + }, + auth: { + name: "route-auth", + chunks: "async", + test: /[\\/]app[\\/]auth[\\/]/, + priority: 60, + enforce: true, + }, + forms: { + name: "route-forms", + chunks: "async", + test: /[\\/]app[\\/]forms[\\/]/, + priority: 60, + enforce: true, + }, abi: { name: "abi", chunks: "async", @@ -126,7 +149,7 @@ const nextConfig: NextConfig = { { key: "Link", value: [ - "; rel=preload; as=font; type=\"font/woff2\"; crossorigin=anonymous; fetchpriority=high", + '; rel=preload; as=font; type="font/woff2"; crossorigin=anonymous; fetchpriority=high', "; rel=preload; as=style", ].join(", "), },