diff --git a/dashboard/src/App.tsx b/dashboard/src/App.tsx index 2443663..3232e59 100644 --- a/dashboard/src/App.tsx +++ b/dashboard/src/App.tsx @@ -1,9 +1,35 @@ +import { useState } from 'react'; import { EventExplorerPage } from './pages/EventExplorerPage'; +import { NotificationTimelineView } from './components/NotificationTimelineView'; + +type Tab = 'explorer' | 'timeline'; export function App() { + const [tab, setTab] = useState('explorer'); + return (
- + + + {tab === 'explorer' && } + {tab === 'timeline' && }
); } diff --git a/dashboard/src/components/NotificationSearchBar.test.tsx b/dashboard/src/components/NotificationSearchBar.test.tsx new file mode 100644 index 0000000..ee318c7 --- /dev/null +++ b/dashboard/src/components/NotificationSearchBar.test.tsx @@ -0,0 +1,118 @@ +import { render, screen, fireEvent, act } from '@testing-library/react'; +import { NotificationSearchBar } from './NotificationSearchBar'; +import { useEventStore } from '../store/eventStore'; + +// Reset store between tests +beforeEach(() => { + useEventStore.setState({ + filters: { search: '', contractAddress: 'all', eventType: 'all', status: 'all', dateFrom: '', dateTo: '' }, + }); + jest.useFakeTimers(); +}); + +afterEach(() => { + jest.useRealTimers(); +}); + +function getStore() { + return useEventStore.getState(); +} + +describe('NotificationSearchBar', () => { + it('renders search input and all status buttons', () => { + render(); + expect(screen.getByLabelText(/search notifications/i)).toBeInTheDocument(); + expect(screen.getByRole('button', { name: /^all$/i })).toBeInTheDocument(); + expect(screen.getByRole('button', { name: /^unread$/i })).toBeInTheDocument(); + expect(screen.getByRole('button', { name: /^read$/i })).toBeInTheDocument(); + }); + + it('debounces search input — store not updated immediately', () => { + render(); + fireEvent.change(screen.getByLabelText(/search notifications/i), { + target: { value: 'TaskCreated' }, + }); + // Before debounce fires, store should still be empty + expect(getStore().filters.search).toBe(''); + }); + + it('updates store after debounce delay', () => { + render(); + fireEvent.change(screen.getByLabelText(/search notifications/i), { + target: { value: 'TaskCreated' }, + }); + act(() => jest.advanceTimersByTime(300)); + expect(getStore().filters.search).toBe('TaskCreated'); + }); + + it('sets status filter when status button clicked', () => { + render(); + fireEvent.click(screen.getByRole('button', { name: /^unread$/i })); + expect(getStore().filters.status).toBe('unread'); + expect(screen.getByRole('button', { name: /^unread$/i })).toHaveAttribute('aria-pressed', 'true'); + }); + + it('sets dateFrom and dateTo', () => { + render(); + fireEvent.change(screen.getByLabelText(/filter from date/i), { + target: { value: '2026-01-01' }, + }); + fireEvent.change(screen.getByLabelText(/filter to date/i), { + target: { value: '2026-01-31' }, + }); + expect(getStore().filters.dateFrom).toBe('2026-01-01'); + expect(getStore().filters.dateTo).toBe('2026-01-31'); + }); + + it('shows clear button only when filters are active', () => { + render(); + expect(screen.queryByRole('button', { name: /clear/i })).not.toBeInTheDocument(); + + fireEvent.click(screen.getByRole('button', { name: /^unread$/i })); + expect(screen.getByRole('button', { name: /clear/i })).toBeInTheDocument(); + }); + + it('clear button resets all filters', () => { + render(); + fireEvent.click(screen.getByRole('button', { name: /^unread$/i })); + fireEvent.change(screen.getByLabelText(/filter from date/i), { + target: { value: '2026-01-01' }, + }); + + fireEvent.click(screen.getByRole('button', { name: /clear/i })); + + act(() => jest.advanceTimersByTime(300)); + + const f = getStore().filters; + expect(f.status).toBe('all'); + expect(f.dateFrom).toBe(''); + expect(f.search).toBe(''); + }); +}); + +describe('filterEvents with new filter fields', () => { + it('filters by status=unread correctly', () => { + const { filterEvents } = require('../utils/eventData'); + const events = [ + { eventId: '1', read: false, contractAddress: 'A', eventName: 'X', receivedAt: Date.now(), ledger: 1, type: 'c', topic: [], value: '' }, + { eventId: '2', read: true, contractAddress: 'A', eventName: 'X', receivedAt: Date.now(), ledger: 2, type: 'c', topic: [], value: '' }, + ]; + const result = filterEvents(events, '', 'all', 'all', 'unread', '', ''); + expect(result).toHaveLength(1); + expect(result[0].eventId).toBe('1'); + }); + + it('filters by date range', () => { + const { filterEvents } = require('../utils/eventData'); + const jan1 = new Date('2026-01-01').getTime() + 1000; + const jan15 = new Date('2026-01-15').getTime() + 1000; + const feb1 = new Date('2026-02-01').getTime() + 1000; + const events = [ + { eventId: '1', contractAddress: 'A', eventName: 'X', receivedAt: jan1, ledger: 1, type: 'c', topic: [], value: '' }, + { eventId: '2', contractAddress: 'A', eventName: 'X', receivedAt: jan15, ledger: 2, type: 'c', topic: [], value: '' }, + { eventId: '3', contractAddress: 'A', eventName: 'X', receivedAt: feb1, ledger: 3, type: 'c', topic: [], value: '' }, + ]; + const result = filterEvents(events, '', 'all', 'all', 'all', '2026-01-01', '2026-01-20'); + expect(result.map((e: { eventId: string }) => e.eventId)).toEqual(['1', '2']); + }); +}); diff --git a/dashboard/src/components/NotificationSearchBar.tsx b/dashboard/src/components/NotificationSearchBar.tsx new file mode 100644 index 0000000..18b3b70 --- /dev/null +++ b/dashboard/src/components/NotificationSearchBar.tsx @@ -0,0 +1,126 @@ +import { useState, useEffect, memo } from 'react'; +import { useEventStore } from '../store/eventStore'; +import { useEventFilters } from '../hooks/useEventSelectors'; +import { useDebounce } from '../hooks/useDebounce'; +import type { NotificationStatus } from '../types/event'; + +const STATUS_OPTIONS: { value: NotificationStatus; label: string }[] = [ + { value: 'all', label: 'All' }, + { value: 'unread', label: 'Unread' }, + { value: 'read', label: 'Read' }, +]; + +export const NotificationSearchBar = memo(function NotificationSearchBar() { + const filters = useEventFilters(); + const setSearch = useEventStore((s) => s.setSearch); + const setStatusFilter = useEventStore((s) => s.setStatusFilter); + const setDateFrom = useEventStore((s) => s.setDateFrom); + const setDateTo = useEventStore((s) => s.setDateTo); + + // Local state for the text input so debounce doesn't block typing + const [inputValue, setInputValue] = useState(filters.search); + const debouncedSearch = useDebounce(inputValue, 250); + + useEffect(() => { + setSearch(debouncedSearch); + }, [debouncedSearch, setSearch]); + + // Keep local value in sync if store is cleared externally + useEffect(() => { + if (filters.search === '' && inputValue !== '') setInputValue(''); + // intentionally only react to store reset, not every keystroke + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [filters.search]); + + return ( +
+ {/* Text search */} +
+ + setInputValue(e.target.value)} + aria-label="Search notifications" + /> +
+ + {/* Status filter */} +
+ + Status + +
+ {STATUS_OPTIONS.map(({ value, label }) => ( + + ))} +
+
+ + {/* Date range */} +
+ + setDateFrom(e.target.value)} + aria-label="Filter from date" + /> +
+ +
+ + setDateTo(e.target.value)} + aria-label="Filter to date" + /> +
+ + {/* Clear all */} + {(inputValue || filters.status !== 'all' || filters.dateFrom || filters.dateTo) && ( + + )} +
+ ); +}); diff --git a/dashboard/src/components/NotificationTimelineView.test.tsx b/dashboard/src/components/NotificationTimelineView.test.tsx new file mode 100644 index 0000000..e518dbc --- /dev/null +++ b/dashboard/src/components/NotificationTimelineView.test.tsx @@ -0,0 +1,112 @@ +import { render, screen, fireEvent, waitFor } from '@testing-library/react'; +import { NotificationTimelineView } from './NotificationTimelineView'; +import * as timelineApi from '../services/timelineApi'; +import type { NotificationTimeline } from '../types/timeline'; + +jest.mock('../services/timelineApi'); +const mockFetch = timelineApi.fetchTimeline as jest.MockedFunction; + +const MOCK_TIMELINE: NotificationTimeline = { + notificationId: 7, + status: 'COMPLETED', + retryCount: 1, + maxRetries: 3, + createdAt: '2026-01-01T10:00:00.000Z', + nextRetryAt: null, + lastError: null, + entries: [ + { + attempt: 1, + status: 'RETRY', + executionTime: '2026-01-01T10:00:05.000Z', + errorMessage: 'network timeout', + durationMs: 500, + }, + { + attempt: 2, + status: 'COMPLETED', + executionTime: '2026-01-01T10:00:15.000Z', + errorMessage: null, + durationMs: 120, + }, + ], +}; + +function renderView() { + return render(); +} + +describe('NotificationTimelineView', () => { + beforeEach(() => jest.clearAllMocks()); + + it('renders initial empty state with prompt', () => { + renderView(); + expect(screen.getByText(/enter a notification id/i)).toBeInTheDocument(); + }); + + it('shows validation error for invalid id', async () => { + renderView(); + fireEvent.change(screen.getByLabelText(/notification id/i), { target: { value: '-1' } }); + fireEvent.submit(screen.getByRole('search')); + expect(await screen.findByRole('alert')).toHaveTextContent(/valid notification id/i); + expect(mockFetch).not.toHaveBeenCalled(); + }); + + it('fetches and renders timeline entries in chronological order', async () => { + mockFetch.mockResolvedValue(MOCK_TIMELINE); + renderView(); + + fireEvent.change(screen.getByLabelText(/notification id/i), { target: { value: '7' } }); + fireEvent.submit(screen.getByRole('search')); + + await waitFor(() => expect(mockFetch).toHaveBeenCalledWith(7)); + + const items = screen.getAllByRole('listitem'); + // entries should be chronological: RETRY first, COMPLETED second + expect(items[0]).toHaveTextContent(/retrying/i); + expect(items[1]).toHaveTextContent(/delivered/i); + }); + + it('shows error message from failed entries', async () => { + mockFetch.mockResolvedValue(MOCK_TIMELINE); + renderView(); + fireEvent.change(screen.getByLabelText(/notification id/i), { target: { value: '7' } }); + fireEvent.submit(screen.getByRole('search')); + + await waitFor(() => expect(screen.getByText(/network timeout/i)).toBeInTheDocument()); + }); + + it('shows empty state when no entries returned', async () => { + mockFetch.mockResolvedValue({ ...MOCK_TIMELINE, entries: [] }); + renderView(); + fireEvent.change(screen.getByLabelText(/notification id/i), { target: { value: '7' } }); + fireEvent.submit(screen.getByRole('search')); + + await waitFor(() => + expect(screen.getByText(/no history entries found/i)).toBeInTheDocument() + ); + }); + + it('shows API error message on fetch failure', async () => { + mockFetch.mockRejectedValue(new Error('Failed to fetch timeline: 404')); + renderView(); + fireEvent.change(screen.getByLabelText(/notification id/i), { target: { value: '99' } }); + fireEvent.submit(screen.getByRole('search')); + + expect(await screen.findByRole('alert')).toHaveTextContent(/404/); + }); + + it('disables button while loading', async () => { + let resolve!: (v: NotificationTimeline) => void; + mockFetch.mockReturnValue(new Promise((r) => { resolve = r; })); + + renderView(); + fireEvent.change(screen.getByLabelText(/notification id/i), { target: { value: '1' } }); + fireEvent.submit(screen.getByRole('search')); + + expect(screen.getByRole('button', { name: /loading/i })).toBeDisabled(); + + resolve(MOCK_TIMELINE); + await waitFor(() => expect(screen.getByRole('button', { name: /view timeline/i })).not.toBeDisabled()); + }); +}); diff --git a/dashboard/src/components/NotificationTimelineView.tsx b/dashboard/src/components/NotificationTimelineView.tsx new file mode 100644 index 0000000..8062e80 --- /dev/null +++ b/dashboard/src/components/NotificationTimelineView.tsx @@ -0,0 +1,208 @@ +import { useState, useCallback } from 'react'; +import type { NotificationTimeline, TimelineEntry, TimelineStatus } from '../types/timeline'; +import { fetchTimeline } from '../services/timelineApi'; +import { formatTimestamp } from '../utils/formatTime'; + +// ─── status helpers ────────────────────────────────────────────────────────── + +const STATUS_LABEL: Record = { + PENDING: 'Pending', + PROCESSING: 'Processing', + COMPLETED: 'Delivered', + FAILED: 'Failed', + RETRY: 'Retrying', +}; + +const STATUS_CLASS: Record = { + PENDING: 'timeline__dot--pending', + PROCESSING: 'timeline__dot--processing', + COMPLETED: 'timeline__dot--completed', + FAILED: 'timeline__dot--failed', + RETRY: 'timeline__dot--retry', +}; + +// ─── sub-components ────────────────────────────────────────────────────────── + +function TimelineEntryItem({ entry }: { entry: TimelineEntry }) { + const ts = new Date(entry.executionTime).getTime(); + return ( +
  • +
  • + ); +} + +function TimelineSkeleton() { + return ( +
      + {[1, 2, 3].map((i) => ( +
    • +
    • + ))} +
    + ); +} + +// ─── main component ────────────────────────────────────────────────────────── + +export function NotificationTimelineView() { + const [inputValue, setInputValue] = useState(''); + const [timeline, setTimeline] = useState(null); + const [loading, setLoading] = useState(false); + const [error, setError] = useState(null); + + const handleSearch = useCallback( + async (e: React.FormEvent) => { + e.preventDefault(); + const id = parseInt(inputValue.trim(), 10); + if (isNaN(id) || id <= 0) { + setError('Please enter a valid notification ID.'); + return; + } + + setLoading(true); + setError(null); + setTimeline(null); + + try { + const data = await fetchTimeline(id); + // Sort entries chronologically + const sorted = [...data.entries].sort( + (a, b) => new Date(a.executionTime).getTime() - new Date(b.executionTime).getTime() + ); + setTimeline({ ...data, entries: sorted }); + } catch (err) { + setError(err instanceof Error ? err.message : 'Unknown error'); + } finally { + setLoading(false); + } + }, + [inputValue] + ); + + const overallStatus = timeline?.status; + + return ( +
    +

    + Notification Delivery Timeline +

    + + {/* Filter / search */} +
    + +
    + setInputValue(e.target.value)} + aria-describedby={error ? 'timeline-error' : undefined} + /> + +
    + {error && ( + + )} +
    + + {/* Loading skeleton */} + {loading && } + + {/* Empty state — searched but no entries */} + {!loading && timeline && timeline.entries.length === 0 && ( +
    +

    No history entries found for notification #{timeline.notificationId}.

    +

    + Current status: {STATUS_LABEL[overallStatus!] ?? overallStatus} +

    +
    + )} + + {/* Timeline entries */} + {!loading && timeline && timeline.entries.length > 0 && ( +
    +
    + + Notification #{timeline.notificationId} + +
    + +
      + {timeline.entries.map((entry, idx) => ( + + ))} +
    + + {timeline.nextRetryAt && ( +

    + Next retry scheduled:{' '} + +

    + )} + + {timeline.lastError && overallStatus === 'FAILED' && ( +

    + Last error: {timeline.lastError} +

    + )} +
    + )} + + {/* Initial empty state — nothing searched yet */} + {!loading && !timeline && !error && ( +
    +

    Enter a notification ID above to view its delivery history.

    +
    + )} +
    + ); +} diff --git a/dashboard/src/hooks/useEventSelectors.ts b/dashboard/src/hooks/useEventSelectors.ts index 07da9b9..ba53e31 100644 --- a/dashboard/src/hooks/useEventSelectors.ts +++ b/dashboard/src/hooks/useEventSelectors.ts @@ -13,7 +13,10 @@ export function useFilteredEvents() { events, filters.search, filters.contractAddress, - filters.eventType + filters.eventType, + filters.status, + filters.dateFrom, + filters.dateTo ), [events, filters] ); diff --git a/dashboard/src/index.css b/dashboard/src/index.css index f3b7ac7..3ebdf73 100644 --- a/dashboard/src/index.css +++ b/dashboard/src/index.css @@ -758,3 +758,351 @@ body { display: none; } } + +/* ── Notification Delivery Timeline ─────────────────────────────────────── */ + +.timeline-view { + max-width: 720px; + margin: 32px auto; + padding: 24px; + border: 1px solid rgba(255,255,255,0.08); + border-radius: 12px; + background: rgba(255,255,255,0.03); +} + +.timeline-view__title { + margin: 0 0 20px; + font-size: 1.25rem; +} + +.timeline-view__form { + display: flex; + flex-direction: column; + gap: 8px; + margin-bottom: 24px; +} + +.timeline-view__label { + font-size: 0.85rem; + color: #9aa0a6; +} + +.timeline-view__input-row { + display: flex; + gap: 8px; +} + +.timeline-view__input { + flex: 1; + padding: 8px 12px; + border: 1px solid rgba(255,255,255,0.15); + border-radius: 8px; + background: rgba(255,255,255,0.05); + color: #e8eaed; + font-size: 0.95rem; +} + +.timeline-view__btn { + padding: 8px 18px; + border: none; + border-radius: 8px; + background: #1a73e8; + color: #fff; + font-size: 0.95rem; + cursor: pointer; +} + +.timeline-view__btn:disabled { + opacity: 0.5; + cursor: not-allowed; +} + +.timeline-view__error { + margin: 4px 0 0; + font-size: 0.85rem; + color: #f28b82; +} + +.timeline-view__empty { + text-align: center; + padding: 32px 0; + color: #9aa0a6; +} + +.timeline-view__empty-sub { + margin-top: 8px; + font-size: 0.9rem; +} + +.timeline-view__summary { + display: flex; + align-items: center; + gap: 10px; + margin-bottom: 16px; + font-size: 0.95rem; +} + +.timeline-view__summary-retries { + margin-left: auto; + font-size: 0.85rem; + color: #9aa0a6; +} + +.timeline-view__next-retry, +.timeline-view__last-error { + margin-top: 16px; + font-size: 0.85rem; + color: #9aa0a6; +} + +.timeline-view__last-error { + color: #f28b82; +} + +/* Timeline list */ + +.timeline__list { + list-style: none; + margin: 0; + padding: 0; + position: relative; +} + +.timeline__list::before { + content: ''; + position: absolute; + left: 9px; + top: 0; + bottom: 0; + width: 2px; + background: rgba(255,255,255,0.08); +} + +.timeline__entry { + display: flex; + gap: 16px; + padding: 12px 0; + position: relative; +} + +.timeline__dot { + flex-shrink: 0; + width: 20px; + height: 20px; + border-radius: 50%; + border: 2px solid transparent; + position: relative; + z-index: 1; +} + +.timeline__dot--inline { + display: inline-block; + vertical-align: middle; +} + +.timeline__dot--pending { background: #5f6368; border-color: #9aa0a6; } +.timeline__dot--processing { background: #1a73e8; border-color: #4a90e2; } +.timeline__dot--completed { background: #34a853; border-color: #81c995; } +.timeline__dot--failed { background: #ea4335; border-color: #f28b82; } +.timeline__dot--retry { background: #f9ab00; border-color: #fdd663; } +.timeline__dot--skeleton { background: rgba(255,255,255,0.12); } + +.timeline__entry-body { + flex: 1; + display: flex; + flex-direction: column; + gap: 4px; +} + +.timeline__entry-header { + display: flex; + align-items: center; + gap: 10px; + flex-wrap: wrap; +} + +.timeline__entry-label { + font-weight: 600; + font-size: 0.95rem; +} + +.timeline__entry-attempt { + font-size: 0.8rem; + color: #9aa0a6; + background: rgba(255,255,255,0.07); + padding: 2px 6px; + border-radius: 4px; +} + +.timeline__entry-time { + margin-left: auto; + font-size: 0.8rem; + color: #9aa0a6; +} + +.timeline__entry-error { + margin: 2px 0 0; + font-size: 0.85rem; + color: #f28b82; +} + +.timeline__entry-duration { + font-size: 0.8rem; + color: #9aa0a6; +} + +/* Skeleton animation */ +.timeline__skeleton-line { + height: 12px; + border-radius: 4px; + background: rgba(255,255,255,0.08); + animation: timeline-pulse 1.4s ease-in-out infinite; +} + +@keyframes timeline-pulse { + 0%, 100% { opacity: 1; } + 50% { opacity: 0.4; } +} + +/* ── App tab navigation ─────────────────────────────────────────────────── */ +.app-tabs { + display: flex; + gap: 4px; + padding: 0 0 0 0; + border-bottom: 1px solid rgba(255,255,255,0.08); + margin-bottom: 8px; +} + +.app-tabs__btn { + padding: 10px 20px; + background: none; + border: none; + border-bottom: 2px solid transparent; + color: #9aa0a6; + font-size: 0.95rem; + cursor: pointer; + margin-bottom: -1px; +} + +.app-tabs__btn--active, +.app-tabs__btn[aria-selected="true"] { + color: #e8eaed; + border-bottom-color: #1a73e8; +} + +/* ── Notification Search Bar ────────────────────────────────────────────── */ + +.notif-search { + display: flex; + flex-wrap: wrap; + align-items: flex-end; + gap: 12px; + margin: 20px 0 16px; + padding: 16px; + border: 1px solid rgba(255,255,255,0.08); + border-radius: 12px; + background: rgba(255,255,255,0.03); +} + +.notif-search__group { + display: flex; + flex-direction: column; + gap: 6px; + min-width: 140px; +} + +.notif-search__group--wide { + flex: 1 1 260px; +} + +.notif-search__label { + font-size: 0.8rem; + color: #9aa0a6; + font-weight: 500; +} + +.notif-search__input { + padding: 8px 12px; + border: 1px solid rgba(255,255,255,0.15); + border-radius: 8px; + background: rgba(255,255,255,0.05); + color: #e8eaed; + font-size: 0.9rem; + width: 100%; +} + +.notif-search__input:focus { + outline: 2px solid #1a73e8; + outline-offset: 2px; +} + +.notif-search__input--date { + color-scheme: dark; +} + +/* Status button group */ +.notif-search__status-group { + display: flex; + gap: 4px; +} + +.notif-search__status-btn { + padding: 6px 14px; + border: 1px solid rgba(255,255,255,0.15); + border-radius: 20px; + background: transparent; + color: #9aa0a6; + font-size: 0.85rem; + cursor: pointer; + transition: background 0.15s, color 0.15s; +} + +.notif-search__status-btn--active, +.notif-search__status-btn[aria-pressed="true"] { + background: #1a73e8; + border-color: #1a73e8; + color: #fff; +} + +.notif-search__status-btn:hover:not(.notif-search__status-btn--active) { + background: rgba(255,255,255,0.07); + color: #e8eaed; +} + +/* Clear button */ +.notif-search__clear { + align-self: flex-end; + padding: 8px 14px; + border: 1px solid rgba(255,255,255,0.15); + border-radius: 8px; + background: transparent; + color: #9aa0a6; + font-size: 0.85rem; + cursor: pointer; +} + +.notif-search__clear:hover { + color: #e8eaed; + border-color: rgba(255,255,255,0.3); +} + +/* Responsive: stack to single column on small screens */ +@media (max-width: 600px) { + .notif-search { + flex-direction: column; + } + + .notif-search__group, + .notif-search__group--wide { + width: 100%; + } + + .notif-search__status-group { + width: 100%; + } + + .notif-search__status-btn { + flex: 1; + text-align: center; + } +} diff --git a/dashboard/src/pages/EventExplorerPage.tsx b/dashboard/src/pages/EventExplorerPage.tsx index 724938b..3eab266 100644 --- a/dashboard/src/pages/EventExplorerPage.tsx +++ b/dashboard/src/pages/EventExplorerPage.tsx @@ -1,5 +1,6 @@ import { useCallback, useEffect, useMemo, useState } from 'react'; import { EventFiltersBar } from '../components/EventFiltersBar'; +import { NotificationSearchBar } from '../components/NotificationSearchBar'; import { WalletConnectButton } from '../components/WalletConnectButton'; import { EventExplorerTable } from '../components/EventExplorerTable'; import { EventExplorerSkeleton } from '../components/EventExplorerSkeleton'; @@ -86,7 +87,7 @@ export function EventExplorerPage() { useEffect(() => { setPage(1); - }, [filters.search, filters.contractAddress, filters.eventType]); + }, [filters.search, filters.contractAddress, filters.eventType, filters.status, filters.dateFrom, filters.dateTo]); useEffect(() => { if (typeof window === 'undefined') { @@ -138,6 +139,7 @@ export function EventExplorerPage() { + {error && (
    diff --git a/dashboard/src/services/timelineApi.ts b/dashboard/src/services/timelineApi.ts new file mode 100644 index 0000000..e1b45bb --- /dev/null +++ b/dashboard/src/services/timelineApi.ts @@ -0,0 +1,11 @@ +import type { NotificationTimeline } from '../types/timeline'; + +const BASE_URL = + (typeof import.meta !== 'undefined' && (import.meta as any).env?.VITE_EVENTS_API_URL) || + 'http://localhost:8787'; + +export async function fetchTimeline(notificationId: number): Promise { + const res = await fetch(`${BASE_URL}/api/notifications/${notificationId}/timeline`); + if (!res.ok) throw new Error(`Failed to fetch timeline: ${res.status}`); + return res.json() as Promise; +} diff --git a/dashboard/src/store/eventStore.ts b/dashboard/src/store/eventStore.ts index 5dcd525..56763c8 100644 --- a/dashboard/src/store/eventStore.ts +++ b/dashboard/src/store/eventStore.ts @@ -1,5 +1,5 @@ import { create } from 'zustand'; -import type { BlockchainEvent, EventFilters } from '../types/event'; +import type { BlockchainEvent, EventFilters, NotificationStatus } from '../types/event'; import { filterEvents } from '../utils/eventData'; interface EventStoreState { @@ -12,6 +12,9 @@ interface EventStoreState { setSearch: (search: string) => void; setContractFilter: (contractAddress: string) => void; setEventTypeFilter: (eventType: string) => void; + setStatusFilter: (status: NotificationStatus) => void; + setDateFrom: (dateFrom: string) => void; + setDateTo: (dateTo: string) => void; setLoading: (isLoading: boolean) => void; setError: (error: string | null) => void; } @@ -35,6 +38,9 @@ export const useEventStore = create((set) => ({ search: '', contractAddress: 'all', eventType: 'all', + status: 'all', + dateFrom: '', + dateTo: '', }, isLoading: false, error: null, @@ -43,18 +49,17 @@ export const useEventStore = create((set) => ({ set((state) => ({ events: dedupeEventsById([...state.events, ...events]), })), - setSearch: (search) => - set((state) => ({ - filters: { ...state.filters, search }, - })), + setSearch: (search) => set((state) => ({ filters: { ...state.filters, search } })), setContractFilter: (contractAddress) => - set((state) => ({ - filters: { ...state.filters, contractAddress }, - })), + set((state) => ({ filters: { ...state.filters, contractAddress } })), setEventTypeFilter: (eventType) => - set((state) => ({ - filters: { ...state.filters, eventType }, - })), + set((state) => ({ filters: { ...state.filters, eventType } })), + setStatusFilter: (status) => + set((state) => ({ filters: { ...state.filters, status } })), + setDateFrom: (dateFrom) => + set((state) => ({ filters: { ...state.filters, dateFrom } })), + setDateTo: (dateTo) => + set((state) => ({ filters: { ...state.filters, dateTo } })), setLoading: (isLoading) => set({ isLoading }), setError: (error) => set({ error }), })); @@ -65,7 +70,10 @@ export function selectFilteredEvents(state: EventStoreState): BlockchainEvent[] events, filters.search, filters.contractAddress, - filters.eventType + filters.eventType, + filters.status, + filters.dateFrom, + filters.dateTo ); } diff --git a/dashboard/src/types/event.ts b/dashboard/src/types/event.ts index ca0fd02..8ed9838 100644 --- a/dashboard/src/types/event.ts +++ b/dashboard/src/types/event.ts @@ -8,10 +8,17 @@ export interface BlockchainEvent { value: string; txHash?: string; receivedAt: number; + /** Whether the user has seen/read this notification. Default: false */ + read?: boolean; } +export type NotificationStatus = 'all' | 'read' | 'unread'; + export interface EventFilters { search: string; contractAddress: string; eventType: string; + status: NotificationStatus; + dateFrom: string; // ISO date string "YYYY-MM-DD" or "" + dateTo: string; // ISO date string "YYYY-MM-DD" or "" } diff --git a/dashboard/src/types/timeline.ts b/dashboard/src/types/timeline.ts new file mode 100644 index 0000000..5932449 --- /dev/null +++ b/dashboard/src/types/timeline.ts @@ -0,0 +1,20 @@ +export type TimelineStatus = 'PENDING' | 'PROCESSING' | 'COMPLETED' | 'FAILED' | 'RETRY'; + +export interface TimelineEntry { + attempt: number; + status: TimelineStatus; + executionTime: string; // ISO string + errorMessage?: string | null; + durationMs?: number | null; +} + +export interface NotificationTimeline { + notificationId: number; + status: TimelineStatus; + retryCount: number; + maxRetries: number; + createdAt: string; + nextRetryAt?: string | null; + lastError?: string | null; + entries: TimelineEntry[]; +} diff --git a/dashboard/src/utils/eventData.ts b/dashboard/src/utils/eventData.ts index f0b540b..e093ad0 100644 --- a/dashboard/src/utils/eventData.ts +++ b/dashboard/src/utils/eventData.ts @@ -36,22 +36,26 @@ export function filterEvents( events: BlockchainEvent[], search: string, contractAddress: string, - eventType: string + eventType: string, + status: import('../types/event').NotificationStatus = 'all', + dateFrom = '', + dateTo = '' ): BlockchainEvent[] { const normalizedSearch = search.trim().toLowerCase(); + const fromMs = dateFrom ? new Date(dateFrom).getTime() : 0; + // dateTo is inclusive: include the entire day + const toMs = dateTo ? new Date(dateTo).getTime() + 86_399_999 : Infinity; return events.filter((event) => { - if (contractAddress !== 'all' && event.contractAddress !== contractAddress) { - return false; - } + if (contractAddress !== 'all' && event.contractAddress !== contractAddress) return false; + if (eventType !== 'all' && event.eventName !== eventType) return false; - if (eventType !== 'all' && event.eventName !== eventType) { - return false; - } + if (status === 'read' && !event.read) return false; + if (status === 'unread' && event.read) return false; - if (!normalizedSearch) { - return true; - } + if (event.receivedAt < fromMs || event.receivedAt > toMs) return false; + + if (!normalizedSearch) return true; return ( event.eventId.toLowerCase().includes(normalizedSearch) || diff --git a/listener/.env.example b/listener/.env.example index f671c87..366a838 100644 --- a/listener/.env.example +++ b/listener/.env.example @@ -25,6 +25,16 @@ DISCORD_WEBHOOK_URL=https://discord.com/api/webhooks/YOUR_WEBHOOK_ID/YOUR_WEBHOO # Retry Queue Configuration RETRY_BASE_DELAY_MS=5000 RETRY_MAX_RETRIES=5 +RETRY_MULTIPLIER=2 +RETRY_MAX_DELAY_MS=3600000 +RETRY_JITTER=true + +# Retry Scheduler (DB-backed retry for persisted notifications) +RETRY_SCHEDULER_ENABLED=true +RETRY_SCHEDULER_POLL_INTERVAL_MS=15000 +RETRY_SCHEDULER_LOCK_TIMEOUT_MS=60000 +RETRY_SCHEDULER_PROCESSOR_ID= +RETRY_SCHEDULER_BATCH_SIZE=10 # Database Configuration DATABASE_PATH=./data/notifications.db @@ -43,3 +53,10 @@ RATE_LIMIT_WINDOW_MS=60000 RATE_LIMIT_MAX_REQUESTS=60 # Per-client overrides (JSON object): {"client-id":{"maxRequests":100,"windowMs":60000}} RATE_LIMIT_CLIENT_OVERRIDES={} + +# Notification Archive Configuration +# ARCHIVE_ENABLED=true # Set to 'false' to disable the background archiver +# ARCHIVE_INTERVAL_MS=21600000 # How often to run the archive cycle (default: 6 h) +# ARCHIVE_AFTER_MS=604800000 # Archive notifications completed > X ms ago (default: 7 days) +# ARCHIVE_DELETE_AFTER_MS=7776000000 # Permanently delete archive rows > X ms old (default: 90 days; 0 = never) +# ARCHIVE_BATCH_SIZE=500 # Max rows processed per cycle diff --git a/listener/NOTIFICATION_ARCHIVING.md b/listener/NOTIFICATION_ARCHIVING.md new file mode 100644 index 0000000..551f7ce --- /dev/null +++ b/listener/NOTIFICATION_ARCHIVING.md @@ -0,0 +1,138 @@ +# Notification Archiving Service + +## Overview + +The archiving service automatically moves old, terminal-state notifications out of the active `scheduled_notifications` table into a read-only `notification_archive` table. Archived records remain fully queryable for audit purposes and are permanently deleted only after a configurable grace period. + +--- + +## How the Archiving Loop Works + +A background worker (`ArchiveService`) runs on a configurable interval. Each cycle has two phases: + +``` +Every ARCHIVE_INTERVAL_MS (default 6 h) +│ +├─ Phase 1 — Archive +│ SELECT up to ARCHIVE_BATCH_SIZE rows FROM scheduled_notifications +│ WHERE status IN ('COMPLETED','FAILED','CANCELLED') +│ AND processing_completed_at < NOW() - ARCHIVE_AFTER_MS +│ ↓ TRANSACTION +│ INSERT those rows INTO notification_archive +│ DELETE those rows FROM scheduled_notifications +│ +└─ Phase 2 — Purge (skipped when ARCHIVE_DELETE_AFTER_MS = 0) + DELETE FROM notification_archive + WHERE archived_at < NOW() - ARCHIVE_DELETE_AFTER_MS +``` + +Both phases run inside a single `setInterval` tick. The batch size cap keeps individual transactions short and prevents long table locks. + +--- + +## Data Schema + +### `notification_archive` table (new) + +| Column | Type | Notes | +|---|---|---| +| `id` | INTEGER PK | Archive-table primary key | +| `original_id` | INTEGER | PK from `scheduled_notifications` at archival time | +| `payload` | TEXT | JSON notification payload | +| `notification_type` | VARCHAR(50) | discord / email / webhook / sms | +| `target_recipient` | TEXT | User ID, webhook URL, etc. | +| `execute_at` | DATETIME | Original scheduled time | +| `created_at` | DATETIME | When the notification was originally created | +| `processing_completed_at` | DATETIME | When the notification finished processing (nullable) | +| `status` | VARCHAR(20) | Terminal status at archival: COMPLETED / FAILED / CANCELLED | +| `retry_count` | INTEGER | Final retry count | +| `last_error` | TEXT | Last error message (nullable) | +| `event_id` | TEXT | Source blockchain event ID (nullable) | +| `contract_address` | TEXT | Source contract address (nullable) | +| `metadata` | TEXT | JSON metadata blob (nullable) | +| `archived_at` | DATETIME | When this row was inserted into the archive | + +The archive table is append-only; no UPDATE triggers are applied. + +--- + +## Retention Policy + +| Variable | Default | Description | +|---|---|---| +| `ARCHIVE_ENABLED` | `true` | Set to `false` to disable the worker entirely | +| `ARCHIVE_INTERVAL_MS` | `21600000` (6 h) | How often the cycle runs | +| `ARCHIVE_AFTER_MS` | `604800000` (7 d) | Move notifications completed > X ms ago | +| `ARCHIVE_DELETE_AFTER_MS` | `7776000000` (90 d) | Permanently delete archive rows > X ms old; `0` = never delete | +| `ARCHIVE_BATCH_SIZE` | `500` | Max rows archived per cycle | + +Set these in your `.env` file or as environment variables before starting the service. + +Example — archive after 3 days, purge after 30 days, run every hour: + +```env +ARCHIVE_AFTER_MS=259200000 +ARCHIVE_DELETE_AFTER_MS=2592000000 +ARCHIVE_INTERVAL_MS=3600000 +``` + +--- + +## API Endpoints + +All endpoints return `application/json`. + +### `GET /api/archive` + +Paginated list of archived notifications for audit queries. + +**Query parameters** + +| Parameter | Type | Description | +|---|---|---| +| `limit` | number | Max records to return (default: service default) | +| `offset` | number | Pagination offset | +| `status` | string | Filter by terminal status: `COMPLETED`, `FAILED`, `CANCELLED` | +| `contractAddress` | string | Filter by source contract address | +| `startDate` | ISO 8601 | Filter `archived_at >= startDate` | +| `endDate` | ISO 8601 | Filter `archived_at <= endDate` | + +**Response** +```json +{ + "records": [ /* ArchivedNotification[] */ ], + "total": 1024, + "limit": 20, + "offset": 0, + "itemCount": 20, + "totalPages": 52 +} +``` + +### `GET /api/archive/:id` + +Fetch a single archived record by its archive-table PK. + +- **200** — record found +- **404** — no record with that `id` + +### `POST /api/archive/run` *(admin)* + +Trigger an on-demand archive + purge cycle immediately. Requires `ArchiveService` to be enabled. + +**Response** +```json +{ + "archived": 12, + "purged": 3, + "durationMs": 45 +} +``` + +--- + +## Operational Notes + +- **No data loss on crash** — because archival is transactional (copy + delete in one SQLite transaction), a crash mid-cycle leaves the original rows intact and the partial archive rows are cleaned up on the next cycle (duplicate `original_id` rows are harmless for read queries). +- **Active notifications are never touched** — only rows with a terminal status (`COMPLETED`, `FAILED`, `CANCELLED`) *and* a non-null `processing_completed_at` older than `ARCHIVE_AFTER_MS` are eligible. +- **Disabling** — set `ARCHIVE_ENABLED=false` to keep all notifications in the active table indefinitely. The `notification_archive` table is still created; it will simply remain empty. diff --git a/listener/src/__tests__/retry-scheduler.integration.test.ts b/listener/src/__tests__/retry-scheduler.integration.test.ts new file mode 100644 index 0000000..dcc02b6 --- /dev/null +++ b/listener/src/__tests__/retry-scheduler.integration.test.ts @@ -0,0 +1,210 @@ +/** + * Integration test: RetryScheduler + ScheduledNotificationRepository + SQLite + * + * Uses an in-memory SQLite database so no file system state is needed. + */ +import { Database } from '../database/database'; +import { ScheduledNotificationRepository } from '../services/scheduled-notification-repository'; +import { RetryScheduler, RETRY_SCHEDULER_DEFAULTS, calculateBackoffDelay } from '../services/retry-scheduler'; +import { NotificationStatus, NotificationType } from '../types/scheduled-notification'; + +jest.mock('../utils/logger', () => ({ + __esModule: true, + default: { info: jest.fn(), warn: jest.fn(), error: jest.fn(), debug: jest.fn() }, +})); +jest.mock('../utils/request-id', () => ({ generateRequestId: () => 'integ-req-id' })); + +// ─── helpers ──────────────────────────────────────────────────────────────── + +async function setupDb(): Promise { + const db = new Database(':memory:'); + await db.initialize(); + // Add next_retry_at column (schema.sql ALTER TABLE is idempotent in our migration) + // The column is already part of schema.sql after our patch, so this is a no-op guard. + return db; +} + +async function insertFailedNotification( + repo: ScheduledNotificationRepository, + db: Database, + retryCount = 1, + nextRetryAt: Date | null = null +): Promise { + // Create initially, then manually set retry_count and status to simulate a prior failure + const id = await repo.create( + { + payload: { event: {}, contractConfig: {} }, + notificationType: NotificationType.DISCORD, + targetRecipient: 'test-recipient', + executeAt: new Date(Date.now() - 1000), + maxRetries: 3, + } + ); + + await db.run( + `UPDATE scheduled_notifications + SET status = ?, retry_count = ?, next_retry_at = ? + WHERE id = ?`, + [NotificationStatus.PENDING, retryCount, nextRetryAt?.toISOString() ?? null, id] + ); + + return id; +} + +// ─── tests ─────────────────────────────────────────────────────────────────── + +describe('RetryScheduler integration', () => { + let db: Database; + let repo: ScheduledNotificationRepository; + + beforeEach(async () => { + db = await setupDb(); + repo = new ScheduledNotificationRepository(db); + }); + + afterEach(async () => { + await db.close(); + }); + + it('picks up a PENDING notification with retry_count > 0 and marks it COMPLETED on success', async () => { + const id = await insertFailedNotification(repo, db, 1, null); + + const discordService = { sendEventNotification: jest.fn().mockResolvedValue(true) } as any; + const scheduler = new RetryScheduler( + repo, + { ...RETRY_SCHEDULER_DEFAULTS, jitter: false }, + discordService + ); + + await scheduler.runOnce(); + + const row = await db.get<{ status: string; retry_count: number }>( + 'SELECT status, retry_count FROM scheduled_notifications WHERE id = ?', + [id] + ); + + expect(row!.status).toBe(NotificationStatus.COMPLETED); + expect(discordService.sendEventNotification).toHaveBeenCalledTimes(1); + }); + + it('writes next_retry_at when delivery fails and retries remain', async () => { + const id = await insertFailedNotification(repo, db, 1, null); + + const discordService = { sendEventNotification: jest.fn().mockResolvedValue(false) } as any; + const beforeRun = Date.now(); + const scheduler = new RetryScheduler( + repo, + { ...RETRY_SCHEDULER_DEFAULTS, baseDelayMs: 1000, multiplier: 2, jitter: false }, + discordService + ); + + await scheduler.runOnce(); + + const row = await db.get<{ status: string; retry_count: number; next_retry_at: string | null }>( + 'SELECT status, retry_count, next_retry_at FROM scheduled_notifications WHERE id = ?', + [id] + ); + + expect(row!.status).toBe(NotificationStatus.PENDING); + expect(row!.retry_count).toBe(2); // incremented from 1 → 2 + expect(row!.next_retry_at).not.toBeNull(); + + const nextRetryAt = new Date(row!.next_retry_at!).getTime(); + // Base backoff for attempt=1 without jitter: 1000 * 2^1 = 2000 ms + const expectedDelay = calculateBackoffDelay(1, 1000, 2, RETRY_SCHEDULER_DEFAULTS.maxDelayMs, false); + expect(nextRetryAt).toBeGreaterThanOrEqual(beforeRun + expectedDelay * 0.9); + }); + + it('marks notification FAILED when max retries exhausted', async () => { + const id = await insertFailedNotification(repo, db, 3, null); // retryCount === maxRetries + + const discordService = { sendEventNotification: jest.fn().mockResolvedValue(false) } as any; + const scheduler = new RetryScheduler( + repo, + { ...RETRY_SCHEDULER_DEFAULTS, jitter: false }, + discordService + ); + + await scheduler.runOnce(); + + const row = await db.get<{ status: string; retry_count: number }>( + 'SELECT status, retry_count FROM scheduled_notifications WHERE id = ?', + [id] + ); + + expect(row!.status).toBe(NotificationStatus.FAILED); + expect(row!.retry_count).toBe(4); + }); + + it('does not pick up a notification whose next_retry_at is in the future', async () => { + const futureRetryAt = new Date(Date.now() + 60_000); + await insertFailedNotification(repo, db, 1, futureRetryAt); + + const discordService = { sendEventNotification: jest.fn() } as any; + const scheduler = new RetryScheduler( + repo, + { ...RETRY_SCHEDULER_DEFAULTS }, + discordService + ); + + await scheduler.runOnce(); + + expect(discordService.sendEventNotification).not.toHaveBeenCalled(); + }); + + it('does not pick up first-attempt notifications (retry_count === 0)', async () => { + // Fresh notification with no prior failures + await repo.create({ + payload: { event: {}, contractConfig: {} }, + notificationType: NotificationType.DISCORD, + targetRecipient: 'recipient', + executeAt: new Date(Date.now() - 1000), + }); + + const discordService = { sendEventNotification: jest.fn() } as any; + const scheduler = new RetryScheduler(repo, { ...RETRY_SCHEDULER_DEFAULTS }, discordService); + + await scheduler.runOnce(); + + expect(discordService.sendEventNotification).not.toHaveBeenCalled(); + }); + + it('prevents duplicate processing via distributed lock', async () => { + const id = await insertFailedNotification(repo, db, 1, null); + + let resolveFirst!: (value: boolean) => void; + const firstDeliveryBarrier = new Promise((res) => { resolveFirst = res; }); + + const callOrder: string[] = []; + const discordService = { + sendEventNotification: jest.fn() + .mockImplementationOnce(async () => { + callOrder.push('first-start'); + const result = await firstDeliveryBarrier; + callOrder.push('first-end'); + return result; + }) + .mockResolvedValue(true), + } as any; + + const s1 = new RetryScheduler(repo, { ...RETRY_SCHEDULER_DEFAULTS, jitter: false }, discordService); + const s2 = new RetryScheduler(repo, { ...RETRY_SCHEDULER_DEFAULTS, jitter: false }, discordService); + + // s1 starts processing but hasn't finished delivery yet + const p1 = s1.runOnce(); + // s2 tries to pick up the same notification — should get nothing (locked) + await s2.runOnce(); + // Now s1 finishes + resolveFirst(true); + await p1; + + // Only one scheduler should have delivered + expect(discordService.sendEventNotification).toHaveBeenCalledTimes(1); + + const row = await db.get<{ status: string }>( + 'SELECT status FROM scheduled_notifications WHERE id = ?', + [id] + ); + expect(row!.status).toBe(NotificationStatus.COMPLETED); + }); +}); diff --git a/listener/src/api/archive-api.ts b/listener/src/api/archive-api.ts new file mode 100644 index 0000000..870cc6f --- /dev/null +++ b/listener/src/api/archive-api.ts @@ -0,0 +1,102 @@ +/** + * Archive API route handler. + * + * Mounted into events-server.ts and handles: + * GET /api/archive – paginated list of archived notifications + * GET /api/archive/:id – single archived record by archive PK + * POST /api/archive/run – trigger an on-demand archive cycle (admin) + * + * All endpoints return JSON. The optional `archiveService` parameter is only + * needed for the admin /run endpoint; read-only endpoints only require `store`. + */ +import http from 'http'; +import { ArchiveStore } from '../services/archive-store'; +import { ArchiveService } from '../services/archive-service'; +import logger from '../utils/logger'; + +export interface ArchiveApiHandlerDeps { + store: ArchiveStore; + service?: ArchiveService | null; +} + +/** + * Try to handle an archive API request. + * Returns `true` if the request was handled (so the caller can `return`), + * `false` if it was not an archive route. + */ +export async function handleArchiveRequest( + req: http.IncomingMessage, + res: http.ServerResponse, + deps: ArchiveApiHandlerDeps, + requestId: string, +): Promise { + const url = new URL(req.url ?? '/', 'http://localhost'); + const { pathname } = url; + + // POST /api/archive/run – trigger on-demand cycle + if (req.method === 'POST' && pathname === '/api/archive/run') { + if (!deps.service) { + res.writeHead(503, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'Archive service not enabled' })); + return true; + } + logger.info('Handling POST /api/archive/run', { requestId }); + try { + const result = await deps.service.runCycle(); + res.writeHead(200, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify(result)); + } catch (err) { + logger.error('Archive run failed', { error: err, requestId }); + res.writeHead(500, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: (err as Error).message })); + } + return true; + } + + // GET /api/archive/:id + const singleMatch = pathname.match(/^\/api\/archive\/(\d+)$/); + if (req.method === 'GET' && singleMatch) { + const id = parseInt(singleMatch[1], 10); + logger.info('Handling GET /api/archive/:id', { requestId, id }); + try { + const record = await deps.store.getById(id); + if (!record) { + res.writeHead(404, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'Archived record not found' })); + return true; + } + res.writeHead(200, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify(record)); + } catch (err) { + logger.error('Failed to fetch archive record', { error: err, requestId, id }); + res.writeHead(500, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: (err as Error).message })); + } + return true; + } + + // GET /api/archive + if (req.method === 'GET' && pathname === '/api/archive') { + logger.info('Handling GET /api/archive', { requestId }); + try { + const options = { + limit: url.searchParams.get('limit') ? parseInt(url.searchParams.get('limit')!, 10) : undefined, + offset: url.searchParams.get('offset') ? parseInt(url.searchParams.get('offset')!, 10) : undefined, + status: url.searchParams.get('status') ?? undefined, + contractAddress: url.searchParams.get('contractAddress') ?? undefined, + startDate: url.searchParams.get('startDate') ?? undefined, + endDate: url.searchParams.get('endDate') ?? undefined, + }; + const result = await deps.store.query(options); + res.writeHead(200, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify(result)); + } catch (err) { + logger.error('Failed to query archive', { error: err, requestId }); + res.writeHead(500, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: (err as Error).message })); + } + return true; + } + + return false; +} diff --git a/listener/src/api/events-server.ts b/listener/src/api/events-server.ts index c1153c5..40cc16a 100644 --- a/listener/src/api/events-server.ts +++ b/listener/src/api/events-server.ts @@ -33,6 +33,9 @@ import { serializeTemplate, } from './template-api'; import { CreateNotificationTemplateInput } from '../types/notification-template'; +import { handleArchiveRequest } from './archive-api'; +import { ArchiveStore } from '../services/archive-store'; +import { ArchiveService } from '../services/archive-service'; export interface EventsServerOptions { port: number; @@ -49,6 +52,10 @@ export interface EventsServerOptions { */ analyticsAggregator?: NotificationAnalyticsAggregator | null; templateService?: NotificationTemplateService | null; + /** Archive store for retrieval endpoints (optional). */ + archiveStore?: ArchiveStore | null; + /** Archive service for the admin /run endpoint (optional). */ + archiveService?: ArchiveService | null; } type ServiceStatus = 'ok' | 'error' | 'not_configured'; @@ -700,6 +707,15 @@ export function createEventsServer(options: EventsServerOptions): http.Server { return; } + // GET /api/archive, GET /api/archive/:id, POST /api/archive/run + if (options.archiveStore && (url.pathname === '/api/archive' || url.pathname.startsWith('/api/archive/'))) { + const handled = await handleArchiveRequest(req, res, { + store: options.archiveStore, + service: options.archiveService, + }, requestId); + if (handled) return; + } + logger.warn('Unhandled request', { requestId, method: req.method, diff --git a/listener/src/config.ts b/listener/src/config.ts index 6c361e4..97068a3 100644 --- a/listener/src/config.ts +++ b/listener/src/config.ts @@ -1,4 +1,4 @@ -import { Config, ContractConfig, DiscordConfig, WebhookSecret, AppCleanupConfig } from './types'; +import { Config, ContractConfig, DiscordConfig, WebhookSecret, AppCleanupConfig, RetrySchedulerOptions } from './types'; export class ConfigError extends Error { constructor(message: string) { @@ -121,6 +121,20 @@ function loadCleanupConfig(): AppCleanupConfig { }; } +function loadRetrySchedulerConfig(): RetrySchedulerOptions { + return { + enabled: trimEnv('RETRY_SCHEDULER_ENABLED') !== 'false', + pollIntervalMs: parseIntegerEnv('RETRY_SCHEDULER_POLL_INTERVAL_MS', '15000'), + lockTimeoutMs: parseIntegerEnv('RETRY_SCHEDULER_LOCK_TIMEOUT_MS', '60000'), + processorId: trimEnv('RETRY_SCHEDULER_PROCESSOR_ID'), + batchSize: parseIntegerEnv('RETRY_SCHEDULER_BATCH_SIZE', '10'), + baseDelayMs: parseIntegerEnv('RETRY_BASE_DELAY_MS', '5000'), + multiplier: parseIntegerEnv('RETRY_MULTIPLIER', '2'), + maxDelayMs: parseIntegerEnv('RETRY_MAX_DELAY_MS', String(60 * 60 * 1000)), + jitter: trimEnv('RETRY_JITTER') !== 'false', + }; +} + export function loadConfig(): Config { const discord = loadDiscordConfig(); const rawContractAddresses = parseJsonEnv('CONTRACT_ADDRESSES', '[]'); @@ -145,6 +159,8 @@ export function loadConfig(): Config { retryQueue: { baseDelayMs: parseIntegerEnv('RETRY_BASE_DELAY_MS', '5000'), maxRetries: parseIntegerEnv('RETRY_MAX_RETRIES', '5'), + multiplier: parseIntegerEnv('RETRY_MULTIPLIER', '2'), + jitter: trimEnv('RETRY_JITTER') !== 'false', }, webhookSecrets: validateWebhookSecrets(rawWebhookSecrets), scheduler: { @@ -155,6 +171,7 @@ export function loadConfig(): Config { batchSize: parseIntegerEnv('SCHEDULER_BATCH_SIZE', '10'), timingBufferMs: parseIntegerEnv('SCHEDULER_TIMING_BUFFER_MS', '60000'), }, + retryScheduler: loadRetrySchedulerConfig(), rateLimit: { enabled: trimEnv('RATE_LIMIT_ENABLED') !== 'false', windowMs: parseIntegerEnv('RATE_LIMIT_WINDOW_MS', '60000'), diff --git a/listener/src/database/archive-schema.sql b/listener/src/database/archive-schema.sql new file mode 100644 index 0000000..546e109 --- /dev/null +++ b/listener/src/database/archive-schema.sql @@ -0,0 +1,46 @@ +-- Archive table for notifications moved out of active storage. +-- Records here are read-only for audit purposes and are never modified. +CREATE TABLE IF NOT EXISTS notification_archive ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + + -- Original record identity + original_id INTEGER NOT NULL, -- PK from scheduled_notifications at time of archiving + payload TEXT NOT NULL, + notification_type VARCHAR(50) NOT NULL, + target_recipient TEXT NOT NULL, + + -- Original scheduling / timing + execute_at DATETIME NOT NULL, + created_at DATETIME NOT NULL, + processing_completed_at DATETIME, + + -- Final status at time of archiving + status VARCHAR(20) NOT NULL, -- COMPLETED | FAILED | CANCELLED + retry_count INTEGER NOT NULL DEFAULT 0, + last_error TEXT, + + -- Optional references + event_id TEXT, + contract_address TEXT, + metadata TEXT, + + -- Archival bookkeeping + archived_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE INDEX IF NOT EXISTS idx_archive_original_id + ON notification_archive(original_id); + +CREATE INDEX IF NOT EXISTS idx_archive_archived_at + ON notification_archive(archived_at); + +CREATE INDEX IF NOT EXISTS idx_archive_status + ON notification_archive(status); + +CREATE INDEX IF NOT EXISTS idx_archive_contract_address + ON notification_archive(contract_address) + WHERE contract_address IS NOT NULL; + +CREATE INDEX IF NOT EXISTS idx_archive_event_id + ON notification_archive(event_id) + WHERE event_id IS NOT NULL; diff --git a/listener/src/database/schema.sql b/listener/src/database/schema.sql index 749b35f..422ece5 100644 --- a/listener/src/database/schema.sql +++ b/listener/src/database/schema.sql @@ -49,6 +49,10 @@ CREATE INDEX IF NOT EXISTS idx_scheduled_notifications_lock_expires ON scheduled_notifications(lock_expires_at, status) WHERE status = 'PROCESSING'; +CREATE INDEX IF NOT EXISTS idx_scheduled_notifications_next_retry_at + ON scheduled_notifications(next_retry_at, status) + WHERE status = 'PENDING'; + CREATE INDEX IF NOT EXISTS idx_scheduled_notifications_created_at ON scheduled_notifications(created_at); @@ -82,6 +86,9 @@ CREATE INDEX IF NOT EXISTS idx_execution_log_execution_time CREATE INDEX IF NOT EXISTS idx_execution_log_status_execution_time ON notification_execution_log(status, execution_time); +-- Migration: add next_retry_at for explicit retry scheduling +ALTER TABLE scheduled_notifications ADD COLUMN next_retry_at DATETIME; + -- Trigger to update updated_at timestamp CREATE TRIGGER IF NOT EXISTS update_scheduled_notifications_timestamp AFTER UPDATE ON scheduled_notifications diff --git a/listener/src/index.ts b/listener/src/index.ts index c6397cb..b3be518 100644 --- a/listener/src/index.ts +++ b/listener/src/index.ts @@ -2,6 +2,7 @@ import dotenv from 'dotenv'; import { startEventsServer } from './api/events-server'; import { EventSubscriber } from './services/event-subscriber'; import { NotificationScheduler } from './services/notification-scheduler'; +import { RetryScheduler } from './services/retry-scheduler'; import { ScheduledNotificationRepository } from './services/scheduled-notification-repository'; import { NotificationTemplateRepository } from './services/notification-template-repository'; import { NotificationTemplateService } from './services/notification-template-service'; @@ -9,6 +10,9 @@ import { TemplateAuditTrail } from './services/template-audit-trail'; import { getTemplateCache } from './services/notification-template-cache'; import { NotificationAPI } from './services/notification-api'; import { CleanupService } from './services/cleanup-service'; +import { ArchiveService } from './services/archive-service'; +import { ArchiveStore } from './services/archive-store'; +import { loadArchiveConfig } from './services/archive-config'; import { initializeDatabase } from './database/database'; import { DiscordNotificationService } from './services/discord-notification'; import { eventRegistry } from './store/event-registry'; @@ -22,9 +26,12 @@ async function main() { // Initialize database for templates, scheduler, and rate limiting let scheduler: NotificationScheduler | null = null; + let retryScheduler: RetryScheduler | null = null; let notificationAPI: NotificationAPI | null = null; let templateService: NotificationTemplateService | null = null; let cleanupService: CleanupService | null = null; + let archiveService: ArchiveService | null = null; + let archiveStore: ArchiveStore | null = null; try { logger.info('Initializing database'); @@ -38,6 +45,16 @@ async function main() { cleanupService = new CleanupService(db, eventRegistry, config.cleanup); cleanupService.start(); + // Archive service: moves old notifications to the archive table. + const archiveCfg = loadArchiveConfig(); + archiveStore = new ArchiveStore(db); + archiveService = new ArchiveService(db, archiveCfg); + await archiveService.initialize(); + if (archiveCfg.enabled) { + archiveService.start(); + logger.info('ArchiveService started'); + } + const templateRepository = new NotificationTemplateRepository( db, new TemplateAuditTrail(db), @@ -59,6 +76,12 @@ async function main() { await scheduler.start(); logger.info('Notification scheduler started successfully'); + + if (config.retryScheduler?.enabled) { + retryScheduler = new RetryScheduler(repository, config.retryScheduler, discordService); + await retryScheduler.start(); + logger.info('Retry scheduler started successfully'); + } } } catch (error) { logger.error('Failed to initialize database or scheduler', { error }); @@ -74,6 +97,8 @@ async function main() { notificationAPI, templateService, rateLimit: config.rateLimit, + archiveStore, + archiveService, }); const subscriber = new EventSubscriber(config); @@ -86,10 +111,18 @@ async function main() { await cleanupService.stop(); } + if (archiveService) { + archiveService.stop(); + } + if (scheduler) { await scheduler.stop(); } + if (retryScheduler) { + await retryScheduler.stop(); + } + await subscriber.stop(); eventsServer.close(); diff --git a/listener/src/services/archive-config.ts b/listener/src/services/archive-config.ts new file mode 100644 index 0000000..2d3fb49 --- /dev/null +++ b/listener/src/services/archive-config.ts @@ -0,0 +1,57 @@ +/** + * Archive configuration loaded from environment variables. + * + * Retention policy (applied per archiving cycle): + * 1. Notifications older than `archiveAfterMs` are MOVED from + * `scheduled_notifications` into `notification_archive`. + * 2. Records in `notification_archive` older than `deleteAfterMs` + * are permanently deleted. + * + * Timeline example (defaults): + * Day 0 – notification completes + * Day 7 – notification is archived (moved to archive table) + * Day 90 – archived record is permanently deleted + */ +export interface ArchiveConfig { + /** Whether the archiving background worker is enabled. Default: true. */ + enabled: boolean; + /** How often to run the archive cycle (ms). Default: 6 hours. */ + intervalMs: number; + /** + * Move completed/failed/cancelled notifications to the archive after this + * many milliseconds since `processing_completed_at`. Default: 7 days. + */ + archiveAfterMs: number; + /** + * Permanently delete archived records after this many milliseconds since + * `archived_at`. 0 = never delete. Default: 90 days. + */ + deleteAfterMs: number; + /** Max rows to process per cycle (prevents long-running transactions). Default: 500. */ + batchSize: number; +} + +const DEFAULTS: ArchiveConfig = { + enabled: true, + intervalMs: 6 * 60 * 60 * 1000, // 6 h + archiveAfterMs: 7 * 24 * 60 * 60 * 1000, // 7 days + deleteAfterMs: 90 * 24 * 60 * 60 * 1000, // 90 days + batchSize: 500, +}; + +function parseIntEnv(name: string, fallback: number): number { + const raw = process.env[name]?.trim(); + if (!raw) return fallback; + const n = Number.parseInt(raw, 10); + return Number.isNaN(n) ? fallback : n; +} + +export function loadArchiveConfig(): ArchiveConfig { + return { + enabled: process.env.ARCHIVE_ENABLED?.trim() !== 'false', + intervalMs: parseIntEnv('ARCHIVE_INTERVAL_MS', DEFAULTS.intervalMs), + archiveAfterMs: parseIntEnv('ARCHIVE_AFTER_MS', DEFAULTS.archiveAfterMs), + deleteAfterMs: parseIntEnv('ARCHIVE_DELETE_AFTER_MS', DEFAULTS.deleteAfterMs), + batchSize: parseIntEnv('ARCHIVE_BATCH_SIZE', DEFAULTS.batchSize), + }; +} diff --git a/listener/src/services/archive-service.ts b/listener/src/services/archive-service.ts new file mode 100644 index 0000000..cd66bed --- /dev/null +++ b/listener/src/services/archive-service.ts @@ -0,0 +1,181 @@ +/** + * ArchiveService + * + * Background worker that enforces the notification retention policy: + * + * Phase 1 – Archive + * Every `intervalMs`, rows in `scheduled_notifications` with a terminal + * status (COMPLETED | FAILED | CANCELLED) whose `processing_completed_at` + * is older than `archiveAfterMs` are MOVED (copy + delete) into the + * `notification_archive` table. Processing is capped at `batchSize` rows + * per cycle to keep individual transactions short. + * + * Phase 2 – Purge + * Within the same cycle, rows in `notification_archive` whose `archived_at` + * is older than `deleteAfterMs` are permanently deleted (when deleteAfterMs > 0). + * + * Both phases run inside the same `setInterval` tick so that the full + * retention policy is applied atomically per cycle. + */ +import * as fs from 'fs'; +import * as path from 'path'; +import { Database } from '../database/database'; +import { ArchiveConfig } from './archive-config'; +import { ArchiveStore } from './archive-store'; +import logger from '../utils/logger'; + +/** Shape of the raw SQLite row from scheduled_notifications. */ +interface NotificationRow { + id: number; + payload: string; + notification_type: string; + target_recipient: string; + execute_at: string; + created_at: string; + processing_completed_at: string | null; + status: string; + retry_count: number; + last_error: string | null; + event_id: string | null; + contract_address: string | null; + metadata: string | null; +} + +export interface ArchiveCycleResult { + archived: number; + purged: number; + durationMs: number; +} + +export class ArchiveService { + private timer: ReturnType | null = null; + private readonly store: ArchiveStore; + + constructor( + private readonly db: Database, + private readonly config: ArchiveConfig, + ) { + this.store = new ArchiveStore(db); + } + + /** Ensure the archive schema exists (idempotent). */ + async initialize(): Promise { + const schemaPath = path.join(__dirname, '../database/archive-schema.sql'); + if (!fs.existsSync(schemaPath)) { + throw new Error(`Archive schema not found: ${schemaPath}`); + } + const sql = fs.readFileSync(schemaPath, 'utf-8'); + await this.db.exec(sql); + logger.info('ArchiveService: schema ready'); + } + + start(): void { + if (this.timer) return; + logger.info('ArchiveService started', { + intervalMs: this.config.intervalMs, + archiveAfterMs: this.config.archiveAfterMs, + deleteAfterMs: this.config.deleteAfterMs, + batchSize: this.config.batchSize, + }); + // Run immediately on start, then on the configured interval. + void this.runCycle(); + this.timer = setInterval(() => void this.runCycle(), this.config.intervalMs); + } + + stop(): void { + if (this.timer) { + clearInterval(this.timer); + this.timer = null; + } + logger.info('ArchiveService stopped'); + } + + /** + * Run one full archive + purge cycle. + * Exposed publicly so callers (tests, admin tooling) can trigger on demand. + */ + async runCycle(): Promise { + const t0 = Date.now(); + let archived = 0; + let purged = 0; + + try { + archived = await this._archiveOldNotifications(); + purged = await this._purgeExpiredArchive(); + } catch (err) { + logger.error('ArchiveService: cycle error', { error: err }); + } + + const durationMs = Date.now() - t0; + logger.info('ArchiveService: cycle complete', { archived, purged, durationMs }); + return { archived, purged, durationMs }; + } + + // --------------------------------------------------------------------------- + // Private helpers + // --------------------------------------------------------------------------- + + private async _archiveOldNotifications(): Promise { + const cutoff = new Date(Date.now() - this.config.archiveAfterMs).toISOString(); + + const rows = await this.db.all( + `SELECT id, payload, notification_type, target_recipient, execute_at, + created_at, processing_completed_at, status, retry_count, + last_error, event_id, contract_address, metadata + FROM scheduled_notifications + WHERE status IN ('COMPLETED','FAILED','CANCELLED') + AND processing_completed_at IS NOT NULL + AND processing_completed_at < ? + ORDER BY processing_completed_at ASC + LIMIT ?`, + [cutoff, this.config.batchSize], + ); + + if (rows.length === 0) return 0; + + // Copy to archive, then delete originals — done inside a transaction. + let inserted = 0; + await this.db.transaction(async () => { + inserted = await this.store.insertBatch( + rows.map((r) => ({ + originalId: r.id, + payload: r.payload, + notificationType: r.notification_type, + targetRecipient: r.target_recipient, + executeAt: r.execute_at, + createdAt: r.created_at, + processingCompletedAt: r.processing_completed_at, + status: r.status, + retryCount: r.retry_count, + lastError: r.last_error, + eventId: r.event_id, + contractAddress: r.contract_address, + metadata: r.metadata, + })), + ); + + // Remove originals + const ids = rows.map((r) => r.id); + const placeholders = ids.map(() => '?').join(','); + await this.db.run( + `DELETE FROM scheduled_notifications WHERE id IN (${placeholders})`, + ids, + ); + }); + + logger.info('ArchiveService: archived notifications', { count: inserted, cutoff }); + return inserted; + } + + private async _purgeExpiredArchive(): Promise { + if (!this.config.deleteAfterMs) return 0; + + const cutoff = new Date(Date.now() - this.config.deleteAfterMs).toISOString(); + const purged = await this.store.purgeOlderThan(cutoff); + + if (purged > 0) { + logger.info('ArchiveService: purged expired archive records', { count: purged, cutoff }); + } + return purged; + } +} diff --git a/listener/src/services/archive-store.ts b/listener/src/services/archive-store.ts new file mode 100644 index 0000000..e2d35e1 --- /dev/null +++ b/listener/src/services/archive-store.ts @@ -0,0 +1,209 @@ +/** + * ArchiveStore + * + * Low-level data-access layer for the `notification_archive` table. + * All write operations (insert / purge) are called exclusively by + * ArchiveService; this file is the single source of truth for the + * archive data schema. + */ +import { Database } from '../database/database'; +import { buildPaginationMetadata, normalizePaginationParams } from '../utils/pagination'; + +/** Shape of one row in notification_archive. */ +export interface ArchivedNotification { + id: number; + originalId: number; + payload: string; + notificationType: string; + targetRecipient: string; + executeAt: string; + createdAt: string; + processingCompletedAt: string | null; + status: string; + retryCount: number; + lastError: string | null; + eventId: string | null; + contractAddress: string | null; + metadata: string | null; + archivedAt: string; +} + +/** Raw SQLite row (snake_case). */ +interface ArchiveRow { + id: number; + original_id: number; + payload: string; + notification_type: string; + target_recipient: string; + execute_at: string; + created_at: string; + processing_completed_at: string | null; + status: string; + retry_count: number; + last_error: string | null; + event_id: string | null; + contract_address: string | null; + metadata: string | null; + archived_at: string; +} + +export interface ArchiveQueryOptions { + limit?: number; + offset?: number; + status?: string; + contractAddress?: string; + startDate?: string; + endDate?: string; +} + +export interface PaginatedArchiveResponse { + records: ArchivedNotification[]; + total: number; + limit: number; + offset: number; + itemCount: number; + totalPages: number; +} + +function mapRow(row: ArchiveRow): ArchivedNotification { + return { + id: row.id, + originalId: row.original_id, + payload: row.payload, + notificationType: row.notification_type, + targetRecipient: row.target_recipient, + executeAt: row.execute_at, + createdAt: row.created_at, + processingCompletedAt: row.processing_completed_at, + status: row.status, + retryCount: row.retry_count, + lastError: row.last_error, + eventId: row.event_id, + contractAddress: row.contract_address, + metadata: row.metadata, + archivedAt: row.archived_at, + }; +} + +export class ArchiveStore { + constructor(private readonly db: Database) {} + + /** + * Insert a batch of completed/failed/cancelled notifications into the + * archive. Returns the number of rows inserted. + */ + async insertBatch( + rows: Array<{ + originalId: number; + payload: string; + notificationType: string; + targetRecipient: string; + executeAt: string; + createdAt: string; + processingCompletedAt: string | null; + status: string; + retryCount: number; + lastError: string | null; + eventId: string | null; + contractAddress: string | null; + metadata: string | null; + }>, + ): Promise { + if (rows.length === 0) return 0; + + let inserted = 0; + for (const r of rows) { + await this.db.run( + `INSERT INTO notification_archive + (original_id, payload, notification_type, target_recipient, + execute_at, created_at, processing_completed_at, + status, retry_count, last_error, event_id, contract_address, metadata) + VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)`, + [ + r.originalId, + r.payload, + r.notificationType, + r.targetRecipient, + r.executeAt, + r.createdAt, + r.processingCompletedAt, + r.status, + r.retryCount, + r.lastError, + r.eventId, + r.contractAddress, + r.metadata, + ], + ); + inserted++; + } + return inserted; + } + + /** Delete archive rows whose `archived_at` is older than `cutoff`. */ + async purgeOlderThan(cutoff: string): Promise { + const result = await this.db.run( + `DELETE FROM notification_archive WHERE archived_at < ?`, + [cutoff], + ); + return result.changes; + } + + /** Paginated query for audit retrieval. */ + async query(options: ArchiveQueryOptions): Promise { + const { limit, offset } = normalizePaginationParams(options.limit, options.offset); + + const conditions: string[] = []; + const params: unknown[] = []; + + if (options.status) { + conditions.push('status = ?'); + params.push(options.status); + } + if (options.contractAddress) { + conditions.push('contract_address = ?'); + params.push(options.contractAddress); + } + if (options.startDate) { + conditions.push('archived_at >= ?'); + params.push(options.startDate); + } + if (options.endDate) { + conditions.push('archived_at <= ?'); + params.push(options.endDate); + } + + const where = conditions.length ? `WHERE ${conditions.join(' AND ')}` : ''; + + const countRow = await this.db.get<{ count: number }>( + `SELECT COUNT(*) as count FROM notification_archive ${where}`, + params, + ); + const total = countRow?.count ?? 0; + + const rows = await this.db.all( + `SELECT * FROM notification_archive ${where} + ORDER BY archived_at DESC LIMIT ? OFFSET ?`, + [...params, limit, offset], + ); + + const pagination = buildPaginationMetadata(total, limit, offset); + return { + records: rows.map(mapRow), + total, + limit: pagination.limit, + offset: pagination.offset, + itemCount: pagination.itemCount, + totalPages: pagination.totalPages, + }; + } + + /** Fetch a single archived record by its archive-table PK. */ + async getById(id: number): Promise { + const row = await this.db.get( + `SELECT * FROM notification_archive WHERE id = ?`, + [id], + ); + return row ? mapRow(row) : null; + } +} diff --git a/listener/src/services/archive.test.ts b/listener/src/services/archive.test.ts new file mode 100644 index 0000000..9d306e9 --- /dev/null +++ b/listener/src/services/archive.test.ts @@ -0,0 +1,481 @@ +/** + * Tests for the notification archiving feature: + * - loadArchiveConfig (config loading from env vars) + * - ArchiveStore (data-access layer) + * - ArchiveService (archive + purge cycle) + * - handleArchiveRequest (HTTP API handler) + */ + +import { loadArchiveConfig } from '../services/archive-config'; +import { ArchiveStore } from '../services/archive-store'; +import { ArchiveService } from '../services/archive-service'; +import { handleArchiveRequest } from '../api/archive-api'; + +// --------------------------------------------------------------------------- +// Minimal in-memory Database stub +// --------------------------------------------------------------------------- + +interface RunResult { lastID: number; changes: number } + +class MemoryDb { + private tables: Record[]> = { + scheduled_notifications: [], + notification_archive: [], + }; + private nextId = 1; + + async run(sql: string, params: unknown[] = []): Promise { + const s = sql.trim().toUpperCase(); + + if (s.startsWith('BEGIN') || s.startsWith('COMMIT') || s.startsWith('ROLLBACK')) { + return { lastID: 0, changes: 0 }; + } + + if (s.startsWith('INSERT INTO NOTIFICATION_ARCHIVE')) { + const row: Record = { + id: this.nextId++, + original_id: params[0], + payload: params[1], + notification_type: params[2], + target_recipient: params[3], + execute_at: params[4], + created_at: params[5], + processing_completed_at: params[6], + status: params[7], + retry_count: params[8], + last_error: params[9], + event_id: params[10], + contract_address: params[11], + metadata: params[12], + archived_at: new Date().toISOString(), + }; + this.tables.notification_archive.push(row); + return { lastID: row.id as number, changes: 1 }; + } + + if (s.startsWith('DELETE FROM SCHEDULED_NOTIFICATIONS WHERE ID IN')) { + const before = this.tables.scheduled_notifications.length; + this.tables.scheduled_notifications = this.tables.scheduled_notifications.filter( + (r) => !params.includes((r as any).id), + ); + return { lastID: 0, changes: before - this.tables.scheduled_notifications.length }; + } + + if (s.startsWith('DELETE FROM NOTIFICATION_ARCHIVE WHERE ARCHIVED_AT <')) { + const cutoff = params[0] as string; + const before = this.tables.notification_archive.length; + this.tables.notification_archive = this.tables.notification_archive.filter( + (r) => (r as any).archived_at >= cutoff, + ); + return { lastID: 0, changes: before - this.tables.notification_archive.length }; + } + + return { lastID: 0, changes: 0 }; + } + + async get(sql: string, params: unknown[] = []): Promise { + const s = sql.trim().toUpperCase(); + + if (s.includes('COUNT(*) AS COUNT FROM NOTIFICATION_ARCHIVE')) { + return { count: this.tables.notification_archive.length } as unknown as T; + } + if (s.includes('* FROM NOTIFICATION_ARCHIVE WHERE ID =')) { + const id = params[0] as number; + const row = this.tables.notification_archive.find((r) => (r as any).id === id); + return row as unknown as T | undefined; + } + return undefined; + } + + async all(sql: string, params: unknown[] = []): Promise { + const s = sql.trim().toUpperCase(); + + if (s.includes('FROM SCHEDULED_NOTIFICATIONS')) { + const cutoff = params[0] as string; + const limit = params[1] as number; + const rows = this.tables.scheduled_notifications.filter( + (r) => { + const row = r as any; + return ( + ['COMPLETED', 'FAILED', 'CANCELLED'].includes(row.status) && + row.processing_completed_at != null && + row.processing_completed_at < cutoff + ); + }, + ).slice(0, limit); + return rows as unknown as T[]; + } + + if (s.includes('FROM NOTIFICATION_ARCHIVE')) { + return this.tables.notification_archive as unknown as T[]; + } + + return []; + } + + async exec(_sql: string): Promise { /* no-op for schema ddl */ } + + async transaction(cb: () => Promise): Promise { await cb(); } + + // Test helpers + seedScheduledNotification(overrides: Partial> = {}): void { + this.tables.scheduled_notifications.push({ + id: this.nextId++, + payload: '{}', + notification_type: 'discord', + target_recipient: 'test-user', + execute_at: new Date(Date.now() - 10 * 24 * 60 * 60 * 1000).toISOString(), + created_at: new Date(Date.now() - 10 * 24 * 60 * 60 * 1000).toISOString(), + processing_completed_at: new Date(Date.now() - 8 * 24 * 60 * 60 * 1000).toISOString(), + status: 'COMPLETED', + retry_count: 0, + last_error: null, + event_id: null, + contract_address: null, + metadata: null, + ...overrides, + }); + } + + archiveCount(): number { + return this.tables.notification_archive.length; + } + + scheduledCount(): number { + return this.tables.scheduled_notifications.length; + } +} + +// --------------------------------------------------------------------------- +// 1. loadArchiveConfig +// --------------------------------------------------------------------------- + +describe('loadArchiveConfig', () => { + const ORIGINAL = { ...process.env }; + + afterEach(() => { Object.assign(process.env, ORIGINAL); }); + + it('returns defaults when no env vars are set', () => { + delete process.env.ARCHIVE_ENABLED; + delete process.env.ARCHIVE_INTERVAL_MS; + delete process.env.ARCHIVE_AFTER_MS; + delete process.env.ARCHIVE_DELETE_AFTER_MS; + delete process.env.ARCHIVE_BATCH_SIZE; + + const cfg = loadArchiveConfig(); + expect(cfg.enabled).toBe(true); + expect(cfg.intervalMs).toBe(6 * 60 * 60 * 1000); + expect(cfg.archiveAfterMs).toBe(7 * 24 * 60 * 60 * 1000); + expect(cfg.deleteAfterMs).toBe(90 * 24 * 60 * 60 * 1000); + expect(cfg.batchSize).toBe(500); + }); + + it('respects ARCHIVE_ENABLED=false', () => { + process.env.ARCHIVE_ENABLED = 'false'; + expect(loadArchiveConfig().enabled).toBe(false); + }); + + it('parses custom integer env vars', () => { + process.env.ARCHIVE_INTERVAL_MS = '3600000'; + process.env.ARCHIVE_AFTER_MS = '86400000'; + process.env.ARCHIVE_DELETE_AFTER_MS = '0'; + process.env.ARCHIVE_BATCH_SIZE = '100'; + + const cfg = loadArchiveConfig(); + expect(cfg.intervalMs).toBe(3_600_000); + expect(cfg.archiveAfterMs).toBe(86_400_000); + expect(cfg.deleteAfterMs).toBe(0); + expect(cfg.batchSize).toBe(100); + }); + + it('falls back to default for invalid integer', () => { + process.env.ARCHIVE_BATCH_SIZE = 'not-a-number'; + expect(loadArchiveConfig().batchSize).toBe(500); + }); +}); + +// --------------------------------------------------------------------------- +// 2. ArchiveStore +// --------------------------------------------------------------------------- + +describe('ArchiveStore', () => { + let db: MemoryDb; + let store: ArchiveStore; + + beforeEach(() => { + db = new MemoryDb(); + store = new ArchiveStore(db as any); + }); + + it('insertBatch returns 0 for empty input', async () => { + const n = await store.insertBatch([]); + expect(n).toBe(0); + }); + + it('insertBatch inserts all rows', async () => { + const n = await store.insertBatch([ + { + originalId: 1, + payload: '{}', + notificationType: 'discord', + targetRecipient: 'u1', + executeAt: '2024-01-01T00:00:00.000Z', + createdAt: '2024-01-01T00:00:00.000Z', + processingCompletedAt: '2024-01-02T00:00:00.000Z', + status: 'COMPLETED', + retryCount: 0, + lastError: null, + eventId: null, + contractAddress: null, + metadata: null, + }, + { + originalId: 2, + payload: '{"key":"val"}', + notificationType: 'email', + targetRecipient: 'u2', + executeAt: '2024-01-01T00:00:00.000Z', + createdAt: '2024-01-01T00:00:00.000Z', + processingCompletedAt: null, + status: 'FAILED', + retryCount: 3, + lastError: 'timeout', + eventId: 'evt-1', + contractAddress: 'CABC', + metadata: null, + }, + ]); + expect(n).toBe(2); + expect(db.archiveCount()).toBe(2); + }); + + it('purgeOlderThan removes rows before cutoff', async () => { + // Seed archive with one old row (override archived_at via direct table access) + const internalDb = db as any; + internalDb.tables.notification_archive.push({ + id: 99, + original_id: 10, + archived_at: '2020-01-01T00:00:00.000Z', + }); + + const purged = await store.purgeOlderThan('2021-01-01T00:00:00.000Z'); + expect(purged).toBe(1); + expect(db.archiveCount()).toBe(0); + }); + + it('getById returns null for missing id', async () => { + const result = await store.getById(9999); + expect(result).toBeNull(); + }); + + it('query returns paginated results', async () => { + // Seed a couple of archive rows + for (let i = 1; i <= 3; i++) { + (db as any).tables.notification_archive.push({ + id: i, + original_id: i, + payload: '{}', + notification_type: 'discord', + target_recipient: 'u', + execute_at: '', + created_at: '', + processing_completed_at: null, + status: 'COMPLETED', + retry_count: 0, + last_error: null, + event_id: null, + contract_address: null, + metadata: null, + archived_at: new Date().toISOString(), + }); + } + const result = await store.query({ limit: 10, offset: 0 }); + expect(result.total).toBe(3); + expect(result.records).toHaveLength(3); + }); +}); + +// --------------------------------------------------------------------------- +// 3. ArchiveService +// --------------------------------------------------------------------------- + +describe('ArchiveService', () => { + let db: MemoryDb; + let service: ArchiveService; + + const cfg = { + enabled: true, + intervalMs: 60_000, + archiveAfterMs: 7 * 24 * 60 * 60 * 1000, // 7 days + deleteAfterMs: 90 * 24 * 60 * 60 * 1000, // 90 days + batchSize: 500, + }; + + beforeEach(() => { + db = new MemoryDb(); + service = new ArchiveService(db as any, cfg); + }); + + it('archives old completed notifications', async () => { + db.seedScheduledNotification({ status: 'COMPLETED' }); + db.seedScheduledNotification({ status: 'FAILED' }); + expect(db.scheduledCount()).toBe(2); + + const result = await service.runCycle(); + + expect(result.archived).toBe(2); + expect(db.scheduledCount()).toBe(0); + expect(db.archiveCount()).toBe(2); + }); + + it('does not archive PENDING notifications', async () => { + db.seedScheduledNotification({ status: 'PENDING', processing_completed_at: null }); + const result = await service.runCycle(); + expect(result.archived).toBe(0); + expect(db.scheduledCount()).toBe(1); + }); + + it('does not archive recently completed notifications', async () => { + // Completed only 1 day ago — below the 7-day threshold + db.seedScheduledNotification({ + status: 'COMPLETED', + processing_completed_at: new Date(Date.now() - 1 * 24 * 60 * 60 * 1000).toISOString(), + }); + const result = await service.runCycle(); + expect(result.archived).toBe(0); + }); + + it('purges archive rows older than deleteAfterMs', async () => { + // Manually plant an "old" archive row + (db as any).tables.notification_archive.push({ + id: 1, + original_id: 100, + archived_at: new Date(Date.now() - 91 * 24 * 60 * 60 * 1000).toISOString(), + }); + const result = await service.runCycle(); + expect(result.purged).toBe(1); + expect(db.archiveCount()).toBe(0); + }); + + it('skips purge when deleteAfterMs is 0', async () => { + const noPurgeSvc = new ArchiveService(db as any, { ...cfg, deleteAfterMs: 0 }); + (db as any).tables.notification_archive.push({ + id: 1, + original_id: 100, + archived_at: new Date(0).toISOString(), + }); + const result = await noPurgeSvc.runCycle(); + expect(result.purged).toBe(0); + expect(db.archiveCount()).toBe(1); + }); + + it('start/stop manages the interval without throwing', () => { + jest.useFakeTimers(); + service.start(); + service.stop(); + jest.useRealTimers(); + }); +}); + +// --------------------------------------------------------------------------- +// 4. handleArchiveRequest (HTTP handler) +// --------------------------------------------------------------------------- + +function makeRes() { + const res = { + _status: 0, + _body: '', + writeHead(status: number) { this._status = status; }, + end(body: string) { this._body = body; }, + }; + return res; +} + +function makeReq(method: string, url: string) { + return { method, url } as any; +} + +describe('handleArchiveRequest', () => { + let db: MemoryDb; + let store: ArchiveStore; + + beforeEach(() => { + db = new MemoryDb(); + store = new ArchiveStore(db as any); + }); + + it('returns false for non-archive routes', async () => { + const req = makeReq('GET', '/api/events'); + const res = makeRes(); + const handled = await handleArchiveRequest(req, res as any, { store }, 'req-1'); + expect(handled).toBe(false); + }); + + it('GET /api/archive returns paginated results', async () => { + const req = makeReq('GET', '/api/archive'); + const res = makeRes(); + const handled = await handleArchiveRequest(req, res as any, { store }, 'req-2'); + expect(handled).toBe(true); + expect(res._status).toBe(200); + const body = JSON.parse(res._body); + expect(body).toHaveProperty('records'); + expect(body).toHaveProperty('total'); + }); + + it('GET /api/archive/:id returns 404 for unknown id', async () => { + const req = makeReq('GET', '/api/archive/999'); + const res = makeRes(); + const handled = await handleArchiveRequest(req, res as any, { store }, 'req-3'); + expect(handled).toBe(true); + expect(res._status).toBe(404); + }); + + it('GET /api/archive/:id returns the record when it exists', async () => { + // Plant a row in the in-memory archive table + (db as any).tables.notification_archive.push({ + id: 42, + original_id: 1, + payload: '{}', + notification_type: 'discord', + target_recipient: 'u', + execute_at: '', + created_at: '', + processing_completed_at: null, + status: 'COMPLETED', + retry_count: 0, + last_error: null, + event_id: null, + contract_address: null, + metadata: null, + archived_at: new Date().toISOString(), + }); + const req = makeReq('GET', '/api/archive/42'); + const res = makeRes(); + const handled = await handleArchiveRequest(req, res as any, { store }, 'req-4'); + expect(handled).toBe(true); + expect(res._status).toBe(200); + expect(JSON.parse(res._body).id).toBe(42); + }); + + it('POST /api/archive/run returns 503 when service not provided', async () => { + const req = makeReq('POST', '/api/archive/run'); + const res = makeRes(); + const handled = await handleArchiveRequest(req, res as any, { store, service: null }, 'req-5'); + expect(handled).toBe(true); + expect(res._status).toBe(503); + }); + + it('POST /api/archive/run triggers cycle when service is provided', async () => { + const fakeService = { + runCycle: jest.fn().mockResolvedValue({ archived: 1, purged: 0, durationMs: 5 }), + } as any; + const req = makeReq('POST', '/api/archive/run'); + const res = makeRes(); + const handled = await handleArchiveRequest(req, res as any, { store, service: fakeService }, 'req-6'); + expect(handled).toBe(true); + expect(res._status).toBe(200); + expect(fakeService.runCycle).toHaveBeenCalledTimes(1); + const body = JSON.parse(res._body); + expect(body.archived).toBe(1); + }); +}); diff --git a/listener/src/services/notification-retry-queue.ts b/listener/src/services/notification-retry-queue.ts index 895d3bd..e24099c 100644 --- a/listener/src/services/notification-retry-queue.ts +++ b/listener/src/services/notification-retry-queue.ts @@ -7,6 +7,8 @@ import { NotificationType } from '../types/scheduled-notification'; export interface RetryQueueOptions { baseDelayMs?: number; + multiplier?: number; + jitter?: boolean; maxRetries?: number; processIntervalMs?: number; } @@ -21,6 +23,8 @@ interface RetryItem { const DEFAULTS = { baseDelayMs: 5_000, + multiplier: 2, + jitter: true, maxRetries: 5, processIntervalMs: 5_000, }; @@ -35,6 +39,8 @@ export class NotificationRetryQueue { private queue: RetryItem[] = []; private readonly queuedFingerprints: Set = new Set(); private readonly baseDelayMs: number; + private readonly multiplier: number; + private readonly jitter: boolean; private readonly maxRetries: number; private readonly processIntervalMs: number; private timer: ReturnType | null = null; @@ -44,6 +50,8 @@ export class NotificationRetryQueue { constructor(notificationFn: NotificationFn, options?: RetryQueueOptions) { this.notificationFn = notificationFn; this.baseDelayMs = options?.baseDelayMs ?? DEFAULTS.baseDelayMs; + this.multiplier = options?.multiplier ?? DEFAULTS.multiplier; + this.jitter = options?.jitter ?? DEFAULTS.jitter; this.maxRetries = options?.maxRetries ?? DEFAULTS.maxRetries; this.processIntervalMs = options?.processIntervalMs ?? DEFAULTS.processIntervalMs; this.analytics = getNotificationAnalyticsAggregator(); @@ -181,7 +189,8 @@ export class NotificationRetryQueue { } private calculateDelay(retryCount: number): number { - return this.baseDelayMs * Math.pow(2, retryCount); + const base = this.baseDelayMs * Math.pow(this.multiplier, retryCount); + return this.jitter ? base * (0.5 + Math.random() * 0.5) : base; } } diff --git a/listener/src/services/retry-scheduler.test.ts b/listener/src/services/retry-scheduler.test.ts new file mode 100644 index 0000000..6b49d9f --- /dev/null +++ b/listener/src/services/retry-scheduler.test.ts @@ -0,0 +1,210 @@ +import { calculateBackoffDelay, RetryScheduler, RETRY_SCHEDULER_DEFAULTS } from './retry-scheduler'; +import { NotificationStatus, NotificationType } from '../types/scheduled-notification'; + +jest.mock('../utils/logger', () => ({ + __esModule: true, + default: { info: jest.fn(), warn: jest.fn(), error: jest.fn(), debug: jest.fn() }, +})); +jest.mock('../utils/request-id', () => ({ generateRequestId: () => 'test-req-id' })); + +// ─── helpers ──────────────────────────────────────────────────────────────── + +function makeRepo(overrides: Partial> = {}) { + return { ...buildMockRepo(), ...overrides }; +} + +function buildMockRepo() { + return { + recoverStaleLocks: jest.fn().mockResolvedValue(0), + fetchDueRetries: jest.fn().mockResolvedValue([]), + markAsCompleted: jest.fn().mockResolvedValue(undefined), + markAsFailedOrRetry: jest.fn().mockResolvedValue(undefined), + logExecution: jest.fn().mockResolvedValue(undefined), + // other methods not exercised by RetryScheduler + create: jest.fn(), + fetchAndLockPendingNotifications: jest.fn(), + getById: jest.fn(), + cancel: jest.fn(), + getStats: jest.fn(), + fetchDueRetriesMock: jest.fn(), + } as any; +} + +function makeNotification(overrides: Record = {}) { + return { + id: 1, + payload: JSON.stringify({ event: {}, contractConfig: {} }), + notificationType: NotificationType.DISCORD, + targetRecipient: 'user-1', + executeAt: new Date(), + status: NotificationStatus.PROCESSING, + retryCount: 1, + maxRetries: 3, + priority: 5, + ...overrides, + }; +} + +// ─── calculateBackoffDelay ─────────────────────────────────────────────────── + +describe('calculateBackoffDelay', () => { + it('returns base * multiplier^attempt without jitter', () => { + expect(calculateBackoffDelay(0, 1000, 2, 60_000, false)).toBe(1000); + expect(calculateBackoffDelay(1, 1000, 2, 60_000, false)).toBe(2000); + expect(calculateBackoffDelay(2, 1000, 2, 60_000, false)).toBe(4000); + expect(calculateBackoffDelay(3, 1000, 2, 60_000, false)).toBe(8000); + }); + + it('respects maxDelayMs cap', () => { + expect(calculateBackoffDelay(10, 1000, 2, 5000, false)).toBe(5000); + }); + + it('with jitter returns value within [75 %, 125 %] of base delay', () => { + const base = calculateBackoffDelay(1, 1000, 2, 60_000, false); // 2000 + for (let i = 0; i < 20; i++) { + const jittered = calculateBackoffDelay(1, 1000, 2, 60_000, true); + expect(jittered).toBeGreaterThanOrEqual(base * 0.75); + expect(jittered).toBeLessThanOrEqual(base * 1.25); + } + }); + + it('uses custom multiplier', () => { + expect(calculateBackoffDelay(2, 1000, 3, 100_000, false)).toBe(9000); // 1000*3^2 + }); +}); + +// ─── RetryScheduler ────────────────────────────────────────────────────────── + +describe('RetryScheduler', () => { + beforeEach(() => jest.clearAllMocks()); + + describe('start / stop', () => { + it('does not start when disabled', async () => { + const repo = makeRepo(); + const scheduler = new RetryScheduler(repo, { enabled: false }); + await scheduler.start(); + expect(repo.recoverStaleLocks).not.toHaveBeenCalled(); + }); + + it('recovers stale locks on start', async () => { + const repo = makeRepo(); + const scheduler = new RetryScheduler(repo, { enabled: true, pollIntervalMs: 999_999 }); + await scheduler.start(); + expect(repo.recoverStaleLocks).toHaveBeenCalledTimes(1); + await scheduler.stop(); + }); + + it('calling start twice is idempotent', async () => { + const repo = makeRepo(); + const scheduler = new RetryScheduler(repo, { enabled: true, pollIntervalMs: 999_999 }); + await scheduler.start(); + await scheduler.start(); + expect(repo.recoverStaleLocks).toHaveBeenCalledTimes(1); + await scheduler.stop(); + }); + }); + + describe('runOnce – success path', () => { + it('marks notification as completed when delivery succeeds', async () => { + const notification = makeNotification(); + const repo = makeRepo({ fetchDueRetries: jest.fn().mockResolvedValue([notification]) }); + const discordService = { sendEventNotification: jest.fn().mockResolvedValue(true) } as any; + + const scheduler = new RetryScheduler(repo, { ...RETRY_SCHEDULER_DEFAULTS }, discordService); + await scheduler.runOnce(); + + expect(discordService.sendEventNotification).toHaveBeenCalledTimes(1); + expect(repo.markAsCompleted).toHaveBeenCalledWith(1, 'test-req-id'); + expect(repo.logExecution).toHaveBeenCalledWith( + expect.objectContaining({ status: 'SUCCESS', scheduledNotificationId: 1 }) + ); + }); + }); + + describe('runOnce – failure path', () => { + it('schedules next retry with backoff when delivery fails and retries remain', async () => { + const notification = makeNotification({ retryCount: 1, maxRetries: 3 }); + const repo = makeRepo({ fetchDueRetries: jest.fn().mockResolvedValue([notification]) }); + const discordService = { sendEventNotification: jest.fn().mockResolvedValue(false) } as any; + + const scheduler = new RetryScheduler( + repo, + { ...RETRY_SCHEDULER_DEFAULTS, baseDelayMs: 1000, multiplier: 2, jitter: false }, + discordService + ); + await scheduler.runOnce(); + + expect(repo.markAsFailedOrRetry).toHaveBeenCalledWith( + 1, + expect.any(Error), + 1, // currentRetryCount = notification.retryCount + 3, + expect.any(Date) // nextRetryAt must be set + ); + + const [, , , , nextRetryAt] = (repo.markAsFailedOrRetry as jest.Mock).mock.calls[0]; + expect(nextRetryAt).toBeInstanceOf(Date); + expect(nextRetryAt.getTime()).toBeGreaterThan(Date.now()); + }); + + it('marks notification as permanently failed when max retries exhausted', async () => { + const notification = makeNotification({ retryCount: 3, maxRetries: 3 }); + const repo = makeRepo({ fetchDueRetries: jest.fn().mockResolvedValue([notification]) }); + const discordService = { sendEventNotification: jest.fn().mockResolvedValue(false) } as any; + + const scheduler = new RetryScheduler(repo, { ...RETRY_SCHEDULER_DEFAULTS }, discordService); + await scheduler.runOnce(); + + expect(repo.markAsFailedOrRetry).toHaveBeenCalledWith( + 1, + expect.any(Error), + 3, + 3, + undefined // no nextRetryAt when permanently failed + ); + expect(repo.logExecution).toHaveBeenCalledWith( + expect.objectContaining({ status: 'FAILED' }) + ); + }); + + it('logs error and does not throw when delivery throws', async () => { + const logger = jest.requireMock('../utils/logger').default; + const notification = makeNotification({ retryCount: 1, maxRetries: 3 }); + const repo = makeRepo({ fetchDueRetries: jest.fn().mockResolvedValue([notification]) }); + const discordService = { + sendEventNotification: jest.fn().mockRejectedValue(new Error('network timeout')), + } as any; + + const scheduler = new RetryScheduler(repo, { ...RETRY_SCHEDULER_DEFAULTS }, discordService); + await expect(scheduler.runOnce()).resolves.not.toThrow(); + expect(logger.warn).toHaveBeenCalledWith( + 'Retry failed, scheduling next attempt', + expect.objectContaining({ id: 1 }) + ); + }); + }); + + describe('duplicate prevention (distributed lock)', () => { + it('does not process the same notification twice in the same cycle', async () => { + // fetchDueRetries returns same notification twice (simulates two schedulers racing) + const notification = makeNotification(); + let callCount = 0; + const repo = makeRepo({ + fetchDueRetries: jest.fn().mockImplementation(() => { + callCount++; + // Only first call returns the notification; second call returns empty (lock held) + return Promise.resolve(callCount === 1 ? [notification] : []); + }), + }); + const discordService = { sendEventNotification: jest.fn().mockResolvedValue(true) } as any; + + const s1 = new RetryScheduler(repo, { ...RETRY_SCHEDULER_DEFAULTS }, discordService); + const s2 = new RetryScheduler(repo, { ...RETRY_SCHEDULER_DEFAULTS }, discordService); + + await Promise.all([s1.runOnce(), s2.runOnce()]); + + // Delivery should have been called only once (second scheduler got empty batch) + expect(discordService.sendEventNotification).toHaveBeenCalledTimes(1); + }); + }); +}); diff --git a/listener/src/services/retry-scheduler.ts b/listener/src/services/retry-scheduler.ts new file mode 100644 index 0000000..2c37fb9 --- /dev/null +++ b/listener/src/services/retry-scheduler.ts @@ -0,0 +1,270 @@ +import { v4 as uuidv4 } from 'uuid'; +import logger from '../utils/logger'; +import { generateRequestId } from '../utils/request-id'; +import { ScheduledNotificationRepository } from './scheduled-notification-repository'; +import { ScheduledNotification, NotificationStatus } from '../types/scheduled-notification'; +import { DiscordNotificationService } from './discord-notification'; + +export interface RetrySchedulerConfig { + /** Whether the scheduler is enabled. */ + enabled: boolean; + /** How often to poll for due retries (ms). */ + pollIntervalMs: number; + /** How long to hold a distributed lock before it is considered stale (ms). */ + lockTimeoutMs: number; + /** Unique identifier for this scheduler instance (used in distributed locking). */ + processorId?: string; + /** Maximum notifications to process per poll cycle. */ + batchSize: number; + /** Backoff base delay (ms). Delay = base * multiplier^attempt */ + baseDelayMs: number; + /** Backoff multiplier. Default: 2. */ + multiplier: number; + /** Maximum delay cap (ms). Default: 1 hour. */ + maxDelayMs: number; + /** Add ±25 % random jitter to prevent thundering herd. Default: true. */ + jitter: boolean; +} + +export const RETRY_SCHEDULER_DEFAULTS: RetrySchedulerConfig = { + enabled: true, + pollIntervalMs: 15_000, + lockTimeoutMs: 60_000, + batchSize: 10, + baseDelayMs: 5_000, + multiplier: 2, + maxDelayMs: 60 * 60 * 1_000, + jitter: true, +}; + +/** + * Calculates exponential backoff delay with optional jitter. + * + * Formula: delay = min(base * multiplier^attempt, maxDelayMs) + * Jitter: delay *= (0.75 + Math.random() * 0.5) → ±25 % + */ +export function calculateBackoffDelay( + attempt: number, + baseDelayMs: number, + multiplier: number, + maxDelayMs: number, + jitter: boolean +): number { + const raw = Math.min(baseDelayMs * Math.pow(multiplier, attempt), maxDelayMs); + return jitter ? raw * (0.75 + Math.random() * 0.5) : raw; +} + +/** + * DB-backed retry scheduler. + * + * On each poll cycle it: + * 1. Atomically claims PENDING notifications with retry_count > 0 that are due + * (next_retry_at ≤ now) using the repository's pessimistic lock. + * 2. Re-executes the notification delivery. + * 3. On success → marks COMPLETED. + * 4. On failure → if retries remain, computes next backoff delay, writes + * next_retry_at, and resets status to PENDING. Otherwise marks FAILED. + * + * The distributed lock (processor_id + lock_expires_at) prevents two concurrent + * scheduler instances from retrying the same notification. + */ +export class RetryScheduler { + private readonly config: RetrySchedulerConfig; + private readonly processorId: string; + private repository: ScheduledNotificationRepository; + private discordService: DiscordNotificationService | null; + private timer: NodeJS.Timeout | null = null; + private running = false; + + constructor( + repository: ScheduledNotificationRepository, + config: Partial = {}, + discordService?: DiscordNotificationService | null + ) { + this.config = { ...RETRY_SCHEDULER_DEFAULTS, ...config }; + this.processorId = this.config.processorId ?? `retry-${uuidv4()}`; + this.repository = repository; + this.discordService = discordService ?? null; + } + + async start(): Promise { + if (this.running) { + logger.warn('RetryScheduler already running', { processorId: this.processorId }); + return; + } + if (!this.config.enabled) { + logger.info('RetryScheduler is disabled'); + return; + } + + this.running = true; + logger.info('RetryScheduler started', { + processorId: this.processorId, + pollIntervalMs: this.config.pollIntervalMs, + baseDelayMs: this.config.baseDelayMs, + multiplier: this.config.multiplier, + maxDelayMs: this.config.maxDelayMs, + jitter: this.config.jitter, + }); + + await this.repository.recoverStaleLocks(); + this.scheduleNextPoll(); + } + + async stop(): Promise { + if (!this.running) return; + this.running = false; + if (this.timer) { + clearTimeout(this.timer); + this.timer = null; + } + logger.info('RetryScheduler stopped', { processorId: this.processorId }); + } + + /** Exposed for testing. */ + async runOnce(): Promise { + await this.processDueRetries(); + } + + private scheduleNextPoll(): void { + if (!this.running) return; + this.timer = setTimeout(async () => { + await this.processDueRetries(); + this.scheduleNextPoll(); + }, this.config.pollIntervalMs); + } + + private async processDueRetries(): Promise { + const requestId = generateRequestId(); + + try { + await this.repository.recoverStaleLocks(requestId); + + const notifications = await this.repository.fetchDueRetries( + this.processorId, + this.config.lockTimeoutMs, + this.config.batchSize, + requestId + ); + + if (notifications.length === 0) return; + + logger.info('RetryScheduler processing batch', { + requestId, + processorId: this.processorId, + count: notifications.length, + }); + + for (const notification of notifications) { + await this.processRetry(notification, requestId); + } + } catch (err) { + logger.error('RetryScheduler poll error', { requestId, error: err }); + } + } + + private async processRetry( + notification: ScheduledNotification, + requestId: string + ): Promise { + const attempt = notification.retryCount; // already incremented on prior failure + const startMs = Date.now(); + + logger.info('Retrying notification', { + requestId, + id: notification.id, + type: notification.notificationType, + attempt, + maxRetries: notification.maxRetries, + }); + + try { + const success = await this.deliver(notification, requestId); + const durationMs = Date.now() - startMs; + + if (success) { + await this.repository.markAsCompleted(notification.id!, requestId); + await this.repository.logExecution({ + scheduledNotificationId: notification.id!, + executionAttempt: attempt, + executionTime: new Date(), + status: 'SUCCESS', + durationMs, + }); + logger.info('Retry succeeded', { requestId, id: notification.id, attempt }); + return; + } + + throw new Error('Delivery returned false'); + } catch (err) { + const durationMs = Date.now() - startMs; + const error = err as Error; + const isFinalAttempt = attempt >= notification.maxRetries; + + const nextRetryAt = isFinalAttempt + ? undefined + : new Date( + Date.now() + + calculateBackoffDelay( + attempt, + this.config.baseDelayMs, + this.config.multiplier, + this.config.maxDelayMs, + this.config.jitter + ) + ); + + await this.repository.markAsFailedOrRetry( + notification.id!, + error, + attempt, + notification.maxRetries, + nextRetryAt + ); + + await this.repository.logExecution({ + scheduledNotificationId: notification.id!, + executionAttempt: attempt, + executionTime: new Date(), + status: isFinalAttempt ? 'FAILED' : 'RETRY', + errorMessage: error.message, + durationMs, + }); + + if (isFinalAttempt) { + logger.error('Notification permanently failed after max retries', { + requestId, + id: notification.id, + totalAttempts: attempt, + }); + } else { + logger.warn('Retry failed, scheduling next attempt', { + requestId, + id: notification.id, + attempt, + nextRetryAt: nextRetryAt?.toISOString(), + }); + } + } + } + + private async deliver( + notification: ScheduledNotification, + requestId: string + ): Promise { + const payload = JSON.parse(notification.payload); + + switch (notification.notificationType) { + case 'discord': + if (!this.discordService) throw new Error('Discord service not configured'); + return this.discordService.sendEventNotification( + payload.event, + payload.contractConfig, + `retry-${notification.id}-${requestId}` + ); + + default: + throw new Error(`Unsupported notification type: ${notification.notificationType}`); + } + } +} diff --git a/listener/src/services/scheduled-notification-repository.ts b/listener/src/services/scheduled-notification-repository.ts index 592273e..0d77426 100644 --- a/listener/src/services/scheduled-notification-repository.ts +++ b/listener/src/services/scheduled-notification-repository.ts @@ -215,13 +215,14 @@ export class ScheduledNotificationRepository { } /** - * Mark notification as failed or retry + * Mark notification as failed or retry (sets next_retry_at for backoff scheduling) */ async markAsFailedOrRetry( id: number, error: Error, currentRetryCount: number, - maxRetries: number + maxRetries: number, + nextRetryAt?: Date ): Promise { const isFailed = currentRetryCount >= maxRetries; const newStatus = isFailed ? NotificationStatus.FAILED : NotificationStatus.PENDING; @@ -233,6 +234,7 @@ export class ScheduledNotificationRepository { retry_count = ?, last_error = ?, error_details = ?, + next_retry_at = ?, processing_completed_at = ?, processor_id = NULL, lock_expires_at = NULL @@ -250,6 +252,7 @@ export class ScheduledNotificationRepository { currentRetryCount + 1, error.message, errorDetails, + isFailed ? null : (nextRetryAt?.toISOString() ?? null), isFailed ? new Date().toISOString() : null, id, ]); @@ -259,9 +262,69 @@ export class ScheduledNotificationRepository { newStatus, retryCount: currentRetryCount + 1, maxRetries, + nextRetryAt: nextRetryAt?.toISOString(), }); } + /** + * Fetch PENDING notifications whose next_retry_at is due (or null, meaning immediately schedulable). + * Used by RetryScheduler to pick up failed notifications for re-attempt. + */ + async fetchDueRetries( + processorId: string, + lockTimeoutMs: number, + batchSize: number = 10, + requestId?: string + ): Promise { + const now = new Date(); + const lockExpiresAt = new Date(now.getTime() + lockTimeoutMs); + + const updateSql = ` + UPDATE scheduled_notifications + SET + status = ?, + processor_id = ?, + lock_expires_at = ?, + processing_started_at = ? + WHERE id IN ( + SELECT id FROM scheduled_notifications + WHERE status = ? + AND retry_count > 0 + AND (next_retry_at IS NULL OR next_retry_at <= ?) + ORDER BY priority ASC, next_retry_at ASC + LIMIT ? + ) + `; + + const updateResult = await this.db.run(updateSql, [ + NotificationStatus.PROCESSING, + processorId, + lockExpiresAt.toISOString(), + now.toISOString(), + NotificationStatus.PENDING, + now.toISOString(), + batchSize, + ]); + + if (updateResult.changes === 0) return []; + + const selectSql = ` + SELECT * FROM scheduled_notifications + WHERE processor_id = ? AND status = ? AND lock_expires_at = ? + AND retry_count > 0 + `; + + const rows = await this.db.all(selectSql, [ + processorId, + NotificationStatus.PROCESSING, + lockExpiresAt.toISOString(), + ]); + + logger.info('Fetched due retries', { requestId, count: rows.length, processorId }); + + return rows.map(this.rowToModel.bind(this)); + } + /** * Get notification by ID */ @@ -395,6 +458,7 @@ export class ScheduledNotificationRepository { contractAddress: row.contract_address, priority: row.priority, metadata: row.metadata, + nextRetryAt: row.next_retry_at ? new Date(row.next_retry_at) : null, }; } } diff --git a/listener/src/types/index.ts b/listener/src/types/index.ts index d9d29b6..5349b62 100644 --- a/listener/src/types/index.ts +++ b/listener/src/types/index.ts @@ -15,6 +15,8 @@ export interface DiscordConfig { export interface RetryQueueConfig { baseDelayMs?: number; + multiplier?: number; + jitter?: boolean; maxRetries?: number; } @@ -43,6 +45,7 @@ export interface Config { retryQueue?: RetryQueueConfig; webhookSecrets?: WebhookSecret[]; scheduler?: SchedulerConfig; + retryScheduler?: RetrySchedulerOptions; databasePath?: string; rateLimit?: RateLimitConfig; cleanup?: AppCleanupConfig; @@ -68,3 +71,15 @@ export interface AppCleanupConfig { eventRetentionMs: number; } +export interface RetrySchedulerOptions { + enabled: boolean; + pollIntervalMs: number; + lockTimeoutMs: number; + processorId?: string; + batchSize: number; + baseDelayMs: number; + multiplier: number; + maxDelayMs: number; + jitter: boolean; +} + diff --git a/listener/src/types/scheduled-notification.ts b/listener/src/types/scheduled-notification.ts index f564afc..a521865 100644 --- a/listener/src/types/scheduled-notification.ts +++ b/listener/src/types/scheduled-notification.ts @@ -38,6 +38,8 @@ export interface ScheduledNotification { contractAddress?: string | null; priority: number; metadata?: string | null; // JSON string + /** When the next retry should be attempted (null if not retrying). */ + nextRetryAt?: Date | null; } export interface CreateScheduledNotificationInput { @@ -73,6 +75,7 @@ export interface ScheduledNotificationRow { contract_address: string | null; priority: number; metadata: string | null; + next_retry_at: string | null; } export interface NotificationExecutionLog {