diff --git a/package-lock.json b/package-lock.json index c05b9c09..afd85d2c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10759,7 +10759,7 @@ "graphql-ws": "^6.0.6", "graphql-yoga": "^5.18.0", "ioredis": "^5.4.1", - "jose": "^6.0.0", + "jose": "^4.15.9", "uuid": "^13.0.0", "ws": "^8.18.3", "zod": "^3.24.0" @@ -10817,6 +10817,15 @@ } } }, + "packages/backend/node_modules/jose": { + "version": "4.15.9", + "resolved": "https://registry.npmjs.org/jose/-/jose-4.15.9.tgz", + "integrity": "sha512-1vUQX+IdDMVPj4k8kOxgUqlcK518yluMuGZwqlr44FS1ppZB/5GWh4rZG89erpOBOJjU/OBsnCVFfapsRz6nEA==", + "license": "MIT", + "funding": { + "url": "https://github.com/sponsors/panva" + } + }, "packages/backend/node_modules/zod": { "version": "3.25.76", "resolved": "https://registry.npmjs.org/zod/-/zod-3.25.76.tgz", diff --git a/packages/backend/package.json b/packages/backend/package.json index 1c7f2e45..dc2da09f 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -34,7 +34,7 @@ "graphql-ws": "^6.0.6", "graphql-yoga": "^5.18.0", "ioredis": "^5.4.1", - "jose": "^6.0.0", + "jose": "^4.15.9", "uuid": "^13.0.0", "ws": "^8.18.3", "zod": "^3.24.0" diff --git a/packages/backend/src/__tests__/queue-sync-fixes.test.ts b/packages/backend/src/__tests__/queue-sync-fixes.test.ts new file mode 100644 index 00000000..2286ee3f --- /dev/null +++ b/packages/backend/src/__tests__/queue-sync-fixes.test.ts @@ -0,0 +1,656 @@ +/** + * Tests for queue sync fixes: + * 1. updateQueueOnly - Redis-first approach (fixes version desync) + * 2. addQueueItem - event publishing fix (only publish when item added) + */ +import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest'; +import Redis from 'ioredis'; +import { v4 as uuidv4 } from 'uuid'; +import { roomManager, VersionConflictError } from '../services/room-manager.js'; +import { db } from '../db/client.js'; +import { boardSessions, boardSessionQueues } from '../db/schema.js'; +import { eq } from 'drizzle-orm'; +import type { ClimbQueueItem } from '@boardsesh/shared-schema'; +import { queueMutations } from '../graphql/resolvers/queue/mutations.js'; +import { pubsub } from '../pubsub/index.js'; + +// Mock Redis for testing +const createMockRedis = (): Redis & { _store: Map; _hashes: Map> } => { + const store = new Map(); + const sets = new Map>(); + const hashes = new Map>(); + const sortedSets = new Map>(); + + const mockRedis = { + set: vi.fn(async (key: string, value: string) => { + store.set(key, value); + return 'OK'; + }), + get: vi.fn(async (key: string) => { + return store.get(key) || null; + }), + del: vi.fn(async (...keys: string[]) => { + let count = 0; + for (const key of keys) { + if (store.delete(key)) count++; + if (sets.delete(key)) count++; + if (hashes.delete(key)) count++; + if (sortedSets.delete(key)) count++; + } + return count; + }), + exists: vi.fn(async (key: string) => { + return store.has(key) || hashes.has(key) ? 1 : 0; + }), + expire: vi.fn(async () => 1), + hmset: vi.fn(async (key: string, obj: Record) => { + hashes.set(key, { ...hashes.get(key), ...obj }); + return 'OK'; + }), + hgetall: vi.fn(async (key: string) => { + return hashes.get(key) || {}; + }), + sadd: vi.fn(async (key: string, ...members: string[]) => { + if (!sets.has(key)) sets.set(key, new Set()); + const set = sets.get(key)!; + let count = 0; + for (const member of members) { + if (!set.has(member)) { + set.add(member); + count++; + } + } + return count; + }), + srem: vi.fn(async (key: string, ...members: string[]) => { + const set = sets.get(key); + if (!set) return 0; + let count = 0; + for (const member of members) { + if (set.delete(member)) count++; + } + return count; + }), + zadd: vi.fn(async (key: string, score: number, member: string) => { + if (!sortedSets.has(key)) sortedSets.set(key, []); + const zset = sortedSets.get(key)!; + const existing = zset.findIndex((item) => item.member === member); + if (existing >= 0) { + zset[existing].score = score; + return 0; + } else { + zset.push({ score, member }); + return 1; + } + }), + zrem: vi.fn(async (key: string, member: string) => { + const zset = sortedSets.get(key); + if (!zset) return 0; + const index = zset.findIndex((item) => item.member === member); + if (index >= 0) { + zset.splice(index, 1); + return 1; + } + return 0; + }), + multi: vi.fn(() => { + const commands: Array<() => Promise> = []; + const chainable = { + hmset: (key: string, obj: Record) => { + commands.push(() => mockRedis.hmset(key, obj)); + return chainable; + }, + expire: (_key: string, _seconds: number) => { + commands.push(() => mockRedis.expire(_key, _seconds)); + return chainable; + }, + zadd: (key: string, score: number, member: string) => { + commands.push(() => mockRedis.zadd(key, score, member)); + return chainable; + }, + del: (...keys: string[]) => { + commands.push(() => mockRedis.del(...keys)); + return chainable; + }, + srem: (key: string, ...members: string[]) => { + commands.push(() => mockRedis.srem(key, ...members)); + return chainable; + }, + zrem: (key: string, member: string) => { + commands.push(() => mockRedis.zrem(key, member)); + return chainable; + }, + exec: async () => { + const results = []; + for (const cmd of commands) { + results.push([null, await cmd()]); + } + return results; + }, + }; + return chainable; + }), + eval: vi.fn(async () => 1), + // For test access + _store: store, + _hashes: hashes, + } as unknown as Redis & { _store: Map; _hashes: Map> }; + + return mockRedis; +}; + +const createTestClimb = (uuid?: string): ClimbQueueItem => ({ + uuid: uuid || uuidv4(), + climb: { + uuid: uuidv4(), + setter_username: 'TestSetter', + name: 'Test Climb', + description: 'A test climb', + frames: '{}', + angle: 40, + ascensionist_count: 10, + difficulty: '6A', + quality_average: '3.5', + stars: 3.5, + difficulty_error: '0.5', + litUpHoldsMap: {}, + mirrored: false, + benchmark_difficulty: null, + }, + addedBy: 'test-user', + tickedBy: [], + suggested: false, +}); + +// Helper function to register a client before joining +const registerAndJoinSession = async ( + clientId: string, + sessionId: string, + boardPath: string, + username: string +) => { + roomManager.registerClient(clientId); + return roomManager.joinSession(clientId, sessionId, boardPath, username); +}; + +describe('updateQueueOnly - Redis-first approach', () => { + let mockRedis: Redis & { _store: Map; _hashes: Map> }; + + beforeEach(async () => { + // Create fresh mock Redis for each test + mockRedis = createMockRedis(); + + // Reset room manager and initialize with mock Redis + roomManager.reset(); + await roomManager.initialize(mockRedis); + }); + + afterEach(() => { + vi.clearAllTimers(); + }); + + describe('Reading from Redis', () => { + it('should read current version and sequence from Redis', async () => { + const sessionId = uuidv4(); + const boardPath = '/kilter/1/2/3/40'; + + // Create session + await registerAndJoinSession('client-1', sessionId, boardPath, 'User1'); + + // Update state to get a known version/sequence + const initialState = await roomManager.getQueueState(sessionId); + await roomManager.updateQueueState(sessionId, [createTestClimb()], null, initialState.version); + + // Get state after update + const state = await roomManager.getQueueState(sessionId); + const previousVersion = state.version; + const previousSequence = state.sequence; + + // Call updateQueueOnly + const result = await roomManager.updateQueueOnly(sessionId, [createTestClimb(), createTestClimb()]); + + // Should have incremented version and sequence + expect(result.version).toBe(previousVersion + 1); + expect(result.sequence).toBe(previousSequence + 1); + }); + + it('should fall back to Postgres when Redis is empty', async () => { + const sessionId = uuidv4(); + const boardPath = '/kilter/1/2/3/40'; + + // Create session and update queue to write to Postgres + await registerAndJoinSession('client-1', sessionId, boardPath, 'User1'); + const climb = createTestClimb(); + const initialState = await roomManager.getQueueState(sessionId); + await roomManager.updateQueueState(sessionId, [climb], null, initialState.version); + + // Flush to ensure Postgres has the data + await roomManager.flushPendingWrites(); + + // Clear Redis to simulate empty Redis + mockRedis._hashes.clear(); + + // Reset and reinitialize room manager + roomManager.reset(); + await roomManager.initialize(mockRedis); + + // updateQueueOnly should still work by falling back to Postgres + const result = await roomManager.updateQueueOnly(sessionId, [climb, createTestClimb()]); + + // Should have a valid result (incremented from Postgres values) + expect(result.version).toBeGreaterThan(0); + expect(result.sequence).toBeGreaterThan(0); + expect(result.stateHash).toBeDefined(); + }); + }); + + describe('Writing to Redis', () => { + it('should write updated queue state to Redis immediately', async () => { + const sessionId = uuidv4(); + const boardPath = '/kilter/1/2/3/40'; + + // Create session + await registerAndJoinSession('client-1', sessionId, boardPath, 'User1'); + + const climb1 = createTestClimb(); + const climb2 = createTestClimb(); + + // Call updateQueueOnly + await roomManager.updateQueueOnly(sessionId, [climb1, climb2]); + + // Verify Redis was updated + const redisSession = mockRedis._hashes.get(`boardsesh:session:${sessionId}`); + expect(redisSession).toBeDefined(); + + // Parse the queue from Redis + const redisQueue = JSON.parse(redisSession?.queue || '[]'); + expect(redisQueue).toHaveLength(2); + expect(redisQueue[0].uuid).toBe(climb1.uuid); + expect(redisQueue[1].uuid).toBe(climb2.uuid); + }); + + it('should increment version and sequence on update', async () => { + const sessionId = uuidv4(); + const boardPath = '/kilter/1/2/3/40'; + + // Create session + await registerAndJoinSession('client-1', sessionId, boardPath, 'User1'); + + const climb1 = createTestClimb(); + const climb2 = createTestClimb(); + + // Get initial state + const initialState = await roomManager.getQueueState(sessionId); + + // Call updateQueueOnly + const result = await roomManager.updateQueueOnly(sessionId, [climb1, climb2]); + + // Verify version and sequence incremented + expect(result.version).toBe(initialState.version + 1); + expect(result.sequence).toBe(initialState.sequence + 1); + // Verify stateHash is returned + expect(result.stateHash).toBeDefined(); + expect(result.stateHash.length).toBeGreaterThan(0); + }); + }); + + describe('Version checking (optimistic locking)', () => { + it('should throw VersionConflictError when expectedVersion does not match', async () => { + const sessionId = uuidv4(); + const boardPath = '/kilter/1/2/3/40'; + + // Create session + await registerAndJoinSession('client-1', sessionId, boardPath, 'User1'); + + // Get current state + const state = await roomManager.getQueueState(sessionId); + const currentVersion = state.version; + + // Try to update with wrong version + await expect( + roomManager.updateQueueOnly(sessionId, [createTestClimb()], currentVersion + 100) + ).rejects.toThrow(VersionConflictError); + }); + + it('should succeed when expectedVersion matches current version', async () => { + const sessionId = uuidv4(); + const boardPath = '/kilter/1/2/3/40'; + + // Create session + await registerAndJoinSession('client-1', sessionId, boardPath, 'User1'); + + // Get current state + const state = await roomManager.getQueueState(sessionId); + const currentVersion = state.version; + + // Update with correct version + const result = await roomManager.updateQueueOnly(sessionId, [createTestClimb()], currentVersion); + + expect(result.version).toBe(currentVersion + 1); + }); + + it('should increment version on each call', async () => { + const sessionId = uuidv4(); + const boardPath = '/kilter/1/2/3/40'; + + // Create session + await registerAndJoinSession('client-1', sessionId, boardPath, 'User1'); + + // Get initial state + const initialState = await roomManager.getQueueState(sessionId); + + // First update + const result1 = await roomManager.updateQueueOnly(sessionId, [createTestClimb()]); + expect(result1.version).toBe(initialState.version + 1); + + // Second update + const result2 = await roomManager.updateQueueOnly(sessionId, [createTestClimb()]); + expect(result2.version).toBe(result1.version + 1); + }); + }); + + describe('Return value', () => { + it('should return version, sequence, and stateHash', async () => { + const sessionId = uuidv4(); + const boardPath = '/kilter/1/2/3/40'; + + // Create session + await registerAndJoinSession('client-1', sessionId, boardPath, 'User1'); + + const result = await roomManager.updateQueueOnly(sessionId, [createTestClimb()]); + + expect(typeof result.version).toBe('number'); + expect(typeof result.sequence).toBe('number'); + expect(typeof result.stateHash).toBe('string'); + expect(result.stateHash.length).toBeGreaterThan(0); + }); + + it('should compute correct stateHash based on queue content', async () => { + const sessionId = uuidv4(); + const boardPath = '/kilter/1/2/3/40'; + + // Create session + await registerAndJoinSession('client-1', sessionId, boardPath, 'User1'); + + const climb1 = createTestClimb(); + const climb2 = createTestClimb(); + + // Update with first set of climbs + const result1 = await roomManager.updateQueueOnly(sessionId, [climb1]); + + // Update with different set + const result2 = await roomManager.updateQueueOnly(sessionId, [climb1, climb2]); + + // Hashes should be different + expect(result1.stateHash).not.toBe(result2.stateHash); + }); + }); + + describe('Concurrent updates', () => { + it('should handle rapid sequential updates without version conflicts', async () => { + const sessionId = uuidv4(); + const boardPath = '/kilter/1/2/3/40'; + + // Create session + await registerAndJoinSession('client-1', sessionId, boardPath, 'User1'); + + const initialState = await roomManager.getQueueState(sessionId); + const initialSequence = initialState.sequence; + + // Make 5 sequential updates without version checking + for (let i = 0; i < 5; i++) { + await roomManager.updateQueueOnly(sessionId, [createTestClimb()]); + } + + // Final state should reflect all updates + const finalState = await roomManager.getQueueState(sessionId); + expect(finalState.sequence).toBe(initialSequence + 5); + }); + }); +}); + +describe('addQueueItem - Event publishing fix', () => { + let mockRedis: Redis & { _store: Map; _hashes: Map> }; + let publishSpy: ReturnType; + + beforeEach(async () => { + // Create fresh mock Redis for each test + mockRedis = createMockRedis(); + + // Reset room manager and initialize with mock Redis + roomManager.reset(); + await roomManager.initialize(mockRedis); + + // Spy on pubsub.publishQueueEvent + publishSpy = vi.spyOn(pubsub, 'publishQueueEvent').mockImplementation(() => {}); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + it('should publish QueueItemAdded event when item is successfully added', async () => { + const sessionId = uuidv4(); + const boardPath = '/kilter/1/2/3/40'; + + // Create session + await registerAndJoinSession('client-1', sessionId, boardPath, 'User1'); + + const climb = createTestClimb(); + + // Create mock context + const ctx = { + connectionId: 'client-1', + sessionId, + rateLimitTokens: 60, + rateLimitLastReset: Date.now(), + }; + + // Add item + await queueMutations.addQueueItem({}, { item: climb }, ctx); + + // Verify event was published + expect(publishSpy).toHaveBeenCalledWith( + sessionId, + expect.objectContaining({ + __typename: 'QueueItemAdded', + item: climb, + }) + ); + }); + + it('should NOT publish event when item already exists in queue', async () => { + const sessionId = uuidv4(); + const boardPath = '/kilter/1/2/3/40'; + + // Create session + await registerAndJoinSession('client-1', sessionId, boardPath, 'User1'); + + const climb = createTestClimb(); + + // Pre-populate the queue with the item using updateQueueState + const state = await roomManager.getQueueState(sessionId); + await roomManager.updateQueueState(sessionId, [climb], null, state.version); + + // Clear spy + publishSpy.mockClear(); + + // Create mock context + const ctx = { + connectionId: 'client-1', + sessionId, + rateLimitTokens: 60, + rateLimitLastReset: Date.now(), + }; + + // Try to add the same item that's already in queue + await queueMutations.addQueueItem({}, { item: climb }, ctx); + + // Verify event was NOT published for duplicate + expect(publishSpy).not.toHaveBeenCalled(); + }); + + it('should return the item even when it already exists (idempotent)', async () => { + const sessionId = uuidv4(); + const boardPath = '/kilter/1/2/3/40'; + + // Create session + await registerAndJoinSession('client-1', sessionId, boardPath, 'User1'); + + const climb = createTestClimb(); + + // Create mock context + const ctx = { + connectionId: 'client-1', + sessionId, + rateLimitTokens: 60, + rateLimitLastReset: Date.now(), + }; + + // Add item first time + const result1 = await queueMutations.addQueueItem({}, { item: climb }, ctx); + + // Add same item again + const result2 = await queueMutations.addQueueItem({}, { item: climb }, ctx); + + // Both should return the item + expect(result1.uuid).toBe(climb.uuid); + expect(result2.uuid).toBe(climb.uuid); + }); + + it('should include correct position in published event', async () => { + const sessionId = uuidv4(); + const boardPath = '/kilter/1/2/3/40'; + + // Create session + await registerAndJoinSession('client-1', sessionId, boardPath, 'User1'); + + const climb1 = createTestClimb(); + const climb2 = createTestClimb(); + + // Create mock context + const ctx = { + connectionId: 'client-1', + sessionId, + rateLimitTokens: 60, + rateLimitLastReset: Date.now(), + }; + + // Add first item at position 0 + await queueMutations.addQueueItem({}, { item: climb1, position: 0 }, ctx); + + expect(publishSpy).toHaveBeenCalledWith( + sessionId, + expect.objectContaining({ + __typename: 'QueueItemAdded', + position: 0, + }) + ); + + publishSpy.mockClear(); + + // Add second item at position 0 (should push first item to position 1) + await queueMutations.addQueueItem({}, { item: climb2, position: 0 }, ctx); + + expect(publishSpy).toHaveBeenCalledWith( + sessionId, + expect.objectContaining({ + __typename: 'QueueItemAdded', + position: 0, + }) + ); + }); + + it('should append to end when no position specified', async () => { + const sessionId = uuidv4(); + const boardPath = '/kilter/1/2/3/40'; + + // Create session + await registerAndJoinSession('client-1', sessionId, boardPath, 'User1'); + + const climb1 = createTestClimb(); + const climb2 = createTestClimb(); + + // Pre-populate the queue with first item + const state = await roomManager.getQueueState(sessionId); + await roomManager.updateQueueState(sessionId, [climb1], null, state.version); + + publishSpy.mockClear(); + + // Create mock context + const ctx = { + connectionId: 'client-1', + sessionId, + rateLimitTokens: 60, + rateLimitLastReset: Date.now(), + }; + + // Add second item without position - should append + await queueMutations.addQueueItem({}, { item: climb2 }, ctx); + + expect(publishSpy).toHaveBeenCalledWith( + sessionId, + expect.objectContaining({ + __typename: 'QueueItemAdded', + position: 1, // Appended at end + }) + ); + }); +}); + +describe('reorderQueueItem - Return type handling', () => { + let mockRedis: Redis & { _store: Map; _hashes: Map> }; + let publishSpy: ReturnType; + + beforeEach(async () => { + mockRedis = createMockRedis(); + roomManager.reset(); + await roomManager.initialize(mockRedis); + publishSpy = vi.spyOn(pubsub, 'publishQueueEvent').mockImplementation(() => {}); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + it('should use sequence from updateQueueOnly result', async () => { + const sessionId = uuidv4(); + const boardPath = '/kilter/1/2/3/40'; + + // Create session and add items to queue + await registerAndJoinSession('client-1', sessionId, boardPath, 'User1'); + + const climb1 = createTestClimb(); + const climb2 = createTestClimb(); + + // Add items to queue using updateQueueState + const state = await roomManager.getQueueState(sessionId); + await roomManager.updateQueueState(sessionId, [climb1, climb2], null, state.version); + + // Clear any previous publish calls + publishSpy.mockClear(); + + // Create mock context + const ctx = { + connectionId: 'client-1', + sessionId, + rateLimitTokens: 60, + rateLimitLastReset: Date.now(), + }; + + // Reorder + await queueMutations.reorderQueueItem({}, { uuid: climb1.uuid, oldIndex: 0, newIndex: 1 }, ctx); + + // Verify event includes a sequence number (should be incremented from the updateQueueOnly call) + expect(publishSpy).toHaveBeenCalledWith( + sessionId, + expect.objectContaining({ + __typename: 'QueueReordered', + sequence: expect.any(Number), + uuid: climb1.uuid, + oldIndex: 0, + newIndex: 1, + }) + ); + }); +}); diff --git a/packages/backend/src/__tests__/setup.ts b/packages/backend/src/__tests__/setup.ts index 82ece0dc..138aa28e 100644 --- a/packages/backend/src/__tests__/setup.ts +++ b/packages/backend/src/__tests__/setup.ts @@ -17,6 +17,12 @@ let db: ReturnType; // SQL to create only the tables needed for backend tests const createTablesSQL = ` + -- Drop existing tables to ensure schema is up-to-date + DROP TABLE IF EXISTS "board_session_queues" CASCADE; + DROP TABLE IF EXISTS "board_session_clients" CASCADE; + DROP TABLE IF EXISTS "board_sessions" CASCADE; + DROP TABLE IF EXISTS "users" CASCADE; + -- Create users table (minimal, needed for FK reference) CREATE TABLE IF NOT EXISTS "users" ( "id" text PRIMARY KEY NOT NULL, @@ -58,6 +64,7 @@ const createTablesSQL = ` "queue" jsonb DEFAULT '[]'::jsonb NOT NULL, "current_climb_queue_item" jsonb DEFAULT 'null'::jsonb, "version" integer DEFAULT 1 NOT NULL, + "sequence" integer DEFAULT 0 NOT NULL, "updated_at" timestamp DEFAULT now() NOT NULL ); diff --git a/packages/backend/src/__tests__/websocket-sync.test.ts b/packages/backend/src/__tests__/websocket-sync.test.ts new file mode 100644 index 00000000..d0f5d669 --- /dev/null +++ b/packages/backend/src/__tests__/websocket-sync.test.ts @@ -0,0 +1,246 @@ +import { describe, it, expect, beforeEach } from 'vitest'; +import { randomUUID } from 'crypto'; +import { roomManager } from '../services/room-manager.js'; +import { db } from '../db/client.js'; +import { sessions, sessionQueues } from '../db/schema.js'; +import type { ClimbQueueItem } from '@boardsesh/shared-schema'; + +// Generate unique session IDs to prevent conflicts with parallel tests +const uniqueId = () => `ws-sync-${randomUUID().slice(0, 8)}`; + +// Helper to create test climb queue items +function createTestClimbQueueItem(uuid: string, name: string): ClimbQueueItem { + return { + uuid, + climb: { + uuid: `climb-${uuid}`, + setter_username: 'test-setter', + name, + description: 'Test climb', + frames: 'test-frames', + angle: 40, + ascensionist_count: 10, + difficulty: 'V5', + quality_average: '4.5', + stars: 4.5, + difficulty_error: '0.5', + litUpHoldsMap: {}, + mirrored: false, + benchmark_difficulty: null, + }, + tickedBy: [], + addedBy: 'test-user', + suggested: false, + }; +} + +describe('WebSocket Sync - getQueueState Redis Priority', () => { + describe('Unit Tests (mocked Redis)', () => { + beforeEach(() => { + // Reset room manager state + roomManager.reset(); + }); + + it('should return Redis data when available (not fall back to Postgres)', async () => { + const sessionId = uniqueId(); + + // Create session in Postgres with old data + await db.insert(sessions).values({ + id: sessionId, + boardPath: '/kilter/test', + createdAt: new Date(), + lastActivity: new Date(), + }); + + await db.insert(sessionQueues).values({ + sessionId, + queue: [createTestClimbQueueItem('old-1', 'Old Climb from Postgres')], + currentClimbQueueItem: null, + version: 5, + sequence: 5, + updatedAt: new Date(), + }); + + // When Redis is not initialized, should fall back to Postgres + const state = await roomManager.getQueueState(sessionId); + + expect(state.sequence).toBe(5); + expect(state.queue.length).toBe(1); + expect(state.queue[0].climb.name).toBe('Old Climb from Postgres'); + }); + + it('should return empty state for non-existent session', async () => { + const state = await roomManager.getQueueState('non-existent-session'); + + expect(state.queue).toEqual([]); + expect(state.currentClimbQueueItem).toBeNull(); + expect(state.version).toBe(0); + expect(state.sequence).toBe(0); + }); + }); +}); + +// Note: Full WebSocket subscription tests require proper auth and session setup. +// These tests verify the subscription logic indirectly through unit tests. +// The subscription filtering logic is tested in the resolver itself. +describe('WebSocket Sync - Subscription Event Filtering (Unit)', () => { + beforeEach(async () => { + roomManager.reset(); + }); + + it('should have sequence in FullSync state from getQueueState', async () => { + const sessionId = uniqueId(); + + // Create session with queue state + await db.insert(sessions).values({ + id: sessionId, + boardPath: '/kilter/test', + createdAt: new Date(), + lastActivity: new Date(), + }); + + await db.insert(sessionQueues).values({ + sessionId, + queue: [createTestClimbQueueItem('item-1', 'Test Climb')], + currentClimbQueueItem: null, + version: 3, + sequence: 10, + updatedAt: new Date(), + }); + + // getQueueState should return the correct sequence for FullSync + const state = await roomManager.getQueueState(sessionId); + + expect(state.sequence).toBe(10); + expect(state.queue.length).toBe(1); + expect(state.version).toBe(3); + }); + + it('should have stateHash for state change detection', async () => { + const sessionId = uniqueId(); + + // Create session with queue state + await db.insert(sessions).values({ + id: sessionId, + boardPath: '/kilter/test', + createdAt: new Date(), + lastActivity: new Date(), + }); + + await db.insert(sessionQueues).values({ + sessionId, + queue: [createTestClimbQueueItem('item-1', 'Test Climb')], + currentClimbQueueItem: null, + version: 1, + sequence: 5, + updatedAt: new Date(), + }); + + const state = await roomManager.getQueueState(sessionId); + + // stateHash should be present and non-empty + expect(state.stateHash).toBeDefined(); + expect(state.stateHash.length).toBeGreaterThan(0); + }); +}); + +describe('WebSocket Sync - Sequence Number Consistency', () => { + beforeEach(async () => { + roomManager.reset(); + }); + + it('should increment sequence on each queue update', async () => { + const sessionId = uniqueId(); + + // Create session with initial queue state + await db.insert(sessions).values({ + id: sessionId, + boardPath: '/kilter/test', + createdAt: new Date(), + lastActivity: new Date(), + }); + + await db.insert(sessionQueues).values({ + sessionId, + queue: [], + currentClimbQueueItem: null, + version: 1, + sequence: 1, + updatedAt: new Date(), + }); + + // Initial state should have sequence 1 + let state = await roomManager.getQueueState(sessionId); + const initialSequence = state.sequence; + expect(initialSequence).toBe(1); + + // Use updateQueueStateImmediate for immediate Postgres writes (no Redis in tests) + const version1 = await roomManager.updateQueueStateImmediate( + sessionId, + [createTestClimbQueueItem('item-1', 'Climb 1')], + null, + 1, // Pass current version for optimistic locking + ); + + // Check sequence after first update + state = await roomManager.getQueueState(sessionId); + expect(state.sequence).toBe(initialSequence + 1); + + // Another update + await roomManager.updateQueueStateImmediate( + sessionId, + [ + createTestClimbQueueItem('item-1', 'Climb 1'), + createTestClimbQueueItem('item-2', 'Climb 2'), + ], + null, + version1, // Pass previous version + ); + + // Check sequence after second update + state = await roomManager.getQueueState(sessionId); + expect(state.sequence).toBe(initialSequence + 2); + }); + + it('should return consistent sequence from getQueueState after updates', async () => { + const sessionId = uniqueId(); + + // Create session with initial queue state + await db.insert(sessions).values({ + id: sessionId, + boardPath: '/kilter/test', + createdAt: new Date(), + lastActivity: new Date(), + }); + + await db.insert(sessionQueues).values({ + sessionId, + queue: [], + currentClimbQueueItem: null, + version: 1, + sequence: 1, + updatedAt: new Date(), + }); + + // Get initial version for optimistic locking + let state = await roomManager.getQueueState(sessionId); + let currentVersion = state.version; + + // Make several updates using updateQueueStateImmediate for Postgres-only mode + for (let i = 1; i <= 5; i++) { + currentVersion = await roomManager.updateQueueStateImmediate( + sessionId, + [createTestClimbQueueItem(`item-${i}`, `Climb ${i}`)], + null, + currentVersion, + ); + } + + // Get state - should have sequence reflecting all updates + state = await roomManager.getQueueState(sessionId); + + // Sequence should be 6 (initial 1 + 5 updates) + expect(state.sequence).toBe(6); + expect(state.queue.length).toBe(1); // Last update had 1 item + }); +}); diff --git a/packages/backend/src/graphql/resolvers/queue/mutations.ts b/packages/backend/src/graphql/resolvers/queue/mutations.ts index 75ce1a4a..e1980ea1 100644 --- a/packages/backend/src/graphql/resolvers/queue/mutations.ts +++ b/packages/backend/src/graphql/resolvers/queue/mutations.ts @@ -35,6 +35,8 @@ export const queueMutations = { // Track the original queue length for position calculation let originalQueueLength = 0; + let itemWasAdded = false; + let resultSequence = 0; // Retry loop for optimistic locking for (let attempt = 0; attempt < MAX_RETRIES; attempt++) { @@ -46,8 +48,9 @@ export const queueMutations = { // Only add if not already in queue if (queue.some((i) => i.uuid === item.uuid)) { - // Item already in queue, just return it - break; + // Item already in queue - return without publishing event + if (DEBUG) console.log('[addQueueItem] Item already in queue, skipping'); + return item; } if (position !== undefined && position >= 0 && position <= queue.length) { @@ -58,7 +61,9 @@ export const queueMutations = { try { // Use updateQueueOnly with version check to avoid race conditions - await roomManager.updateQueueOnly(sessionId, queue, currentState.version); + const result = await roomManager.updateQueueOnly(sessionId, queue, currentState.version); + itemWasAdded = true; + resultSequence = result.sequence; break; // Success, exit retry loop } catch (error) { if (error instanceof VersionConflictError && attempt < MAX_RETRIES - 1) { @@ -69,19 +74,23 @@ export const queueMutations = { } } - // Calculate actual position where item was inserted - // If position was valid, item is at that index; otherwise it was appended - const actualPosition = - position !== undefined && position >= 0 && position <= originalQueueLength - ? position - : originalQueueLength; // Item was appended at end of original queue - - // Broadcast to subscribers with the actual position - pubsub.publishQueueEvent(sessionId, { - __typename: 'QueueItemAdded', - item: item, - position: actualPosition, - }); + // Only publish event if item was actually added + if (itemWasAdded) { + // Calculate actual position where item was inserted + // If position was valid, item is at that index; otherwise it was appended + const actualPosition = + position !== undefined && position >= 0 && position <= originalQueueLength + ? position + : originalQueueLength; // Item was appended at end of original queue + + // Broadcast to subscribers with the actual position + pubsub.publishQueueEvent(sessionId, { + __typename: 'QueueItemAdded', + sequence: resultSequence, + item: item, + position: actualPosition, + }); + } return item; }, @@ -106,10 +115,11 @@ export const queueMutations = { currentClimb = null; } - await roomManager.updateQueueState(sessionId, queue, currentClimb); + const { sequence } = await roomManager.updateQueueState(sessionId, queue, currentClimb); pubsub.publishQueueEvent(sessionId, { __typename: 'QueueItemRemoved', + sequence, uuid, }); @@ -140,15 +150,19 @@ export const queueMutations = { throw new Error(`Invalid index: queue has ${queue.length} items`); } + let resultSequence = currentState.sequence; + if (oldIndex >= 0 && oldIndex < queue.length && newIndex >= 0 && newIndex < queue.length) { const [movedItem] = queue.splice(oldIndex, 1); queue.splice(newIndex, 0, movedItem); // Use updateQueueOnly to avoid overwriting currentClimbQueueItem - await roomManager.updateQueueOnly(sessionId, queue); + const result = await roomManager.updateQueueOnly(sessionId, queue); + resultSequence = result.sequence; } pubsub.publishQueueEvent(sessionId, { __typename: 'QueueReordered', + sequence: resultSequence, uuid, oldIndex, newIndex, @@ -164,7 +178,7 @@ export const queueMutations = { */ setCurrentClimb: async ( _: unknown, - { item, shouldAddToQueue }: { item: ClimbQueueItem | null; shouldAddToQueue?: boolean }, + { item, shouldAddToQueue, correlationId }: { item: ClimbQueueItem | null; shouldAddToQueue?: boolean; correlationId?: string }, ctx: ConnectionContext ) => { applyRateLimit(ctx); @@ -180,11 +194,12 @@ export const queueMutations = { if (item === null) { console.log('[setCurrentClimb] Setting current climb to NULL by client:', ctx.connectionId, 'session:', sessionId); } else { - console.log('[setCurrentClimb] Setting current climb to:', item.climb?.name, 'by client:', ctx.connectionId); + console.log('[setCurrentClimb] Setting current climb to:', item.climb?.name, 'by client:', ctx.connectionId, 'correlationId:', correlationId); } } // Retry loop for optimistic locking + let sequence = 0; for (let attempt = 0; attempt < MAX_RETRIES; attempt++) { const currentState = await roomManager.getQueueState(sessionId); let queue = currentState.queue; @@ -195,7 +210,8 @@ export const queueMutations = { } try { - await roomManager.updateQueueState(sessionId, queue, item, currentState.version); + const result = await roomManager.updateQueueState(sessionId, queue, item, currentState.version); + sequence = result.sequence; break; // Success, exit retry loop } catch (error) { if (error instanceof VersionConflictError && attempt < MAX_RETRIES - 1) { @@ -208,7 +224,10 @@ export const queueMutations = { pubsub.publishQueueEvent(sessionId, { __typename: 'CurrentClimbChanged', + sequence, item: item, + clientId: ctx.connectionId || null, + correlationId: correlationId || null, }); return item; @@ -224,6 +243,7 @@ export const queueMutations = { const currentState = await roomManager.getQueueState(sessionId); let currentClimb = currentState.currentClimbQueueItem; + let sequence = currentState.sequence; if (currentClimb) { // Update the mirrored state @@ -237,11 +257,13 @@ export const queueMutations = { i.uuid === currentClimb!.uuid ? { ...i, climb: { ...i.climb, mirrored } } : i ); - await roomManager.updateQueueState(sessionId, queue, currentClimb); + const result = await roomManager.updateQueueState(sessionId, queue, currentClimb); + sequence = result.sequence; } pubsub.publishQueueEvent(sessionId, { __typename: 'ClimbMirrored', + sequence, mirrored, }); @@ -273,12 +295,13 @@ export const queueMutations = { currentClimb = item; } - await roomManager.updateQueueState(sessionId, queue, currentClimb); + const { sequence, stateHash } = await roomManager.updateQueueState(sessionId, queue, currentClimb); // Publish as FullSync since replace is less common pubsub.publishQueueEvent(sessionId, { __typename: 'FullSync', - state: { queue, currentClimbQueueItem: currentClimb }, + sequence, + state: { sequence, stateHash, queue, currentClimbQueueItem: currentClimb }, }); return item; @@ -302,15 +325,18 @@ export const queueMutations = { validateInput(ClimbQueueItemSchema, currentClimbQueueItem, 'currentClimbQueueItem'); } - await roomManager.updateQueueState(sessionId, queue, currentClimbQueueItem || null); + const { sequence, stateHash } = await roomManager.updateQueueState(sessionId, queue, currentClimbQueueItem || null); const state: QueueState = { + sequence, + stateHash, queue, currentClimbQueueItem: currentClimbQueueItem || null, }; pubsub.publishQueueEvent(sessionId, { __typename: 'FullSync', + sequence, state, }); diff --git a/packages/backend/src/graphql/resolvers/queue/subscriptions.ts b/packages/backend/src/graphql/resolvers/queue/subscriptions.ts index f6e1d261..aa5bf037 100644 --- a/packages/backend/src/graphql/resolvers/queue/subscriptions.ts +++ b/packages/backend/src/graphql/resolvers/queue/subscriptions.ts @@ -28,21 +28,26 @@ export const queueSubscriptions = { // Now fetch the current state (any events during this time are queued) const queueState = await roomManager.getQueueState(sessionId); + const fullSyncSequence = queueState.sequence; // Send initial FullSync yield { queueUpdates: { __typename: 'FullSync', + sequence: fullSyncSequence, state: queueState, } as QueueEvent, }; // Continue with queued and new events - // Note: The client's reducer handles duplicate events via UUID deduplication, - // so any events that occurred before FullSync but are also in FullSync - // will be safely ignored. + // Filter out events with sequence <= fullSyncSequence to prevent: + // 1. Duplicate events (already included in FullSync state) + // 2. Sequence gap detection on client (e.g., FullSync seq=7, then event seq=3) + // Events queued between subscribing and fetching state will have lower sequences. for await (const event of asyncIterator) { - yield { queueUpdates: event }; + if (event.sequence > fullSyncSequence) { + yield { queueUpdates: event }; + } } }, }, diff --git a/packages/backend/src/graphql/resolvers/sessions/mutations.ts b/packages/backend/src/graphql/resolvers/sessions/mutations.ts index c1cd1237..4c7cf10c 100644 --- a/packages/backend/src/graphql/resolvers/sessions/mutations.ts +++ b/packages/backend/src/graphql/resolvers/sessions/mutations.ts @@ -61,6 +61,8 @@ export const sessionMutations = { boardPath, users: result.users, queueState: { + sequence: result.sequence, + stateHash: result.stateHash, queue: result.queue, currentClimbQueueItem: result.currentClimbQueueItem, }, @@ -126,6 +128,8 @@ export const sessionMutations = { boardPath: input.boardPath, users: result.users, queueState: { + sequence: result.sequence, + stateHash: result.stateHash, queue: result.queue, currentClimbQueueItem: result.currentClimbQueueItem, }, diff --git a/packages/backend/src/graphql/resolvers/sessions/queries.ts b/packages/backend/src/graphql/resolvers/sessions/queries.ts index 57376f28..3b47ceb9 100644 --- a/packages/backend/src/graphql/resolvers/sessions/queries.ts +++ b/packages/backend/src/graphql/resolvers/sessions/queries.ts @@ -1,6 +1,7 @@ -import type { ConnectionContext } from '@boardsesh/shared-schema'; +import type { ConnectionContext, EventsReplayResponse } from '@boardsesh/shared-schema'; import { roomManager, type DiscoverableSession } from '../../../services/room-manager.js'; -import { validateInput } from '../shared/helpers.js'; +import { pubsub } from '../../../pubsub/index.js'; +import { validateInput, requireSessionMember } from '../shared/helpers.js'; import { SessionIdSchema, LatitudeSchema, LongitudeSchema, RadiusMetersSchema } from '../../../validation/schemas.js'; export const sessionQueries = { @@ -29,6 +30,33 @@ export const sessionQueries = { }; }, + /** + * Get buffered events since a sequence number for delta sync (Phase 2) + * Used during reconnection to catch up on missed events + */ + eventsReplay: async ( + _: unknown, + { sessionId, sinceSequence }: { sessionId: string; sinceSequence: number }, + ctx: ConnectionContext + ): Promise => { + // Validate inputs + validateInput(SessionIdSchema, sessionId, 'sessionId'); + + // Verify user is a member of the session + await requireSessionMember(ctx, sessionId); + + // Get events from buffer + const events = await pubsub.getEventsSince(sessionId, sinceSequence); + + // Get current sequence from queue state + const queueState = await roomManager.getQueueState(sessionId); + + return { + events, + currentSequence: queueState.sequence, + }; + }, + /** * Find nearby sessions using GPS coordinates * Returns discoverable sessions within the specified radius diff --git a/packages/backend/src/pubsub/index.ts b/packages/backend/src/pubsub/index.ts index b7db6ce2..e44d4c75 100644 --- a/packages/backend/src/pubsub/index.ts +++ b/packages/backend/src/pubsub/index.ts @@ -5,6 +5,10 @@ import { createRedisPubSubAdapter, type RedisPubSubAdapter } from './redis-adapt type QueueSubscriber = (event: QueueEvent) => void; type SessionSubscriber = (event: SessionEvent) => void; +// Event buffer configuration (Phase 2: Delta sync) +const EVENT_BUFFER_SIZE = 100; // Store last 100 events per session +const EVENT_BUFFER_TTL = 300; // 5 minutes + /** * Hybrid PubSub that supports both local-only and Redis-backed modes. * @@ -183,9 +187,77 @@ class PubSub { }; } + /** + * Store a queue event in the event buffer for delta sync (Phase 2). + * Events are stored in a Redis list with a TTL. + */ + private async storeEventInBuffer(sessionId: string, event: QueueEvent): Promise { + if (!this.redisAdapter) { + // No Redis - skip event buffering (will fallback to full sync) + return; + } + + try { + const { publisher } = redisClientManager.getClients(); + const bufferKey = `session:${sessionId}:events`; + const eventJson = JSON.stringify(event); + + // Add to front of list (newest events first) + await publisher.lpush(bufferKey, eventJson); + // Trim to keep only last N events + await publisher.ltrim(bufferKey, 0, EVENT_BUFFER_SIZE - 1); + // Set TTL (5 minutes) + await publisher.expire(bufferKey, EVENT_BUFFER_TTL); + } catch (error) { + console.error('[PubSub] Failed to store event in buffer:', error); + // Don't throw - event buffering is optional (will fallback to full sync) + } + } + + /** + * Retrieve events since a given sequence number (Phase 2). + * Used for delta sync on reconnection. + * Returns events in ascending sequence order. + */ + async getEventsSince(sessionId: string, sinceSequence: number): Promise { + if (!this.redisAdapter) { + throw new Error('Event buffer requires Redis'); + } + + try { + const { publisher } = redisClientManager.getClients(); + const bufferKey = `session:${sessionId}:events`; + + // Get all events from buffer (newest first due to lpush) + const eventJsons = await publisher.lrange(bufferKey, 0, -1); + + // Parse and filter events + const events: QueueEvent[] = []; + for (const json of eventJsons) { + try { + const event = JSON.parse(json) as QueueEvent; + if (event.sequence > sinceSequence) { + events.push(event); + } + } catch (parseError) { + console.error('[PubSub] Failed to parse buffered event:', parseError); + } + } + + // Sort by sequence (ascending) since buffer is newest-first + events.sort((a, b) => a.sequence - b.sequence); + + return events; + } catch (error) { + console.error('[PubSub] Failed to retrieve events from buffer:', error); + throw error; + } + } + /** * Publish a queue event to all subscribers of a session. * Dispatches locally first, then publishes to Redis for other instances. + * Also stores event in buffer for delta sync (Phase 2). * * Note: Redis publish errors are logged but not thrown to avoid blocking * the local dispatch. In Redis mode, events may not reach other instances @@ -195,6 +267,13 @@ class PubSub { // Always dispatch to local subscribers first (low latency) this.dispatchToLocalQueueSubscribers(sessionId, event); + // Store event in buffer for delta sync (Phase 2) + // Fire and forget - don't block on buffer storage + this.storeEventInBuffer(sessionId, event).catch((error) => { + console.error(`[PubSub] Failed to buffer event for session ${sessionId}:`, error); + // Non-fatal: clients will fall back to full sync if delta sync fails + }); + // Also publish to Redis if available if (this.redisAdapter) { this.redisAdapter.publishQueueEvent(sessionId, event).catch((error) => { diff --git a/packages/backend/src/services/redis-session-store.ts b/packages/backend/src/services/redis-session-store.ts index e0a10348..0cca5f9d 100644 --- a/packages/backend/src/services/redis-session-store.ts +++ b/packages/backend/src/services/redis-session-store.ts @@ -22,6 +22,8 @@ export interface RedisSessionData { queue: ClimbQueueItem[]; currentClimbQueueItem: ClimbQueueItem | null; version: number; + sequence: number; + stateHash: string; lastActivity: Date; discoverable: boolean; latitude: number | null; @@ -59,6 +61,8 @@ export class RedisSessionStore { ? JSON.stringify(data.currentClimbQueueItem) : '', version: data.version.toString(), + sequence: data.sequence.toString(), + stateHash: data.stateHash, lastActivity: data.lastActivity.getTime().toString(), discoverable: data.discoverable ? '1' : '0', latitude: data.latitude?.toString() || '', @@ -84,7 +88,9 @@ export class RedisSessionStore { sessionId: string, queue: ClimbQueueItem[], currentClimbQueueItem: ClimbQueueItem | null, - version: number + version: number, + sequence: number, + stateHash: string ): Promise { const key = `boardsesh:session:${sessionId}`; const multi = this.redis.multi(); @@ -95,6 +101,8 @@ export class RedisSessionStore { ? JSON.stringify(currentClimbQueueItem) : '', version: version.toString(), + sequence: sequence.toString(), + stateHash: stateHash, lastActivity: Date.now().toString(), }); @@ -120,7 +128,9 @@ export class RedisSessionStore { boardPath: data.boardPath, queue: safeJSONParse(data.queue, []), currentClimbQueueItem: safeJSONParse(data.currentClimbQueueItem, null), - version: parseInt(data.version, 10), + version: parseInt(data.version, 10) || 0, + sequence: parseInt(data.sequence, 10) || 0, + stateHash: data.stateHash || '', lastActivity: new Date(parseInt(data.lastActivity, 10)), discoverable: data.discoverable === '1', latitude: data.latitude ? parseFloat(data.latitude) : null, diff --git a/packages/backend/src/services/room-manager.ts b/packages/backend/src/services/room-manager.ts index c205c017..4b022538 100644 --- a/packages/backend/src/services/room-manager.ts +++ b/packages/backend/src/services/room-manager.ts @@ -6,6 +6,7 @@ import { eq, and, sql, gt, gte, lte, ne } from 'drizzle-orm'; import type { ClimbQueueItem, SessionUser } from '@boardsesh/shared-schema'; import { haversineDistance, getBoundingBox, DEFAULT_SEARCH_RADIUS_METERS } from '../utils/geo.js'; import { RedisSessionStore } from './redis-session-store.js'; +import { computeQueueStateHash } from '../utils/hash.js'; // Custom error for version conflicts export class VersionConflictError extends Error { @@ -42,7 +43,7 @@ class RoomManager { private sessions: Map> = new Map(); private redisStore: RedisSessionStore | null = null; private postgresWriteTimers: Map = new Map(); - private pendingWrites: Map = new Map(); + private pendingWrites: Map = new Map(); private readonly MAX_RETRY_ATTEMPTS = 3; private readonly RETRY_BASE_DELAY = 1000; // 1 second private writeRetryAttempts: Map = new Map(); @@ -100,6 +101,8 @@ class RoomManager { users: SessionUser[]; queue: ClimbQueueItem[]; currentClimbQueueItem: ClimbQueueItem | null; + sequence: number; + stateHash: string; isLeader: boolean; }> { const client = this.clients.get(connectionId); @@ -154,6 +157,8 @@ class RoomManager { queue: queueState.queue, currentClimbQueueItem: queueState.currentClimbQueueItem, version: queueState.version, + sequence: queueState.sequence, + stateHash: queueState.stateHash, lastActivity: pgSession.lastActivity, discoverable: pgSession.discoverable, latitude: pgSession.latitude, @@ -175,12 +180,28 @@ class RoomManager { await this.redisStore.releaseLock(lockKey, lockValue); } } else { - // Lock not acquired, wait briefly for restoration to complete - console.log(`[RoomManager] Lock not acquired for session ${sessionId}, waiting...`); - await new Promise(resolve => setTimeout(resolve, 100)); + // Lock not acquired - wait with exponential backoff for restoration to complete + console.log(`[RoomManager] Lock not acquired for session ${sessionId}, waiting with backoff...`); + let waitTime = 50; + const maxWait = 2000; + const maxAttempts = 5; + + for (let attempt = 0; attempt < maxAttempts; attempt++) { + await new Promise(resolve => setTimeout(resolve, waitTime)); + + // Check if session was restored by another instance + if (this.sessions.has(sessionId)) { + console.log(`[RoomManager] Session ${sessionId} restored by another instance after ${attempt + 1} attempts`); + break; + } + + // Exponential backoff + waitTime = Math.min(waitTime * 2, maxWait); + } // After waiting, session should exist if another instance initialized it if (!this.sessions.has(sessionId)) { + console.log(`[RoomManager] Session ${sessionId} not restored after backoff, creating local entry`); this.sessions.set(sessionId, new Set()); } } @@ -238,6 +259,8 @@ class RoomManager { users, queue: queueState.queue, currentClimbQueueItem: queueState.currentClimbQueueItem, + sequence: queueState.sequence, + stateHash: queueState.stateHash, isLeader, }; } @@ -363,31 +386,47 @@ class RoomManager { queue: ClimbQueueItem[], currentClimbQueueItem: ClimbQueueItem | null, expectedVersion?: number - ): Promise { - // Get current version from Redis if available, otherwise from Postgres + ): Promise<{ version: number; sequence: number; stateHash: string }> { + // Get current version and sequence from Redis if available, otherwise from Postgres let currentVersion = expectedVersion; + let currentSequence = 0; + if (currentVersion === undefined) { if (this.redisStore) { const redisSession = await this.redisStore.getSession(sessionId); currentVersion = redisSession?.version ?? 0; + currentSequence = redisSession?.sequence ?? 0; } if (currentVersion === undefined || currentVersion === 0) { const pgState = await this.getQueueState(sessionId); currentVersion = pgState.version; + currentSequence = pgState.sequence; + } + } else { + // If version is provided, get sequence from Redis or Postgres + if (this.redisStore) { + const redisSession = await this.redisStore.getSession(sessionId); + currentSequence = redisSession?.sequence ?? 0; + } + if (currentSequence === 0) { + const pgState = await this.getQueueState(sessionId); + currentSequence = pgState.sequence; } } const newVersion = currentVersion + 1; + const newSequence = currentSequence + 1; + const stateHash = computeQueueStateHash(queue, currentClimbQueueItem?.uuid || null); // Write to Redis immediately (source of truth for active sessions) if (this.redisStore) { - await this.redisStore.updateQueueState(sessionId, queue, currentClimbQueueItem, newVersion); + await this.redisStore.updateQueueState(sessionId, queue, currentClimbQueueItem, newVersion, newSequence, stateHash); } // Debounce Postgres write (30 seconds) - eventual consistency - this.schedulePostgresWrite(sessionId, queue, currentClimbQueueItem, newVersion); + this.schedulePostgresWrite(sessionId, queue, currentClimbQueueItem, newVersion, newSequence); - return newVersion; + return { version: newVersion, sequence: newSequence, stateHash }; } /** @@ -410,6 +449,7 @@ class RoomManager { queue, currentClimbQueueItem, version: 1, + sequence: 1, // Initial sequence for new session updatedAt: new Date(), }) .onConflictDoNothing() @@ -421,7 +461,8 @@ class RoomManager { // Also update Redis if (this.redisStore) { - await this.redisStore.updateQueueState(sessionId, queue, currentClimbQueueItem, result[0].version); + const stateHash = computeQueueStateHash(queue, currentClimbQueueItem?.uuid || null); + await this.redisStore.updateQueueState(sessionId, queue, currentClimbQueueItem, result[0].version, result[0].sequence, stateHash); } return result[0].version; @@ -434,6 +475,7 @@ class RoomManager { queue, currentClimbQueueItem, version: sql`${sessionQueues.version} + 1`, + sequence: sql`${sessionQueues.sequence} + 1`, updatedAt: new Date(), }) .where(and( @@ -448,7 +490,8 @@ class RoomManager { // Also update Redis if (this.redisStore) { - await this.redisStore.updateQueueState(sessionId, queue, currentClimbQueueItem, result[0].version); + const stateHash = computeQueueStateHash(queue, currentClimbQueueItem?.uuid || null); + await this.redisStore.updateQueueState(sessionId, queue, currentClimbQueueItem, result[0].version, result[0].sequence, stateHash); } return result[0].version; @@ -462,6 +505,7 @@ class RoomManager { queue, currentClimbQueueItem, version: 1, + sequence: 1, // Initial sequence for new session updatedAt: new Date(), }) .onConflictDoUpdate({ @@ -470,16 +514,19 @@ class RoomManager { queue, currentClimbQueueItem, version: sql`${sessionQueues.version} + 1`, + sequence: sql`${sessionQueues.sequence} + 1`, updatedAt: new Date(), }, }) .returning(); const newVersion = result[0]?.version ?? 1; + const newSequence = result[0]?.sequence ?? 1; // Also update Redis if (this.redisStore) { - await this.redisStore.updateQueueState(sessionId, queue, currentClimbQueueItem, newVersion); + const stateHash = computeQueueStateHash(queue, currentClimbQueueItem?.uuid || null); + await this.redisStore.updateQueueState(sessionId, queue, currentClimbQueueItem, newVersion, newSequence, stateHash); } return newVersion; @@ -487,82 +534,106 @@ class RoomManager { /** * Update only the queue without touching currentClimbQueueItem. + * Uses Redis as source of truth for real-time state. Postgres writes are debounced. * This avoids race conditions when other operations are modifying currentClimbQueueItem. */ - async updateQueueOnly(sessionId: string, queue: ClimbQueueItem[], expectedVersion?: number): Promise { - if (expectedVersion !== undefined) { - if (expectedVersion === 0) { - // Version 0 means no row exists yet - try to insert - // If a row was created between our read and this insert, the conflict will - // cause us to return nothing, triggering a VersionConflictError and retry - const result = await db - .insert(sessionQueues) - .values({ - sessionId, - queue, - currentClimbQueueItem: null, - version: 1, - updatedAt: new Date(), - }) - .onConflictDoNothing() - .returning(); + async updateQueueOnly( + sessionId: string, + queue: ClimbQueueItem[], + expectedVersion?: number + ): Promise<{ version: number; sequence: number; stateHash: string }> { + // Get current state from Redis (source of truth for real-time sync) + let currentVersion = 0; + let currentSequence = 0; + let currentClimbQueueItem: ClimbQueueItem | null = null; - if (result.length === 0) { - // Row was created by a concurrent operation, trigger retry - throw new VersionConflictError(sessionId, expectedVersion); - } - return result[0].version; + if (this.redisStore) { + const redisSession = await this.redisStore.getSession(sessionId); + if (redisSession) { + currentVersion = redisSession.version; + currentSequence = redisSession.sequence; + currentClimbQueueItem = redisSession.currentClimbQueueItem; } + } - // Optimistic locking: only update if version matches - const result = await db - .update(sessionQueues) - .set({ - queue, - version: sql`${sessionQueues.version} + 1`, - updatedAt: new Date(), - }) - .where(and( - eq(sessionQueues.sessionId, sessionId), - eq(sessionQueues.version, expectedVersion) - )) - .returning(); + // Fallback to Postgres if Redis doesn't have the data + if (currentVersion === 0 && currentSequence === 0) { + const pgState = await this.getQueueState(sessionId); + currentVersion = pgState.version; + currentSequence = pgState.sequence; + currentClimbQueueItem = pgState.currentClimbQueueItem; + } - if (result.length === 0) { - throw new VersionConflictError(sessionId, expectedVersion); - } - return result[0].version; + // Validate expectedVersion if provided (optimistic locking) + if (expectedVersion !== undefined && currentVersion !== expectedVersion) { + throw new VersionConflictError(sessionId, expectedVersion); } - // No version check - const result = await db - .update(sessionQueues) - .set({ - queue, - version: sql`${sessionQueues.version} + 1`, - updatedAt: new Date(), - }) - .where(eq(sessionQueues.sessionId, sessionId)) - .returning(); + const newVersion = currentVersion + 1; + const newSequence = currentSequence + 1; + const stateHash = computeQueueStateHash(queue, currentClimbQueueItem?.uuid || null); + + // Write to Redis immediately (source of truth for real-time state) + if (this.redisStore) { + await this.redisStore.updateQueueState( + sessionId, queue, currentClimbQueueItem, newVersion, newSequence, stateHash + ); + } - return result[0]?.version ?? 1; + // Debounce Postgres write (for queue history - eventual consistency) + this.schedulePostgresWrite(sessionId, queue, currentClimbQueueItem, newVersion, newSequence); + + return { version: newVersion, sequence: newSequence, stateHash }; } async getQueueState(sessionId: string): Promise<{ queue: ClimbQueueItem[]; currentClimbQueueItem: ClimbQueueItem | null; version: number; + sequence: number; + stateHash: string; }> { + // Check Redis first (source of truth for active sessions) + // Redis is written to immediately, while Postgres writes are debounced (30s) + if (this.redisStore) { + const redisSession = await this.redisStore.getSession(sessionId); + if (redisSession) { + return { + queue: redisSession.queue, + currentClimbQueueItem: redisSession.currentClimbQueueItem, + version: redisSession.version, + sequence: redisSession.sequence, + stateHash: redisSession.stateHash, + }; + } + } + + // Fall back to Postgres (for dormant sessions or when Redis is unavailable) const result = await db.select().from(sessionQueues).where(eq(sessionQueues.sessionId, sessionId)).limit(1); if (result.length === 0) { - return { queue: [], currentClimbQueueItem: null, version: 0 }; + // Return initial state with empty hash + return { + queue: [], + currentClimbQueueItem: null, + version: 0, + sequence: 0, + stateHash: computeQueueStateHash([], null), + }; } + // Compute hash from current state + const stateHash = computeQueueStateHash( + result[0].queue, + result[0].currentClimbQueueItem?.uuid || null + ); + return { queue: result[0].queue, currentClimbQueueItem: result[0].currentClimbQueueItem, version: result[0].version, + sequence: result[0].sequence, + stateHash, }; } @@ -847,10 +918,11 @@ class RoomManager { sessionId: string, queue: ClimbQueueItem[], currentClimbQueueItem: ClimbQueueItem | null, - version: number + version: number, + sequence: number ): void { // Store latest state - this.pendingWrites.set(sessionId, { queue, currentClimbQueueItem, version }); + this.pendingWrites.set(sessionId, { queue, currentClimbQueueItem, version, sequence }); // Clear existing timer const existingTimer = this.postgresWriteTimers.get(sessionId); @@ -886,7 +958,7 @@ class RoomManager { */ private async writeQueueStateToPostgres( sessionId: string, - state: { queue: ClimbQueueItem[]; currentClimbQueueItem: ClimbQueueItem | null; version: number } + state: { queue: ClimbQueueItem[]; currentClimbQueueItem: ClimbQueueItem | null; version: number; sequence: number } ): Promise { await db .insert(sessionQueues) @@ -895,6 +967,7 @@ class RoomManager { queue: state.queue, currentClimbQueueItem: state.currentClimbQueueItem, version: state.version, + sequence: state.sequence, updatedAt: new Date(), }) .onConflictDoUpdate({ @@ -903,6 +976,7 @@ class RoomManager { queue: state.queue, currentClimbQueueItem: state.currentClimbQueueItem, version: state.version, + sequence: state.sequence, updatedAt: new Date(), }, }); diff --git a/packages/backend/src/utils/hash.ts b/packages/backend/src/utils/hash.ts new file mode 100644 index 00000000..bbe04489 --- /dev/null +++ b/packages/backend/src/utils/hash.ts @@ -0,0 +1,55 @@ +/** + * Fast non-cryptographic hash function for state verification + * Uses FNV-1a (Fowler-Noll-Vo) algorithm - fast and good distribution + * + * NOTE: This is NOT a cryptographic hash - use only for integrity checking + * and detecting state drift, not for security purposes. + */ + +/** + * FNV-1a 32-bit hash + * https://en.wikipedia.org/wiki/Fowler%E2%80%93Noll%E2%80%93Vo_hash_function + * + * @param str - String to hash + * @returns Hexadecimal hash string + */ +export function fnv1aHash(str: string): string { + // FNV-1a parameters for 32-bit hash + const FNV_PRIME = 0x01000193; + const FNV_OFFSET_BASIS = 0x811c9dc5; + + let hash = FNV_OFFSET_BASIS; + + for (let i = 0; i < str.length; i++) { + // XOR with byte + hash ^= str.charCodeAt(i); + + // Multiply by FNV prime (with 32-bit overflow) + hash = Math.imul(hash, FNV_PRIME); + } + + // Convert to unsigned 32-bit integer and return as hex + return (hash >>> 0).toString(16).padStart(8, '0'); +} + +/** + * Compute a deterministic hash of queue state + * + * Creates a canonical string representation of the queue state + * (sorted queue UUIDs + current item UUID) and hashes it. + * + * @param queue - Array of queue item UUIDs + * @param currentItemUuid - UUID of current climb queue item (or null) + * @returns Hash string + */ +export function computeQueueStateHash( + queue: Array<{ uuid: string }>, + currentItemUuid: string | null +): string { + // Create canonical representation: sorted queue UUIDs + current UUID + const queueUuids = queue.map(item => item.uuid).sort().join(','); + const currentUuid = currentItemUuid || 'null'; + const canonical = `${queueUuids}|${currentUuid}`; + + return fnv1aHash(canonical); +} diff --git a/packages/backend/vitest.config.ts b/packages/backend/vitest.config.ts index 13bf8e57..561ccaa6 100644 --- a/packages/backend/vitest.config.ts +++ b/packages/backend/vitest.config.ts @@ -10,6 +10,12 @@ export default defineConfig({ setupFiles: ['./src/__tests__/setup.ts'], testTimeout: 10000, hookTimeout: 30000, + // Run test files sequentially because: + // 1. Tests share a singleton roomManager (reset in beforeEach can conflict) + // 2. Tests share a database with truncation in setup.ts beforeEach + // 3. Integration tests start servers on specific ports + // To enable parallelism, each test file would need isolated state. + fileParallelism: false, coverage: { provider: 'v8', reporter: ['text', 'json', 'html'], diff --git a/packages/db/drizzle/0027_add_sequence_column.sql b/packages/db/drizzle/0027_add_sequence_column.sql new file mode 100644 index 00000000..78ce6dc8 --- /dev/null +++ b/packages/db/drizzle/0027_add_sequence_column.sql @@ -0,0 +1,3 @@ +-- Add sequence column for delta sync tracking +-- This column tracks the event sequence independently of the optimistic locking version +ALTER TABLE "board_session_queues" ADD COLUMN "sequence" integer DEFAULT 0 NOT NULL; diff --git a/packages/db/src/schema/app/sessions.ts b/packages/db/src/schema/app/sessions.ts index b7e50292..36d5731e 100644 --- a/packages/db/src/schema/app/sessions.ts +++ b/packages/db/src/schema/app/sessions.ts @@ -45,6 +45,8 @@ export const boardSessionQueues = pgTable('board_session_queues', { queue: jsonb('queue').$type().default([]).notNull(), currentClimbQueueItem: jsonb('current_climb_queue_item').$type().default(null), version: integer('version').default(1).notNull(), + // Sequence number for event ordering (separate from version used for optimistic locking) + sequence: integer('sequence').default(0).notNull(), updatedAt: timestamp('updated_at').defaultNow().notNull(), }); diff --git a/packages/shared-schema/src/operations.ts b/packages/shared-schema/src/operations.ts index 6fda8003..ecb6cd85 100644 --- a/packages/shared-schema/src/operations.ts +++ b/packages/shared-schema/src/operations.ts @@ -55,6 +55,8 @@ export const JOIN_SESSION = ` avatarUrl } queueState { + sequence + stateHash queue { ${QUEUE_ITEM_FIELDS} } @@ -99,8 +101,8 @@ export const REORDER_QUEUE_ITEM = ` `; export const SET_CURRENT_CLIMB = ` - mutation SetCurrentClimb($item: ClimbQueueItemInput, $shouldAddToQueue: Boolean) { - setCurrentClimb(item: $item, shouldAddToQueue: $shouldAddToQueue) { + mutation SetCurrentClimb($item: ClimbQueueItemInput, $shouldAddToQueue: Boolean, $correlationId: ID) { + setCurrentClimb(item: $item, shouldAddToQueue: $shouldAddToQueue, correlationId: $correlationId) { ${QUEUE_ITEM_FIELDS} } } @@ -117,6 +119,8 @@ export const MIRROR_CURRENT_CLIMB = ` export const SET_QUEUE = ` mutation SetQueue($queue: [ClimbQueueItemInput!]!, $currentClimbQueueItem: ClimbQueueItemInput) { setQueue(queue: $queue, currentClimbQueueItem: $currentClimbQueueItem) { + sequence + stateHash queue { ${QUEUE_ITEM_FIELDS} } @@ -141,6 +145,8 @@ export const CREATE_SESSION = ` avatarUrl } queueState { + sequence + stateHash queue { ${QUEUE_ITEM_FIELDS} } @@ -179,12 +185,69 @@ export const SESSION_UPDATES = ` } `; +// Query for delta sync event replay (Phase 2) +export const EVENTS_REPLAY = ` + query EventsReplay($sessionId: ID!, $sinceSequence: Int!) { + eventsReplay(sessionId: $sessionId, sinceSequence: $sinceSequence) { + currentSequence + events { + __typename + ... on FullSync { + sequence + state { + sequence + stateHash + queue { + ${QUEUE_ITEM_FIELDS} + } + currentClimbQueueItem { + ${QUEUE_ITEM_FIELDS} + } + } + } + ... on QueueItemAdded { + sequence + addedItem: item { + ${QUEUE_ITEM_FIELDS} + } + position + } + ... on QueueItemRemoved { + sequence + uuid + } + ... on QueueReordered { + sequence + uuid + oldIndex + newIndex + } + ... on CurrentClimbChanged { + sequence + currentItem: item { + ${QUEUE_ITEM_FIELDS} + } + clientId + correlationId + } + ... on ClimbMirrored { + sequence + mirrored + } + } + } + } +`; + export const QUEUE_UPDATES = ` subscription QueueUpdates($sessionId: ID!) { queueUpdates(sessionId: $sessionId) { __typename ... on FullSync { + sequence state { + sequence + stateHash queue { ${QUEUE_ITEM_FIELDS} } @@ -194,25 +257,32 @@ export const QUEUE_UPDATES = ` } } ... on QueueItemAdded { + sequence addedItem: item { ${QUEUE_ITEM_FIELDS} } position } ... on QueueItemRemoved { + sequence uuid } ... on QueueReordered { + sequence uuid oldIndex newIndex } ... on CurrentClimbChanged { + sequence currentItem: item { ${QUEUE_ITEM_FIELDS} } + clientId + correlationId } ... on ClimbMirrored { + sequence mirrored } } diff --git a/packages/shared-schema/src/schema.ts b/packages/shared-schema/src/schema.ts index ea948f16..0b9bc48c 100644 --- a/packages/shared-schema/src/schema.ts +++ b/packages/shared-schema/src/schema.ts @@ -80,10 +80,18 @@ export const typeDefs = /* GraphQL */ ` } type QueueState { + sequence: Int! + stateHash: String! queue: [ClimbQueueItem!]! currentClimbQueueItem: ClimbQueueItem } + # Response for delta sync event replay (Phase 2) + type EventsReplayResponse { + events: [QueueEvent!]! + currentSequence: Int! + } + type Session { id: ID! boardPath: String! @@ -345,6 +353,8 @@ export const typeDefs = /* GraphQL */ ` type Query { session(sessionId: ID!): Session + # Get buffered events since a sequence number for delta sync (Phase 2) + eventsReplay(sessionId: ID!, sinceSequence: Int!): EventsReplayResponse! # Find discoverable sessions near a location nearbySessions(latitude: Float!, longitude: Float!, radiusMeters: Float): [DiscoverableSession!]! # Get current user's recent sessions (requires auth context) @@ -425,7 +435,7 @@ export const typeDefs = /* GraphQL */ ` addQueueItem(item: ClimbQueueItemInput!, position: Int): ClimbQueueItem! removeQueueItem(uuid: ID!): Boolean! reorderQueueItem(uuid: ID!, oldIndex: Int!, newIndex: Int!): Boolean! - setCurrentClimb(item: ClimbQueueItemInput, shouldAddToQueue: Boolean): ClimbQueueItem + setCurrentClimb(item: ClimbQueueItemInput, shouldAddToQueue: Boolean, correlationId: ID): ClimbQueueItem mirrorCurrentClimb(mirrored: Boolean!): ClimbQueueItem replaceQueueItem(uuid: ID!, item: ClimbQueueItemInput!): ClimbQueueItem! setQueue(queue: [ClimbQueueItemInput!]!, currentClimbQueueItem: ClimbQueueItemInput): QueueState! @@ -511,29 +521,37 @@ export const typeDefs = /* GraphQL */ ` | ClimbMirrored type FullSync { + sequence: Int! state: QueueState! } type QueueItemAdded { + sequence: Int! item: ClimbQueueItem! position: Int } type QueueItemRemoved { + sequence: Int! uuid: ID! } type QueueReordered { + sequence: Int! uuid: ID! oldIndex: Int! newIndex: Int! } type CurrentClimbChanged { + sequence: Int! item: ClimbQueueItem + clientId: ID + correlationId: ID } type ClimbMirrored { + sequence: Int! mirrored: Boolean! } `; diff --git a/packages/shared-schema/src/types.ts b/packages/shared-schema/src/types.ts index 496e09ab..d19d388d 100644 --- a/packages/shared-schema/src/types.ts +++ b/packages/shared-schema/src/types.ts @@ -87,10 +87,19 @@ export type SessionUser = { }; export type QueueState = { + sequence: number; + stateHash: string; queue: ClimbQueueItem[]; currentClimbQueueItem: ClimbQueueItem | null; }; +// Response for delta sync event replay (Phase 2) +// Uses QueueEvent since this is a query returning buffered events with standard field names +export type EventsReplayResponse = { + events: QueueEvent[]; + currentSequence: number; +}; + // ============================================ // Board Configuration Types // ============================================ @@ -252,57 +261,34 @@ export type GetTicksInput = { * * ## Type Aliasing Strategy * - * There are TWO event types because of GraphQL field aliasing: - * - * 1. **QueueEvent** (Server-side) - * - Used by the backend when publishing events via PubSub - * - Uses the actual GraphQL field names defined in the schema (e.g., `item`) - * - * 2. **ClientQueueEvent** (Client-side) - * - Used by the web app when receiving subscription events - * - Uses aliased field names from the subscription query (e.g., `addedItem`, `currentItem`) - * - * The reason for this split is that the GraphQL subscription query in operations.ts - * uses aliases to give more descriptive names to fields: - * - * ```graphql - * subscription QueueUpdates($sessionId: String!) { - * queueUpdates(sessionId: $sessionId) { - * ... on QueueItemAdded { - * addedItem: item { ... } # 'item' aliased to 'addedItem' - * } - * ... on CurrentClimbChanged { - * currentItem: item { ... } # 'item' aliased to 'currentItem' - * } - * } - * } - * ``` + * There are TWO event types due to GraphQL union type constraints: * - * This aliasing is intentional for clarity in client code, but it means the - * TypeScript types must reflect what the client actually receives. + * 1. `QueueEvent` - Server-side type using `item` field. Used by backend PubSub + * and for eventsReplay query responses. * - * When working with these types: - * - In the backend (server): use `QueueEvent` - * - In the web app (client): use `ClientQueueEvent` + * 2. `SubscriptionQueueEvent` - Client-side type using aliased fields (`addedItem`, + * `currentItem`). Required because GraphQL doesn't allow the same field name + * with different nullability in a union (QueueItemAdded.item is non-null, + * CurrentClimbChanged.item is nullable). */ // Server-side event type - uses actual GraphQL field names export type QueueEvent = - | { __typename: 'FullSync'; state: QueueState } - | { __typename: 'QueueItemAdded'; item: ClimbQueueItem; position?: number } - | { __typename: 'QueueItemRemoved'; uuid: string } - | { __typename: 'QueueReordered'; uuid: string; oldIndex: number; newIndex: number } - | { __typename: 'CurrentClimbChanged'; item: ClimbQueueItem | null } - | { __typename: 'ClimbMirrored'; mirrored: boolean }; - -// Client-side event type - uses aliased field names from subscription query -export type ClientQueueEvent = - | { __typename: 'FullSync'; state: QueueState } - | { __typename: 'QueueItemAdded'; addedItem: ClimbQueueItem; position?: number } - | { __typename: 'QueueItemRemoved'; uuid: string } - | { __typename: 'QueueReordered'; uuid: string; oldIndex: number; newIndex: number } - | { __typename: 'CurrentClimbChanged'; currentItem: ClimbQueueItem | null } - | { __typename: 'ClimbMirrored'; mirrored: boolean }; + | { __typename: 'FullSync'; sequence: number; state: QueueState } + | { __typename: 'QueueItemAdded'; sequence: number; item: ClimbQueueItem; position?: number } + | { __typename: 'QueueItemRemoved'; sequence: number; uuid: string } + | { __typename: 'QueueReordered'; sequence: number; uuid: string; oldIndex: number; newIndex: number } + | { __typename: 'CurrentClimbChanged'; sequence: number; item: ClimbQueueItem | null; clientId: string | null; correlationId: string | null } + | { __typename: 'ClimbMirrored'; sequence: number; mirrored: boolean }; + +// Client-side subscription event type - uses aliased field names to avoid GraphQL union conflicts +export type SubscriptionQueueEvent = + | { __typename: 'FullSync'; sequence: number; state: QueueState } + | { __typename: 'QueueItemAdded'; sequence: number; addedItem: ClimbQueueItem; position?: number } + | { __typename: 'QueueItemRemoved'; sequence: number; uuid: string } + | { __typename: 'QueueReordered'; sequence: number; uuid: string; oldIndex: number; newIndex: number } + | { __typename: 'CurrentClimbChanged'; sequence: number; currentItem: ClimbQueueItem | null; clientId: string | null; correlationId: string | null } + | { __typename: 'ClimbMirrored'; sequence: number; mirrored: boolean }; export type SessionEvent = | { __typename: 'UserJoined'; user: SessionUser } diff --git a/packages/web/app/components/graphql-queue/QueueContext.tsx b/packages/web/app/components/graphql-queue/QueueContext.tsx index b55a9895..0e7e7140 100644 --- a/packages/web/app/components/graphql-queue/QueueContext.tsx +++ b/packages/web/app/components/graphql-queue/QueueContext.tsx @@ -1,6 +1,6 @@ 'use client'; -import React, { useContext, createContext, ReactNode, useCallback, useMemo, useState, useEffect } from 'react'; +import React, { useContext, createContext, ReactNode, useCallback, useMemo, useState, useEffect, useRef } from 'react'; import { useSearchParams, useRouter, usePathname } from 'next/navigation'; import { v4 as uuidv4 } from 'uuid'; import { useQueueReducer } from '../queue-control/reducer'; @@ -10,7 +10,7 @@ import { urlParamsToSearchParams, searchParamsToUrlParams } from '@/app/lib/url- import { Climb, ParsedBoardRouteParameters, BoardDetails } from '@/app/lib/types'; import { useConnectionSettings } from '../connection-manager/connection-settings-context'; import { usePartyProfile } from '../party-manager/party-profile-context'; -import { ClientQueueEvent } from '@boardsesh/shared-schema'; +import { SubscriptionQueueEvent } from '@boardsesh/shared-schema'; import { saveSessionToHistory } from '../setup-wizard/session-history-panel'; import { usePersistentSession } from '../persistent-session'; import { FavoritesProvider } from '../climb-actions/favorites-batch-context'; @@ -69,6 +69,9 @@ export const GraphQLQueueProvider = ({ parsedParams, boardDetails, children }: G const initialSearchParams = urlParamsToSearchParams(searchParams); const [state, dispatch] = useQueueReducer(initialSearchParams); + // Correlation ID counter for tracking local updates (keeps reducer pure) + const correlationCounterRef = useRef(0); + // Get backend URL from settings const { backendUrl } = useConnectionSettings(); @@ -198,7 +201,7 @@ export const GraphQLQueueProvider = ({ parsedParams, boardDetails, children }: G useEffect(() => { if (!isPersistentSessionActive) return; - const unsubscribe = persistentSession.subscribeToQueueEvents((event: ClientQueueEvent) => { + const unsubscribe = persistentSession.subscribeToQueueEvents((event: SubscriptionQueueEvent) => { switch (event.__typename) { case 'FullSync': dispatch({ @@ -240,6 +243,10 @@ export const GraphQLQueueProvider = ({ parsedParams, boardDetails, children }: G payload: { item: event.currentItem as ClimbQueueItem | null, shouldAddToQueue: false, + isServerEvent: true, + eventClientId: event.clientId || undefined, + myClientId: persistentSession.clientId || undefined, + serverCorrelationId: event.correlationId || undefined, }, }); break; @@ -255,6 +262,61 @@ export const GraphQLQueueProvider = ({ parsedParams, boardDetails, children }: G return unsubscribe; }, [isPersistentSessionActive, persistentSession, dispatch]); + // Cleanup orphaned pending updates (network failures, timeouts) + // This is the ONLY place with time-based logic, isolated from reducer + // FIX: Use ref to persist timestamps across renders (was being recreated every render) + const pendingTimestampsRef = useRef(new Map()); + + useEffect(() => { + if (!isPersistentSessionActive || state.pendingCurrentClimbUpdates.length === 0) { + return; + } + + // Get persisted timestamp map from ref + const pendingTimestamps = pendingTimestampsRef.current; + + // Add timestamps for NEW correlation IDs only + state.pendingCurrentClimbUpdates.forEach(id => { + if (!pendingTimestamps.has(id)) { + pendingTimestamps.set(id, Date.now()); + } + }); + + // Remove timestamps for correlation IDs no longer pending + Array.from(pendingTimestamps.keys()).forEach(id => { + if (!state.pendingCurrentClimbUpdates.includes(id)) { + pendingTimestamps.delete(id); + } + }); + + // Set up cleanup timer for stale entries (>5 seconds) + // Reduced from 10s/5s to 5s/2s to minimize stale update window + const cleanupTimer = setInterval(() => { + const now = Date.now(); + const staleIds: string[] = []; + + pendingTimestamps.forEach((timestamp, id) => { + if (now - timestamp > 5000) { // 5 seconds (reduced from 10s) + staleIds.push(id); + } + }); + + // Batch cleanup to avoid multiple re-renders + if (staleIds.length > 0) { + console.warn('[QueueContext] Cleaning up orphaned pending updates:', staleIds); + dispatch({ + type: 'CLEANUP_PENDING_UPDATES_BATCH', + payload: { correlationIds: staleIds }, + }); + staleIds.forEach(id => pendingTimestamps.delete(id)); + } + }, 2000); // Check every 2 seconds (reduced from 5s) + + return () => { + clearInterval(cleanupTimer); + }; + }, [isPersistentSessionActive, state.pendingCurrentClimbUpdates, dispatch]); + // Use persistent session values when active const clientId = isPersistentSessionActive ? persistentSession.clientId : null; const isLeader = isPersistentSessionActive ? persistentSession.isLeader : false; @@ -679,16 +741,31 @@ export const GraphQLQueueProvider = ({ parsedParams, boardDetails, children }: G }, setCurrentClimbQueueItem: (item: ClimbQueueItem) => { - // Optimistic update + // Generate correlation ID OUTSIDE reducer (keeps reducer pure) + // Format: clientId-counter (e.g., "client-abc123-5") + const correlationId = clientId ? `${clientId}-${++correlationCounterRef.current}` : undefined; + + // Optimistic update with correlation ID dispatch({ type: 'DELTA_UPDATE_CURRENT_CLIMB', - payload: { item, shouldAddToQueue: item.suggested }, + payload: { + item, + shouldAddToQueue: item.suggested, + correlationId, + }, }); // Send to server only if connected if (hasConnected && isPersistentSessionActive) { - persistentSession.setCurrentClimb(item, item.suggested).catch((error) => { + persistentSession.setCurrentClimb(item, item.suggested, correlationId).catch((error) => { console.error('Failed to set current climb:', error); + // Remove from pending on error to prevent blocking future updates + if (correlationId) { + dispatch({ + type: 'CLEANUP_PENDING_UPDATE', + payload: { correlationId }, + }); + } }); } }, diff --git a/packages/web/app/components/graphql-queue/use-queue-session.ts b/packages/web/app/components/graphql-queue/use-queue-session.ts index 141b5399..0e873962 100644 --- a/packages/web/app/components/graphql-queue/use-queue-session.ts +++ b/packages/web/app/components/graphql-queue/use-queue-session.ts @@ -13,7 +13,7 @@ import { SESSION_UPDATES, QUEUE_UPDATES, SessionUser, - ClientQueueEvent, + SubscriptionQueueEvent, SessionEvent, QueueState, } from '@boardsesh/shared-schema'; @@ -74,7 +74,7 @@ export interface UseQueueSessionOptions { avatarUrl?: string; /** Auth token for backend authentication (e.g., NextAuth session token) */ authToken?: string | null; - onQueueEvent?: (event: ClientQueueEvent) => void; + onQueueEvent?: (event: SubscriptionQueueEvent) => void; onSessionEvent?: (event: SessionEvent) => void; } @@ -234,13 +234,14 @@ export function useQueueSession({ if (onQueueEventRef.current && sessionData.queueState) { onQueueEventRef.current({ __typename: 'FullSync', + sequence: sessionData.queueState.sequence, state: sessionData.queueState, }); } // Subscribe to queue updates AFTER joining session if (DEBUG) console.log('[QueueSession] Setting up queue subscription...'); - queueUnsubscribeRef.current = subscribe<{ queueUpdates: ClientQueueEvent }>( + queueUnsubscribeRef.current = subscribe<{ queueUpdates: SubscriptionQueueEvent }>( graphqlClient, { query: QUEUE_UPDATES, diff --git a/packages/web/app/components/persistent-session/board-session-bridge.tsx b/packages/web/app/components/persistent-session/board-session-bridge.tsx index 4a1e2ceb..8666d8f7 100644 --- a/packages/web/app/components/persistent-session/board-session-bridge.tsx +++ b/packages/web/app/components/persistent-session/board-session-bridge.tsx @@ -26,12 +26,19 @@ const BoardSessionBridge: React.FC = ({ const { activeSession, activateSession } = usePersistentSession(); + // Refs to hold stable references to boardDetails and parsedParams + // These values change reference on every render but we only need their current values + const boardDetailsRef = React.useRef(boardDetails); + const parsedParamsRef = React.useRef(parsedParams); + boardDetailsRef.current = boardDetails; + parsedParamsRef.current = parsedParams; + // Activate or update session when we have a session param and board details // This effect handles: // 1. Initial session activation when joining via shared link // 2. Updates when pathname changes (e.g., angle change) while session remains active useEffect(() => { - if (sessionIdFromUrl && boardDetails) { + if (sessionIdFromUrl && boardDetailsRef.current) { // Activate session when URL has session param and either: // - Session ID changed // - Board path changed (e.g., navigating to different angle) @@ -39,8 +46,8 @@ const BoardSessionBridge: React.FC = ({ activateSession({ sessionId: sessionIdFromUrl, boardPath: pathname, - boardDetails, - parsedParams, + boardDetails: boardDetailsRef.current, + parsedParams: parsedParamsRef.current, }); } } @@ -49,8 +56,8 @@ const BoardSessionBridge: React.FC = ({ }, [ sessionIdFromUrl, pathname, - boardDetails, - parsedParams, + // boardDetails and parsedParams removed - accessed via refs to prevent unnecessary reconnections + // Their object references change on every render but the actual values don't affect session activation activeSession?.sessionId, activeSession?.boardPath, activateSession, diff --git a/packages/web/app/components/persistent-session/persistent-session-context.tsx b/packages/web/app/components/persistent-session/persistent-session-context.tsx index dc97fe30..cc622729 100644 --- a/packages/web/app/components/persistent-session/persistent-session-context.tsx +++ b/packages/web/app/components/persistent-session/persistent-session-context.tsx @@ -13,18 +13,50 @@ import { SET_QUEUE, SESSION_UPDATES, QUEUE_UPDATES, + EVENTS_REPLAY, SessionUser, - ClientQueueEvent, + QueueEvent, + SubscriptionQueueEvent, SessionEvent, QueueState, + EventsReplayResponse, } from '@boardsesh/shared-schema'; import { ClimbQueueItem as LocalClimbQueueItem } from '../queue-control/types'; import { BoardDetails, ParsedBoardRouteParameters } from '@/app/lib/types'; import { useWsAuthToken } from '@/app/hooks/use-ws-auth-token'; import { usePartyProfile } from '../party-manager/party-profile-context'; +import { computeQueueStateHash } from '@/app/utils/hash'; const DEBUG = process.env.NODE_ENV === 'development'; +/** + * Transform QueueEvent (from eventsReplay) to SubscriptionQueueEvent format. + * eventsReplay returns server format with `item`, but handlers expect subscription + * format with `addedItem`/`currentItem`. + */ +function transformToSubscriptionEvent(event: QueueEvent): SubscriptionQueueEvent { + switch (event.__typename) { + case 'QueueItemAdded': + return { + __typename: 'QueueItemAdded', + sequence: event.sequence, + addedItem: event.item, + position: event.position, + }; + case 'CurrentClimbChanged': + return { + __typename: 'CurrentClimbChanged', + sequence: event.sequence, + currentItem: event.item, + clientId: event.clientId, + correlationId: event.correlationId, + }; + default: + // Other event types have identical structure + return event as SubscriptionQueueEvent; + } +} + // Default backend URL from environment variable const DEFAULT_BACKEND_URL = process.env.NEXT_PUBLIC_WS_URL || null; @@ -128,12 +160,12 @@ export interface PersistentSessionContextType { // Mutation functions addQueueItem: (item: LocalClimbQueueItem, position?: number) => Promise; removeQueueItem: (uuid: string) => Promise; - setCurrentClimb: (item: LocalClimbQueueItem | null, shouldAddToQueue?: boolean) => Promise; + setCurrentClimb: (item: LocalClimbQueueItem | null, shouldAddToQueue?: boolean, correlationId?: string) => Promise; mirrorCurrentClimb: (mirrored: boolean) => Promise; setQueue: (queue: LocalClimbQueueItem[], currentClimbQueueItem?: LocalClimbQueueItem | null) => Promise; // Event subscription for board-level components - subscribeToQueueEvents: (callback: (event: ClientQueueEvent) => void) => () => void; + subscribeToQueueEvents: (callback: (event: SubscriptionQueueEvent) => void) => () => void; subscribeToSessionEvents: (callback: (event: SessionEvent) => void) => () => void; } @@ -176,6 +208,12 @@ export const PersistentSessionProvider: React.FC<{ children: React.ReactNode }> const [currentClimbQueueItem, setCurrentClimbQueueItem] = useState(null); const [queue, setQueueState] = useState([]); + // Sequence tracking for gap detection and state verification + const [lastReceivedSequence, setLastReceivedSequence] = useState(null); + const [lastReceivedStateHash, setLastReceivedStateHash] = useState(null); + // Ref for synchronous access to sequence (avoids stale closure in handleQueueEvent) + const lastReceivedSequenceRef = useRef(null); + // Local queue state (persists without WebSocket session) const [localQueue, setLocalQueue] = useState([]); const [localCurrentClimbQueueItem, setLocalCurrentClimbQueueItem] = useState(null); @@ -194,11 +232,14 @@ export const PersistentSessionProvider: React.FC<{ children: React.ReactNode }> const sessionUnsubscribeRef = useRef<(() => void) | null>(null); const sessionRef = useRef(null); const isReconnectingRef = useRef(false); + const isConnectingRef = useRef(false); // Prevents duplicate connections during React re-renders const activeSessionRef = useRef(null); const mountedRef = useRef(false); + // Ref to store reconnect handler for use by hash verification + const triggerResyncRef = useRef<(() => void) | null>(null); // Event subscribers - const queueEventSubscribersRef = useRef void>>(new Set()); + const queueEventSubscribersRef = useRef void>>(new Set()); const sessionEventSubscribersRef = useRef void>>(new Set()); // Keep refs in sync @@ -211,7 +252,7 @@ export const PersistentSessionProvider: React.FC<{ children: React.ReactNode }> }, [activeSession]); // Notify queue event subscribers - const notifyQueueSubscribers = useCallback((event: ClientQueueEvent) => { + const notifyQueueSubscribers = useCallback((event: SubscriptionQueueEvent) => { queueEventSubscribersRef.current.forEach((callback) => callback(event)); }, []); @@ -220,12 +261,36 @@ export const PersistentSessionProvider: React.FC<{ children: React.ReactNode }> sessionEventSubscribersRef.current.forEach((callback) => callback(event)); }, []); + // Helper to update sequence in both ref and state + const updateLastReceivedSequence = useCallback((sequence: number) => { + lastReceivedSequenceRef.current = sequence; + setLastReceivedSequence(sequence); + }, []); + // Handle queue events internally - const handleQueueEvent = useCallback((event: ClientQueueEvent) => { + const handleQueueEvent = useCallback((event: SubscriptionQueueEvent) => { + // Sequence validation for gap detection (use ref to avoid stale closure) + const lastSeq = lastReceivedSequenceRef.current; + if (event.__typename !== 'FullSync' && lastSeq !== null) { + const expectedSequence = lastSeq + 1; + if (event.sequence !== expectedSequence) { + console.warn( + `[PersistentSession] Sequence gap detected: expected ${expectedSequence}, got ${event.sequence}. ` + + `This may indicate missed events.` + ); + // Note: Reconnection handles delta sync automatically. + // Mid-session gaps are rare (server skipped sequence or pubsub delivery issue). + // For now, we log and continue - state hash verification will catch drift. + } + } + switch (event.__typename) { case 'FullSync': setQueueState(event.state.queue as LocalClimbQueueItem[]); setCurrentClimbQueueItem(event.state.currentClimbQueueItem as LocalClimbQueueItem | null); + // Reset sequence tracking on full sync + updateLastReceivedSequence(event.sequence); + setLastReceivedStateHash(event.state.stateHash); break; case 'QueueItemAdded': setQueueState((prev) => { @@ -237,9 +302,11 @@ export const PersistentSessionProvider: React.FC<{ children: React.ReactNode }> } return newQueue; }); + updateLastReceivedSequence(event.sequence); break; case 'QueueItemRemoved': setQueueState((prev) => prev.filter((item) => item.uuid !== event.uuid)); + updateLastReceivedSequence(event.sequence); break; case 'QueueReordered': setQueueState((prev) => { @@ -248,9 +315,11 @@ export const PersistentSessionProvider: React.FC<{ children: React.ReactNode }> newQueue.splice(event.newIndex, 0, item); return newQueue; }); + updateLastReceivedSequence(event.sequence); break; case 'CurrentClimbChanged': setCurrentClimbQueueItem(event.currentItem as LocalClimbQueueItem | null); + updateLastReceivedSequence(event.sequence); break; case 'ClimbMirrored': setCurrentClimbQueueItem((prev) => { @@ -263,12 +332,21 @@ export const PersistentSessionProvider: React.FC<{ children: React.ReactNode }> }, }; }); + updateLastReceivedSequence(event.sequence); break; } // Notify external subscribers notifyQueueSubscribers(event); - }, [notifyQueueSubscribers]); + }, [notifyQueueSubscribers, updateLastReceivedSequence]); + + // Keep state hash in sync with local state after delta events + // This ensures hash verification compares against current state, not stale FullSync hash + useEffect(() => { + if (!session) return; // Only update hash when connected + const newHash = computeQueueStateHash(queue, currentClimbQueueItem?.uuid || null); + setLastReceivedStateHash(newHash); + }, [session, queue, currentClimbQueueItem]); // Handle session events internally const handleSessionEvent = useCallback((event: SessionEvent) => { @@ -386,18 +464,96 @@ export const PersistentSessionProvider: React.FC<{ children: React.ReactNode }> isReconnectingRef.current = true; try { if (DEBUG) console.log('[PersistentSession] Reconnecting...'); + + // Save last received sequence before rejoining + // Use ref to avoid stale closure (state variable would capture value from effect creation) + const lastSeq = lastReceivedSequenceRef.current; + const sessionData = await joinSession(graphqlClient); // Double-check mounted state after async operation - if (sessionData && mountedRef.current) { - setSession(sessionData); - if (DEBUG) console.log('[PersistentSession] Reconnected, clientId:', sessionData.clientId); + if (!sessionData || !mountedRef.current) return; + + // Calculate sequence gap + const currentSeq = sessionData.queueState.sequence; + const gap = lastSeq !== null ? currentSeq - lastSeq : 0; + + if (DEBUG) console.log(`[PersistentSession] Reconnected. Last seq: ${lastSeq}, Current seq: ${currentSeq}, Gap: ${gap}`); + + // Attempt delta sync if gap is reasonable + if (gap > 0 && gap <= 100 && lastSeq !== null && sessionId) { + try { + if (DEBUG) console.log(`[PersistentSession] Attempting delta sync for ${gap} missed events...`); + + const response = await execute<{ eventsReplay: EventsReplayResponse }>(graphqlClient, { + query: EVENTS_REPLAY, + variables: { sessionId, sinceSequence: lastSeq }, + }); + + if (response.eventsReplay.events.length > 0) { + if (DEBUG) console.log(`[PersistentSession] Replaying ${response.eventsReplay.events.length} events`); + + // Apply each event in order (transform from server to subscription format) + response.eventsReplay.events.forEach(event => { + handleQueueEvent(transformToSubscriptionEvent(event)); + }); + + if (DEBUG) console.log('[PersistentSession] Delta sync completed successfully'); + } else { + if (DEBUG) console.log('[PersistentSession] No events to replay'); + } + } catch (err) { + console.warn('[PersistentSession] Delta sync failed, falling back to full sync:', err); + // Fall through to full sync below + applyFullSync(sessionData); + } + } else if (gap > 100) { + // Gap too large - use full sync + if (DEBUG) console.log(`[PersistentSession] Gap too large (${gap}), using full sync`); + applyFullSync(sessionData); + } else if (lastSeq === null) { + // First connection after state was reset - apply initial state + if (DEBUG) console.log('[PersistentSession] First connection, applying initial state'); + applyFullSync(sessionData); + } else if (gap === 0) { + // No sequence gap, but verify state is actually in sync via hash + const localHash = computeQueueStateHash(queue, currentClimbQueueItem?.uuid || null); + if (localHash !== sessionData.queueState.stateHash) { + if (DEBUG) console.log('[PersistentSession] Hash mismatch on reconnect despite gap=0, applying full sync'); + applyFullSync(sessionData); + } else { + if (DEBUG) console.log('[PersistentSession] No missed events, already in sync'); + } } + + setSession(sessionData); + if (DEBUG) console.log('[PersistentSession] Reconnection complete, clientId:', sessionData.clientId); } finally { isReconnectingRef.current = false; } } + // Store reconnect handler for use by hash verification + triggerResyncRef.current = handleReconnect; + + // Helper to apply full sync from session data + function applyFullSync(sessionData: any) { + if (sessionData.queueState) { + handleQueueEvent({ + __typename: 'FullSync', + sequence: sessionData.queueState.sequence, + state: sessionData.queueState, + }); + } + } + async function connect() { + // Prevent duplicate connections during React re-renders or Strict Mode + if (isConnectingRef.current) { + if (DEBUG) console.log('[PersistentSession] Connection already in progress, skipping'); + return; + } + isConnectingRef.current = true; + if (DEBUG) console.log('[PersistentSession] Connecting to session:', sessionId); setIsConnecting(true); setError(null); @@ -412,6 +568,7 @@ export const PersistentSessionProvider: React.FC<{ children: React.ReactNode }> if (!mountedRef.current) { graphqlClient.dispose(); + isConnectingRef.current = false; return; } @@ -438,12 +595,13 @@ export const PersistentSessionProvider: React.FC<{ children: React.ReactNode }> if (sessionData.queueState) { handleQueueEvent({ __typename: 'FullSync', + sequence: sessionData.queueState.sequence, state: sessionData.queueState, }); } // Subscribe to queue updates - queueUnsubscribeRef.current = subscribe<{ queueUpdates: ClientQueueEvent }>( + queueUnsubscribeRef.current = subscribe<{ queueUpdates: SubscriptionQueueEvent }>( graphqlClient, { query: QUEUE_UPDATES, variables: { sessionId } }, { @@ -490,8 +648,11 @@ export const PersistentSessionProvider: React.FC<{ children: React.ReactNode }> }, }, ); + // Mark connection as complete + isConnectingRef.current = false; } catch (err) { console.error('[PersistentSession] Connection failed:', err); + isConnectingRef.current = false; if (mountedRef.current) { setError(err instanceof Error ? err : new Error(String(err))); setIsConnecting(false); @@ -508,22 +669,28 @@ export const PersistentSessionProvider: React.FC<{ children: React.ReactNode }> if (DEBUG) console.log('[PersistentSession] Cleaning up connection'); // Set mounted ref to false FIRST to prevent any reconnect callbacks from executing mountedRef.current = false; - - // Clean up subscriptions - if (queueUnsubscribeRef.current) { - queueUnsubscribeRef.current(); - queueUnsubscribeRef.current = null; - } - if (sessionUnsubscribeRef.current) { - sessionUnsubscribeRef.current(); - sessionUnsubscribeRef.current = null; - } - - if (graphqlClient) { - if (sessionRef.current) { - execute(graphqlClient, { query: LEAVE_SESSION }).catch(() => {}); - } - graphqlClient.dispose(); + // Reset connecting ref to allow new connections after cleanup + isConnectingRef.current = false; + + // Capture client reference before cleanup to avoid race conditions + const clientToCleanup = graphqlClient; + graphqlClient = null; // Prevent new operations on this client + + // Clean up subscriptions synchronously + queueUnsubscribeRef.current?.(); + queueUnsubscribeRef.current = null; + sessionUnsubscribeRef.current?.(); + sessionUnsubscribeRef.current = null; + + // Dispose client - use microtask to let pending operations complete + // This prevents "WebSocket already in CLOSING state" errors + if (clientToCleanup) { + Promise.resolve().then(() => { + if (sessionRef.current) { + execute(clientToCleanup, { query: LEAVE_SESSION }).catch(() => {}); + } + clientToCleanup.dispose(); + }); } setClient(null); @@ -534,6 +701,56 @@ export const PersistentSessionProvider: React.FC<{ children: React.ReactNode }> // Note: username, avatarUrl, wsAuthToken are accessed via refs to prevent reconnection on changes }, [activeSession, isAuthLoading, handleQueueEvent, handleSessionEvent]); + // Periodic state hash verification + // Runs every 60 seconds to detect state drift and auto-resync if needed + useEffect(() => { + if (!session || !lastReceivedStateHash || queue.length === 0) { + // Skip if not connected or no state to verify + return; + } + + const verifyInterval = setInterval(() => { + // Compute local state hash + const localHash = computeQueueStateHash(queue, currentClimbQueueItem?.uuid || null); + + if (localHash !== lastReceivedStateHash) { + console.warn( + '[PersistentSession] State hash mismatch detected!', + `Local: ${localHash}, Server: ${lastReceivedStateHash}`, + 'Triggering automatic resync...' + ); + // Trigger resync to get back in sync with server + // The reconnect handler will do delta sync or full sync as appropriate + if (triggerResyncRef.current) { + triggerResyncRef.current(); + } + } else { + if (DEBUG) console.log('[PersistentSession] State hash verification passed'); + } + }, 60000); // Every 60 seconds + + return () => clearInterval(verifyInterval); + }, [session, lastReceivedStateHash, queue, currentClimbQueueItem]); + + // Defensive state consistency check + // If currentClimbQueueItem exists but is not found in queue, trigger resync + useEffect(() => { + if (!session || !currentClimbQueueItem || queue.length === 0) { + return; + } + + const isCurrentInQueue = queue.some(item => item.uuid === currentClimbQueueItem.uuid); + + if (!isCurrentInQueue) { + console.warn( + '[PersistentSession] Current climb not found in queue - state inconsistency detected. Triggering resync.' + ); + if (triggerResyncRef.current) { + triggerResyncRef.current(); + } + } + }, [session, currentClimbQueueItem, queue]); + // Session lifecycle functions const activateSession = useCallback((info: ActiveSessionInfo) => { setActiveSession((prev) => { @@ -613,13 +830,14 @@ export const PersistentSessionProvider: React.FC<{ children: React.ReactNode }> ); const setCurrentClimbMutation = useCallback( - async (item: LocalClimbQueueItem | null, shouldAddToQueue?: boolean) => { + async (item: LocalClimbQueueItem | null, shouldAddToQueue?: boolean, correlationId?: string) => { if (!client || !session) throw new Error('Not connected to session'); await execute(client, { query: SET_CURRENT_CLIMB, variables: { item: item ? toClimbQueueItemInput(item) : null, shouldAddToQueue, + correlationId, }, }); }, @@ -652,7 +870,7 @@ export const PersistentSessionProvider: React.FC<{ children: React.ReactNode }> ); // Event subscription functions - const subscribeToQueueEvents = useCallback((callback: (event: ClientQueueEvent) => void) => { + const subscribeToQueueEvents = useCallback((callback: (event: SubscriptionQueueEvent) => void) => { queueEventSubscribersRef.current.add(callback); return () => { queueEventSubscribersRef.current.delete(callback); diff --git a/packages/web/app/components/queue-control/__tests__/pending-updates-integration.test.ts b/packages/web/app/components/queue-control/__tests__/pending-updates-integration.test.ts new file mode 100644 index 00000000..a94a997e --- /dev/null +++ b/packages/web/app/components/queue-control/__tests__/pending-updates-integration.test.ts @@ -0,0 +1,480 @@ +import { describe, it, expect } from 'vitest'; +import { queueReducer } from '../reducer'; +import { QueueState, QueueAction, ClimbQueueItem } from '../types'; +import { SearchRequestPagination, Climb } from '@/app/lib/types'; + +const mockClimb: Climb = { + uuid: 'climb-1', + setter_username: 'setter1', + name: 'Test Climb', + description: 'A test climb', + frames: '', + angle: 40, + ascensionist_count: 5, + difficulty: '7', + quality_average: '3.5', + stars: 3, + difficulty_error: '', + litUpHoldsMap: {}, + mirrored: false, + benchmark_difficulty: null, + userAscents: 0, + userAttempts: 0 +}; + +const mockSearchParams: SearchRequestPagination = { + page: 1, + pageSize: 20, + gradeAccuracy: 1, + maxGrade: 18, + minAscents: 1, + minGrade: 1, + minRating: 1, + sortBy: 'quality', + sortOrder: 'desc', + name: '', + onlyClassics: false, + onlyTallClimbs: false, + settername: [], + setternameSuggestion: '', + holdsFilter: {}, + hideAttempted: false, + hideCompleted: false, + showOnlyAttempted: false, + showOnlyCompleted: false +}; + +const initialState: QueueState = { + queue: [], + currentClimbQueueItem: null, + climbSearchParams: mockSearchParams, + hasDoneFirstFetch: false, + initialQueueDataReceivedFromPeers: false, + pendingCurrentClimbUpdates: [], + lastReceivedSequence: null, + lastReceivedStateHash: null, +}; + +describe('Pending Updates - Integration Tests', () => { + describe('Rapid navigation scenario', () => { + it('should handle rapid local updates followed by delayed server echoes', () => { + const items: ClimbQueueItem[] = Array.from({ length: 15 }, (_, i) => ({ + climb: { ...mockClimb, uuid: `climb-${i}` }, + addedBy: 'user-1', + uuid: `item-${i}`, + suggested: false, + })); + + let state = initialState; + + // Simulate rapid navigation (15 local updates with correlation IDs) + items.forEach((item, i) => { + state = queueReducer(state, { + type: 'DELTA_UPDATE_CURRENT_CLIMB', + payload: { item, shouldAddToQueue: false, isServerEvent: false, correlationId: `client-123-${i}` }, + }); + }); + + // Should have 15 pending updates (correlation IDs) + expect(state.pendingCurrentClimbUpdates).toHaveLength(15); + expect(state.currentClimbQueueItem).toEqual(items[14]); // Last item + + // Simulate server echoes arriving (delayed) + items.forEach((item, index) => { + state = queueReducer(state, { + type: 'DELTA_UPDATE_CURRENT_CLIMB', + payload: { item, shouldAddToQueue: false, isServerEvent: true, serverCorrelationId: `client-123-${index}` }, + }); + + // Each echo should be skipped and removed from pending + expect(state.pendingCurrentClimbUpdates).toHaveLength(14 - index); + // Current climb should remain the last item (not overwritten by echo) + expect(state.currentClimbQueueItem).toEqual(items[14]); + }); + + // All pending updates should be cleared + expect(state.pendingCurrentClimbUpdates).toHaveLength(0); + }); + + it('should handle interleaved local and server events', () => { + const item1: ClimbQueueItem = { + climb: { ...mockClimb, uuid: 'climb-1' }, + addedBy: 'user-1', + uuid: 'item-1', + suggested: false, + }; + + const item2: ClimbQueueItem = { + climb: { ...mockClimb, uuid: 'climb-2' }, + addedBy: 'user-1', + uuid: 'item-2', + suggested: false, + }; + + const item3: ClimbQueueItem = { + climb: { ...mockClimb, uuid: 'climb-3' }, + addedBy: 'user-2', // From another user + uuid: 'item-3', + suggested: false, + }; + + let state = initialState; + + // Local update 1 + state = queueReducer(state, { + type: 'DELTA_UPDATE_CURRENT_CLIMB', + payload: { item: item1, shouldAddToQueue: false, isServerEvent: false, correlationId: 'client-123-1' }, + }); + expect(state.pendingCurrentClimbUpdates).toEqual(['client-123-1']); + expect(state.currentClimbQueueItem).toEqual(item1); + + // Local update 2 + state = queueReducer(state, { + type: 'DELTA_UPDATE_CURRENT_CLIMB', + payload: { item: item2, shouldAddToQueue: false, isServerEvent: false, correlationId: 'client-123-2' }, + }); + expect(state.pendingCurrentClimbUpdates).toEqual(['client-123-1', 'client-123-2']); + expect(state.currentClimbQueueItem).toEqual(item2); + + // Server echo of item1 arrives (should be skipped) + state = queueReducer(state, { + type: 'DELTA_UPDATE_CURRENT_CLIMB', + payload: { item: item1, shouldAddToQueue: false, isServerEvent: true, serverCorrelationId: 'client-123-1' }, + }); + expect(state.pendingCurrentClimbUpdates).toEqual(['client-123-2']); // client-123-1 removed + expect(state.currentClimbQueueItem).toEqual(item2); // Still item2 + + // Server event from another user (should be applied) + state = queueReducer(state, { + type: 'DELTA_UPDATE_CURRENT_CLIMB', + payload: { item: item3, shouldAddToQueue: false, isServerEvent: true, serverCorrelationId: 'client-456-1', eventClientId: 'client-456', myClientId: 'client-123' }, + }); + expect(state.pendingCurrentClimbUpdates).toEqual(['client-123-2']); // Unchanged + expect(state.currentClimbQueueItem).toEqual(item3); // Updated to item3 + + // Server echo of item2 arrives (should be skipped) + state = queueReducer(state, { + type: 'DELTA_UPDATE_CURRENT_CLIMB', + payload: { item: item2, shouldAddToQueue: false, isServerEvent: true, serverCorrelationId: 'client-123-2' }, + }); + expect(state.pendingCurrentClimbUpdates).toEqual([]); // client-123-2 removed + expect(state.currentClimbQueueItem).toEqual(item3); // Still item3 + }); + }); + + describe('Edge cases', () => { + it('should handle array saturation beyond 50 items', () => { + const items: ClimbQueueItem[] = Array.from({ length: 55 }, (_, i) => ({ + climb: { ...mockClimb, uuid: `climb-${i}` }, + addedBy: 'user-1', + uuid: `item-${i}`, + suggested: false, + })); + + let state = initialState; + + // Rapid navigation through 55 items + items.forEach((item, i) => { + state = queueReducer(state, { + type: 'DELTA_UPDATE_CURRENT_CLIMB', + payload: { item, shouldAddToQueue: false, isServerEvent: false, correlationId: `client-123-${i}` }, + }); + }); + + // Should be bounded to 50 + expect(state.pendingCurrentClimbUpdates).toHaveLength(50); + // Should contain correlation IDs 5-54 (oldest 5 dropped) + expect(state.pendingCurrentClimbUpdates[0]).toBe('client-123-5'); + expect(state.pendingCurrentClimbUpdates[49]).toBe('client-123-54'); + + // Server echoes of dropped correlation IDs should NOT be skipped + state = queueReducer(state, { + type: 'DELTA_UPDATE_CURRENT_CLIMB', + payload: { item: items[0], shouldAddToQueue: false, isServerEvent: true, serverCorrelationId: 'client-123-0' }, + }); + expect(state.currentClimbQueueItem).toEqual(items[0]); // Applied (not in pending) + + // Server echoes of retained items SHOULD be skipped + state = queueReducer(state, { + type: 'DELTA_UPDATE_CURRENT_CLIMB', + payload: { item: items[10], shouldAddToQueue: false, isServerEvent: true, serverCorrelationId: 'client-123-10' }, + }); + expect(state.currentClimbQueueItem).toEqual(items[0]); // Skipped (still items[0]) + expect(state.pendingCurrentClimbUpdates.includes('client-123-10')).toBe(false); // Removed from pending + }); + + it('should handle full sync clearing all pending updates', () => { + const items: ClimbQueueItem[] = Array.from({ length: 20 }, (_, i) => ({ + climb: { ...mockClimb, uuid: `climb-${i}` }, + addedBy: 'user-1', + uuid: `item-${i}`, + suggested: false, + })); + + let state = initialState; + + // Add 20 pending updates + items.forEach((item, i) => { + state = queueReducer(state, { + type: 'DELTA_UPDATE_CURRENT_CLIMB', + payload: { item, shouldAddToQueue: false, isServerEvent: false, correlationId: `client-123-${i}` }, + }); + }); + + expect(state.pendingCurrentClimbUpdates).toHaveLength(20); + + // Full sync should clear all pending + state = queueReducer(state, { + type: 'INITIAL_QUEUE_DATA', + payload: { + queue: [items[5]], + currentClimbQueueItem: items[5], + }, + }); + + expect(state.pendingCurrentClimbUpdates).toHaveLength(0); + expect(state.queue).toEqual([items[5]]); + }); + }); + + describe('Race conditions', () => { + it('should handle same UUID appearing multiple times (deduplication)', () => { + const item: ClimbQueueItem = { + climb: mockClimb, + addedBy: 'user-1', + uuid: 'item-1', + suggested: false, + }; + + let state = initialState; + + // First local update + state = queueReducer(state, { + type: 'DELTA_UPDATE_CURRENT_CLIMB', + payload: { item, shouldAddToQueue: false, isServerEvent: false }, + }); + + // Deduplication check - same item again + const state2 = queueReducer(state, { + type: 'DELTA_UPDATE_CURRENT_CLIMB', + payload: { item, shouldAddToQueue: false, isServerEvent: false }, + }); + + // Should be deduplicated (no state change) + expect(state2).toBe(state); + }); + + it('should handle cleanup action during rapid navigation', () => { + const items: ClimbQueueItem[] = Array.from({ length: 5 }, (_, i) => ({ + climb: { ...mockClimb, uuid: `climb-${i}` }, + addedBy: 'user-1', + uuid: `item-${i}`, + suggested: false, + })); + + let state = initialState; + + // Add 5 pending updates + items.forEach((item, i) => { + state = queueReducer(state, { + type: 'DELTA_UPDATE_CURRENT_CLIMB', + payload: { item, shouldAddToQueue: false, isServerEvent: false, correlationId: `client-123-${i}` }, + }); + }); + + expect(state.pendingCurrentClimbUpdates).toHaveLength(5); + + // Cleanup item-2 (simulating timeout) + state = queueReducer(state, { + type: 'CLEANUP_PENDING_UPDATE', + payload: { correlationId: 'client-123-2' }, + }); + + expect(state.pendingCurrentClimbUpdates).toEqual(['client-123-0', 'client-123-1', 'client-123-3', 'client-123-4']); + + // Server echo of item-2 should now be applied (not skipped) + state = queueReducer(state, { + type: 'DELTA_UPDATE_CURRENT_CLIMB', + payload: { item: items[2], shouldAddToQueue: false, isServerEvent: true, serverCorrelationId: 'client-123-2' }, + }); + + expect(state.currentClimbQueueItem).toEqual(items[2]); // Applied (not in pending anymore) + }); + }); + + describe('ClientId-based echo detection', () => { + const myClientId = 'client-123'; + const otherClientId = 'client-456'; + + it('should skip server events from own client (our echo)', () => { + const item: ClimbQueueItem = { + climb: mockClimb, + addedBy: 'user-1', + uuid: 'item-1', + suggested: false, + }; + + let state = initialState; + + // Local update from our client + state = queueReducer(state, { + type: 'DELTA_UPDATE_CURRENT_CLIMB', + payload: { item, shouldAddToQueue: false, isServerEvent: false, correlationId: 'client-123-1' }, + }); + + expect(state.currentClimbQueueItem).toEqual(item); + expect(state.pendingCurrentClimbUpdates).toHaveLength(1); + + // Server echo with our own clientId and correlationId - should be skipped + state = queueReducer(state, { + type: 'DELTA_UPDATE_CURRENT_CLIMB', + payload: { + item, + shouldAddToQueue: false, + isServerEvent: true, + eventClientId: myClientId, + myClientId: myClientId, + serverCorrelationId: 'client-123-1', + }, + }); + + // State should not change (echo skipped) + expect(state.currentClimbQueueItem).toEqual(item); + // Pending should be cleared + expect(state.pendingCurrentClimbUpdates).toHaveLength(0); + }); + + it('should apply server events from other clients', () => { + const item1: ClimbQueueItem = { + climb: { ...mockClimb, uuid: 'climb-1' }, + addedBy: 'user-1', + uuid: 'item-1', + suggested: false, + }; + + const item2: ClimbQueueItem = { + climb: { ...mockClimb, uuid: 'climb-2' }, + addedBy: 'user-2', + uuid: 'item-2', + suggested: false, + }; + + let state = initialState; + + // Local update from our client + state = queueReducer(state, { + type: 'DELTA_UPDATE_CURRENT_CLIMB', + payload: { item: item1, shouldAddToQueue: false, isServerEvent: false, correlationId: 'client-123-1' }, + }); + + expect(state.currentClimbQueueItem).toEqual(item1); + + // Server event from OTHER client - should be applied + state = queueReducer(state, { + type: 'DELTA_UPDATE_CURRENT_CLIMB', + payload: { + item: item2, + shouldAddToQueue: false, + isServerEvent: true, + eventClientId: otherClientId, + myClientId: myClientId, + serverCorrelationId: 'client-456-1', + }, + }); + + // Should update to item2 (from other client) + expect(state.currentClimbQueueItem).toEqual(item2); + }); + + it('should handle the same climb from different clients correctly', () => { + const sharedClimb: ClimbQueueItem = { + climb: mockClimb, + addedBy: 'user-1', + uuid: 'item-shared', + suggested: false, + }; + + let state = initialState; + + // Our client navigates to this climb + state = queueReducer(state, { + type: 'DELTA_UPDATE_CURRENT_CLIMB', + payload: { item: sharedClimb, shouldAddToQueue: false, isServerEvent: false, correlationId: 'client-123-1' }, + }); + + expect(state.currentClimbQueueItem).toEqual(sharedClimb); + expect(state.pendingCurrentClimbUpdates).toHaveLength(1); + + // Other client also navigates to same climb - should be applied + state = queueReducer(state, { + type: 'DELTA_UPDATE_CURRENT_CLIMB', + payload: { + item: sharedClimb, + shouldAddToQueue: false, + isServerEvent: true, + eventClientId: otherClientId, + myClientId: myClientId, + serverCorrelationId: 'client-456-1', + }, + }); + + // Should still show the climb (applied from other client) + expect(state.currentClimbQueueItem).toEqual(sharedClimb); + // Pending should still have our update + expect(state.pendingCurrentClimbUpdates).toHaveLength(1); + + // Now our echo arrives - should be skipped + state = queueReducer(state, { + type: 'DELTA_UPDATE_CURRENT_CLIMB', + payload: { + item: sharedClimb, + shouldAddToQueue: false, + isServerEvent: true, + eventClientId: myClientId, + myClientId: myClientId, + serverCorrelationId: 'client-123-1', + }, + }); + + // Still showing the climb + expect(state.currentClimbQueueItem).toEqual(sharedClimb); + // Pending cleared (our echo removed) + expect(state.pendingCurrentClimbUpdates).toHaveLength(0); + }); + + it('should fallback to pending list when clientIds unavailable', () => { + const item: ClimbQueueItem = { + climb: mockClimb, + addedBy: 'user-1', + uuid: 'item-1', + suggested: false, + }; + + let state = initialState; + + // Local update + state = queueReducer(state, { + type: 'DELTA_UPDATE_CURRENT_CLIMB', + payload: { item, shouldAddToQueue: false, isServerEvent: false, correlationId: 'client-123-1' }, + }); + + expect(state.pendingCurrentClimbUpdates).toHaveLength(1); + + // Server event WITHOUT clientId - should use correlation ID if available + state = queueReducer(state, { + type: 'DELTA_UPDATE_CURRENT_CLIMB', + payload: { + item, + shouldAddToQueue: false, + isServerEvent: true, + serverCorrelationId: 'client-123-1', + // No clientIds provided + }, + }); + + // Should skip based on correlation ID match + expect(state.currentClimbQueueItem).toEqual(item); + expect(state.pendingCurrentClimbUpdates).toHaveLength(0); // Removed from pending + }); + }); +}); diff --git a/packages/web/app/components/queue-control/__tests__/reducer.test.ts b/packages/web/app/components/queue-control/__tests__/reducer.test.ts index e6b8d1e6..015519f5 100644 --- a/packages/web/app/components/queue-control/__tests__/reducer.test.ts +++ b/packages/web/app/components/queue-control/__tests__/reducer.test.ts @@ -56,7 +56,10 @@ const initialState: QueueState = { currentClimbQueueItem: null, climbSearchParams: mockSearchParams, hasDoneFirstFetch: false, - initialQueueDataReceivedFromPeers: false + initialQueueDataReceivedFromPeers: false, + pendingCurrentClimbUpdates: [], + lastReceivedSequence: null, + lastReceivedStateHash: null, }; describe('queueReducer', () => { @@ -771,4 +774,287 @@ describe('queueReducer', () => { expect(result).toEqual(initialState); }); }); + + describe('DELTA_UPDATE_CURRENT_CLIMB - Server Event Handling', () => { + it('should track pending updates for local actions', () => { + const action: QueueAction = { + type: 'DELTA_UPDATE_CURRENT_CLIMB', + payload: { + item: mockClimbQueueItem, + shouldAddToQueue: false, + isServerEvent: false, + correlationId: 'client-123-1', + }, + }; + + const result = queueReducer(initialState, action); + + expect(result.pendingCurrentClimbUpdates).toContain('client-123-1'); + expect(result.pendingCurrentClimbUpdates).toHaveLength(1); + }); + + it('should skip server events that match pending updates', () => { + const stateWithPending: QueueState = { + ...initialState, + pendingCurrentClimbUpdates: ['client-123-1'], + currentClimbQueueItem: null, + }; + + const action: QueueAction = { + type: 'DELTA_UPDATE_CURRENT_CLIMB', + payload: { + item: mockClimbQueueItem, + shouldAddToQueue: false, + isServerEvent: true, + serverCorrelationId: 'client-123-1', + }, + }; + + const result = queueReducer(stateWithPending, action); + + // Should not update current climb (echo was skipped) + expect(result.currentClimbQueueItem).toBeNull(); + // Should remove from pending + expect(result.pendingCurrentClimbUpdates).not.toContain('client-123-1'); + expect(result.pendingCurrentClimbUpdates).toHaveLength(0); + }); + + it('should apply server events that do not match pending updates', () => { + const otherItem: ClimbQueueItem = { + ...mockClimbQueueItem, + uuid: 'other-uuid', + }; + + const stateWithPending: QueueState = { + ...initialState, + pendingCurrentClimbUpdates: ['client-123-1'], + }; + + const action: QueueAction = { + type: 'DELTA_UPDATE_CURRENT_CLIMB', + payload: { + item: otherItem, + shouldAddToQueue: false, + isServerEvent: true, + serverCorrelationId: 'client-456-1', // Different correlation ID + }, + }; + + const result = queueReducer(stateWithPending, action); + + // Should apply the update (different correlation ID) + expect(result.currentClimbQueueItem).toEqual(otherItem); + // Should not remove from pending (different correlation ID) + expect(result.pendingCurrentClimbUpdates).toContain('client-123-1'); + }); + + it('should maintain pending array bounded to last 50 items', () => { + // Create state with 49 pending correlation IDs + const existingPending = Array.from({ length: 49 }, (_, i) => `client-123-${i}`); + const stateWithPending: QueueState = { + ...initialState, + pendingCurrentClimbUpdates: existingPending, + }; + + const newItem: ClimbQueueItem = { + ...mockClimbQueueItem, + uuid: 'uuid-50', + }; + + const action: QueueAction = { + type: 'DELTA_UPDATE_CURRENT_CLIMB', + payload: { + item: newItem, + shouldAddToQueue: false, + isServerEvent: false, + correlationId: 'client-123-50', + }, + }; + + const result = queueReducer(stateWithPending, action); + + expect(result.pendingCurrentClimbUpdates).toHaveLength(50); + expect(result.pendingCurrentClimbUpdates[49]).toBe('client-123-50'); + }); + + it('should drop oldest item when exceeding 50 pending items', () => { + // Create state with 50 pending correlation IDs + const existingPending = Array.from({ length: 50 }, (_, i) => `client-123-${i}`); + const stateWithPending: QueueState = { + ...initialState, + pendingCurrentClimbUpdates: existingPending, + }; + + const newItem: ClimbQueueItem = { + ...mockClimbQueueItem, + uuid: 'uuid-51', + }; + + const action: QueueAction = { + type: 'DELTA_UPDATE_CURRENT_CLIMB', + payload: { + item: newItem, + shouldAddToQueue: false, + isServerEvent: false, + correlationId: 'client-123-51', + }, + }; + + const result = queueReducer(stateWithPending, action); + + expect(result.pendingCurrentClimbUpdates).toHaveLength(50); + expect(result.pendingCurrentClimbUpdates[0]).toBe('client-123-1'); // client-123-0 dropped + expect(result.pendingCurrentClimbUpdates[49]).toBe('client-123-51'); + }); + + it('should not track pending for server events', () => { + const action: QueueAction = { + type: 'DELTA_UPDATE_CURRENT_CLIMB', + payload: { + item: mockClimbQueueItem, + shouldAddToQueue: false, + isServerEvent: true, + }, + }; + + const result = queueReducer(initialState, action); + + expect(result.pendingCurrentClimbUpdates).toHaveLength(0); + }); + + it('should handle null item from server event', () => { + const stateWithPending: QueueState = { + ...initialState, + pendingCurrentClimbUpdates: ['client-123-1'], + currentClimbQueueItem: mockClimbQueueItem, + }; + + const action: QueueAction = { + type: 'DELTA_UPDATE_CURRENT_CLIMB', + payload: { + item: null, + shouldAddToQueue: false, + isServerEvent: true, + }, + }; + + const result = queueReducer(stateWithPending, action); + + expect(result.currentClimbQueueItem).toBeNull(); + // Pending list should still contain the entry (server event without matching correlationId) + expect(result.pendingCurrentClimbUpdates).toContain('client-123-1'); + }); + + it('should add to queue when shouldAddToQueue is true for local action', () => { + const action: QueueAction = { + type: 'DELTA_UPDATE_CURRENT_CLIMB', + payload: { + item: mockClimbQueueItem, + shouldAddToQueue: true, + isServerEvent: false, + correlationId: 'client-123-1', + }, + }; + + const result = queueReducer(initialState, action); + + expect(result.queue).toContain(mockClimbQueueItem); + expect(result.pendingCurrentClimbUpdates).toContain('client-123-1'); + }); + + it('should not add duplicate to queue when item already exists', () => { + const stateWithQueue: QueueState = { + ...initialState, + queue: [mockClimbQueueItem], + }; + + const action: QueueAction = { + type: 'DELTA_UPDATE_CURRENT_CLIMB', + payload: { + item: mockClimbQueueItem, + shouldAddToQueue: true, + isServerEvent: false, + correlationId: 'client-123-1', + }, + }; + + const result = queueReducer(stateWithQueue, action); + + expect(result.queue).toHaveLength(1); // No duplicate + expect(result.pendingCurrentClimbUpdates).toContain('client-123-1'); + }); + }); + + describe('INITIAL_QUEUE_DATA - Pending Updates', () => { + it('should clear pending updates on initial queue data sync', () => { + const stateWithPending: QueueState = { + ...initialState, + pendingCurrentClimbUpdates: ['client-123-1', 'client-123-2', 'client-123-3'], + }; + + const action: QueueAction = { + type: 'INITIAL_QUEUE_DATA', + payload: { + queue: [mockClimbQueueItem], + currentClimbQueueItem: mockClimbQueueItem, + }, + }; + + const result = queueReducer(stateWithPending, action); + + expect(result.pendingCurrentClimbUpdates).toHaveLength(0); + expect(result.initialQueueDataReceivedFromPeers).toBe(true); + }); + }); + + describe('CLEANUP_PENDING_UPDATE', () => { + it('should remove specific correlationId from pending updates', () => { + const stateWithPending: QueueState = { + ...initialState, + pendingCurrentClimbUpdates: ['client-123-1', 'client-123-2', 'client-123-3'], + }; + + const action: QueueAction = { + type: 'CLEANUP_PENDING_UPDATE', + payload: { correlationId: 'client-123-2' }, + }; + + const result = queueReducer(stateWithPending, action); + + expect(result.pendingCurrentClimbUpdates).toHaveLength(2); + expect(result.pendingCurrentClimbUpdates).toContain('client-123-1'); + expect(result.pendingCurrentClimbUpdates).toContain('client-123-3'); + expect(result.pendingCurrentClimbUpdates).not.toContain('client-123-2'); + }); + + it('should handle cleanup of non-existent correlationId gracefully', () => { + const stateWithPending: QueueState = { + ...initialState, + pendingCurrentClimbUpdates: ['client-123-1', 'client-123-2'], + }; + + const action: QueueAction = { + type: 'CLEANUP_PENDING_UPDATE', + payload: { correlationId: 'client-999-1' }, + }; + + const result = queueReducer(stateWithPending, action); + + expect(result.pendingCurrentClimbUpdates).toHaveLength(2); + expect(result.pendingCurrentClimbUpdates).toContain('client-123-1'); + expect(result.pendingCurrentClimbUpdates).toContain('client-123-2'); + }); + + it('should handle cleanup on empty pending array', () => { + const action: QueueAction = { + type: 'CLEANUP_PENDING_UPDATE', + payload: { correlationId: 'client-123-1' }, + }; + + const result = queueReducer(initialState, action); + + expect(result.pendingCurrentClimbUpdates).toHaveLength(0); + }); + + }); }); \ No newline at end of file diff --git a/packages/web/app/components/queue-control/reducer.ts b/packages/web/app/components/queue-control/reducer.ts index 37d1d303..f0fa17fc 100644 --- a/packages/web/app/components/queue-control/reducer.ts +++ b/packages/web/app/components/queue-control/reducer.ts @@ -8,6 +8,9 @@ const initialState = (initialSearchParams: SearchRequestPagination): QueueState climbSearchParams: initialSearchParams, hasDoneFirstFetch: false, initialQueueDataReceivedFromPeers: false, + pendingCurrentClimbUpdates: [], + lastReceivedSequence: null, + lastReceivedStateHash: null, }); export function queueReducer(state: QueueState, action: QueueAction): QueueState { @@ -47,6 +50,8 @@ export function queueReducer(state: QueueState, action: QueueAction): QueueState queue: action.payload.queue, currentClimbQueueItem: action.payload.currentClimbQueueItem ?? state.currentClimbQueueItem, initialQueueDataReceivedFromPeers: true, + // Clear pending updates on full sync since we're getting complete server state + pendingCurrentClimbUpdates: [], }; case 'UPDATE_QUEUE': @@ -145,7 +150,45 @@ export function queueReducer(state: QueueState, action: QueueAction): QueueState } case 'DELTA_UPDATE_CURRENT_CLIMB': { - const { item, shouldAddToQueue } = action.payload; + const { + item, + shouldAddToQueue, + isServerEvent, + eventClientId, + myClientId, + correlationId, + serverCorrelationId + } = action.payload; + + // NO MORE TIMESTAMP FILTERING - reducer is now pure! + let pendingUpdates = state.pendingCurrentClimbUpdates; + + // For server events, check if this is an echo of our own update + if (isServerEvent && item) { + // Primary: Correlation ID matching (most precise) + if (serverCorrelationId && pendingUpdates.includes(serverCorrelationId)) { + // This is our own update echoed back - skip it and remove from pending + return { + ...state, + pendingCurrentClimbUpdates: pendingUpdates.filter(id => id !== serverCorrelationId), + }; + } + + // Fallback 1: ClientId-based detection + const isOurOwnEcho = eventClientId && myClientId && eventClientId === myClientId; + if (isOurOwnEcho) { + // Our echo, but without correlation ID - keep pending as-is (will be cleaned by effect) + return { + ...state, + pendingCurrentClimbUpdates: pendingUpdates, + }; + } + + // Note: UUID-based fallback was removed because it was incorrectly skipping + // legitimate server updates. Without correlation ID or clientId from the server, + // we cannot reliably detect echoes. The UI may briefly flash on legacy servers, + // but state will converge correctly. + } // Skip if this is the same item (deduplication for optimistic updates) if (item && state.currentClimbQueueItem?.uuid === item.uuid) { @@ -159,10 +202,39 @@ export function queueReducer(state: QueueState, action: QueueAction): QueueState newQueue = [...state.queue, item]; } + // For local updates, track correlation ID (no timestamp!) + if (!isServerEvent && item && correlationId) { + pendingUpdates = [ + ...pendingUpdates, + correlationId + ].slice(-50); // Still bound to 50 items for safety + } + return { ...state, queue: newQueue, currentClimbQueueItem: item, + pendingCurrentClimbUpdates: pendingUpdates, + }; + } + + case 'CLEANUP_PENDING_UPDATE': { + return { + ...state, + pendingCurrentClimbUpdates: state.pendingCurrentClimbUpdates.filter( + id => id !== action.payload.correlationId + ), + }; + } + + case 'CLEANUP_PENDING_UPDATES_BATCH': { + // Batch cleanup to avoid multiple re-renders + const idsToRemove = new Set(action.payload.correlationIds); + return { + ...state, + pendingCurrentClimbUpdates: state.pendingCurrentClimbUpdates.filter( + id => !idsToRemove.has(id) + ), }; } diff --git a/packages/web/app/components/queue-control/types.ts b/packages/web/app/components/queue-control/types.ts index f9e56029..75d94019 100644 --- a/packages/web/app/components/queue-control/types.ts +++ b/packages/web/app/components/queue-control/types.ts @@ -27,6 +27,12 @@ export interface QueueState { climbSearchParams: SearchRequestPagination; hasDoneFirstFetch: boolean; initialQueueDataReceivedFromPeers: boolean; + // Track locally-initiated current climb updates by correlation ID to skip server echoes + // Correlation IDs enable precise echo detection without time-based logic in the reducer + pendingCurrentClimbUpdates: string[]; + // Sequence tracking for gap detection and state verification + lastReceivedSequence: number | null; + lastReceivedStateHash: string | null; } export type QueueAction = @@ -43,9 +49,11 @@ export type QueueAction = | { type: 'DELTA_ADD_QUEUE_ITEM'; payload: { item: ClimbQueueItem; position?: number } } | { type: 'DELTA_REMOVE_QUEUE_ITEM'; payload: { uuid: string } } | { type: 'DELTA_REORDER_QUEUE_ITEM'; payload: { uuid: string; oldIndex: number; newIndex: number } } - | { type: 'DELTA_UPDATE_CURRENT_CLIMB'; payload: { item: ClimbQueueItem | null; shouldAddToQueue?: boolean } } + | { type: 'DELTA_UPDATE_CURRENT_CLIMB'; payload: { item: ClimbQueueItem | null; shouldAddToQueue?: boolean; isServerEvent?: boolean; eventClientId?: string; myClientId?: string; correlationId?: string; serverCorrelationId?: string } } | { type: 'DELTA_MIRROR_CURRENT_CLIMB'; payload: { mirrored: boolean } } - | { type: 'DELTA_REPLACE_QUEUE_ITEM'; payload: { uuid: string; item: ClimbQueueItem } }; + | { type: 'DELTA_REPLACE_QUEUE_ITEM'; payload: { uuid: string; item: ClimbQueueItem } } + | { type: 'CLEANUP_PENDING_UPDATE'; payload: { correlationId: string } } + | { type: 'CLEANUP_PENDING_UPDATES_BATCH'; payload: { correlationIds: string[] } }; export interface QueueContextType { queue: ClimbQueue; diff --git a/packages/web/app/utils/hash.ts b/packages/web/app/utils/hash.ts new file mode 100644 index 00000000..0ca7c0df --- /dev/null +++ b/packages/web/app/utils/hash.ts @@ -0,0 +1,36 @@ +/** + * FNV-1a hash algorithm implementation + * Fast, non-cryptographic hash for state verification + * Same implementation as backend for consistency + */ +export function fnv1aHash(str: string): string { + const FNV_PRIME = 0x01000193; + const FNV_OFFSET_BASIS = 0x811c9dc5; + + let hash = FNV_OFFSET_BASIS; + for (let i = 0; i < str.length; i++) { + hash ^= str.charCodeAt(i); + hash = Math.imul(hash, FNV_PRIME); + } + + // Convert to unsigned 32-bit integer and hex string + return (hash >>> 0).toString(16).padStart(8, '0'); +} + +/** + * Compute a deterministic hash of queue state + * Used for periodic verification against server state + */ +export function computeQueueStateHash( + queue: Array<{ uuid: string }>, + currentItemUuid: string | null +): string { + // Sort queue UUIDs for deterministic ordering + const queueUuids = queue.map(item => item.uuid).sort().join(','); + const currentUuid = currentItemUuid || 'null'; + + // Create canonical string representation + const canonical = `${queueUuids}|${currentUuid}`; + + return fnv1aHash(canonical); +} diff --git a/packages/web/scripts/test-save-climb.ts b/packages/web/scripts/test-save-climb.ts new file mode 100644 index 00000000..49c7ab51 --- /dev/null +++ b/packages/web/scripts/test-save-climb.ts @@ -0,0 +1,45 @@ +#!/usr/bin/env tsx + +import { saveClimb } from '../app/lib/api-wrappers/aurora/saveClimb'; + +const token = 'a23ee153eaea95706536064a71ebf30df5b0687a'; +const userId = 118684; + +async function testSaveClimb() { + console.log('Testing saveClimb with WEB_HOSTS pattern...\n'); + + try { + const result = await saveClimb('kilter', token, { + layout_id: 1, + setter_id: userId, + name: 'API Test Climb - DELETE ME', + description: 'Testing Aurora API fix - should be safe to delete', + is_draft: true, + frames: 'p1111r15', + angle: 40, + frames_count: 1, + frames_pace: 0, + }); + + console.log('āœ… Success!\n'); + console.log('Result:', JSON.stringify(result, null, 2)); + console.log('\nšŸ“Š Sync Status:', result.synced ? 'āœ… SYNCED' : 'āŒ NOT SYNCED'); + console.log('UUID:', result.uuid); + + if (result.synced) { + console.log('\nšŸŽ‰ Climb successfully synced to Aurora!'); + console.log('Check it at: https://kilterboardapp.com/climbs/' + result.uuid); + } else { + console.log('\nāš ļø Climb saved locally but NOT synced to Aurora'); + console.log('Check server logs for sync error details'); + } + + process.exit(result.synced ? 0 : 1); + } catch (error) { + console.error('āŒ Test failed with error:'); + console.error(error); + process.exit(1); + } +} + +testSaveClimb();