From 614534598720307d0f0e61ac37d59b1d9f2268f7 Mon Sep 17 00:00:00 2001 From: Marco de Jongh Date: Mon, 22 Dec 2025 09:19:42 +1100 Subject: [PATCH] Fix initial queue state not syncing to new users in daemon mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a user joined a party mode session via daemon/websocket, they would not receive the initial queue state - only seeing updates made after joining. Root causes: 1. Race condition where session-joined arrived before QueueProvider subscribed 2. Re-subscription on every queue change replayed stale initial data Changes: - Add request-queue-state message type for explicit queue state requests - Store initial queue data in DaemonContext and provide to late subscribers - Use refs in QueueProvider to stabilize handlePeerData callback - Add hasReceivedInitialData flag to prevent replaying initial data - Fix reconnection loop by using refs for callbacks in connect() 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../connection-manager/daemon-context.tsx | 92 ++++++++++++++++--- app/components/connection-manager/types.ts | 15 +++ .../queue-control/queue-context.tsx | 44 +++++++-- daemon/src/handlers/message.ts | 11 ++- daemon/src/handlers/room.ts | 23 +++++ daemon/src/types/messages.ts | 17 +++- 6 files changed, 178 insertions(+), 24 deletions(-) diff --git a/app/components/connection-manager/daemon-context.tsx b/app/components/connection-manager/daemon-context.tsx index 9ab6460f..4e7ec3cb 100644 --- a/app/components/connection-manager/daemon-context.tsx +++ b/app/components/connection-manager/daemon-context.tsx @@ -89,6 +89,7 @@ export interface DaemonContextType extends PeerContextType { daemonUsers: DaemonUser[]; disconnect: () => void; connectionError: string | null; + requestQueueState: () => void; } export const DaemonContext = createContext(undefined); @@ -123,14 +124,37 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr const lastHeartbeatResponse = useRef(0); const connectionHealth = useRef<'HEALTHY' | 'DEGRADED' | 'POOR'>('HEALTHY'); + // Use refs for callbacks to avoid reconnecting when they change + const handleDaemonMessageRef = useRef<(data: DaemonMessage) => void>(() => {}); + const notifySubscribersRef = useRef<(data: ReceivedPeerData) => void>(() => {}); + const joinSessionRef = useRef<() => void>(() => {}); + + // Store initial queue data to provide to late subscribers + const initialQueueDataRef = useRef(null); + // Generate session ID from pathname const sessionId = pathname.replace(/\//g, '-').slice(1) || 'default'; const subscribeToData = useCallback((callback: (data: ReceivedPeerData) => void) => { const handlerId = uuidv4(); + console.log('[DEBUG] New subscriber registered, id:', handlerId, 'total handlers:', dataHandlers.current.length + 1); dataHandlers.current.push({ id: handlerId, callback }); + // If we have initial queue data, immediately provide it to the new subscriber + if (initialQueueDataRef.current) { + const storedData = initialQueueDataRef.current as { queue?: unknown[] }; + console.log('[DEBUG] Providing stored initial queue data to new subscriber, queue length:', storedData.queue?.length); + try { + callback(initialQueueDataRef.current); + } catch (error) { + console.error('Error providing initial queue data to subscriber:', error); + } + } else { + console.log('[DEBUG] No stored initial queue data yet for new subscriber'); + } + return () => { + console.log('[DEBUG] Subscriber unregistered, id:', handlerId); dataHandlers.current = dataHandlers.current.filter((handler) => handler.id !== handlerId); }; }, []); @@ -145,6 +169,9 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr }); }, []); + // Keep ref updated + notifySubscribersRef.current = notifySubscribers; + const sendData = useCallback( (data: PeerData, _connectionId?: string | null) => { if (wsRef.current && wsRef.current.readyState === WebSocket.OPEN) { @@ -161,10 +188,21 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr [state.clientId], ); + const requestQueueState = useCallback(() => { + console.log('[DEBUG] requestQueueState called, sessionId:', state.sessionId, 'wsReady:', wsRef.current?.readyState === WebSocket.OPEN); + if (wsRef.current && wsRef.current.readyState === WebSocket.OPEN && state.sessionId) { + console.log('[DEBUG] Sending request-queue-state'); + wsRef.current.send(JSON.stringify({ type: 'request-queue-state' })); + } else { + console.log('[DEBUG] NOT sending request-queue-state - conditions not met'); + } + }, [state.sessionId]); + const handleDaemonMessage = useCallback( (data: DaemonMessage) => { switch (data.type) { case 'session-joined': + console.log('[DEBUG] session-joined received, sessionId:', data.sessionId, 'queue length:', data.queue?.length); setState((prev) => ({ ...prev, clientId: data.clientId, @@ -174,15 +212,18 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr leaderId: data.users.find((u) => u.isLeader)?.id || null, })); - // Notify queue context about initial data - if (data.queue.length > 0 || data.currentClimbQueueItem) { - notifySubscribers({ - type: 'initial-queue-data', - queue: data.queue, - currentClimbQueueItem: data.currentClimbQueueItem, - source: 'daemon', - }); - } + // Store initial queue data, even if empty, so subscribers get it + const initialData: ReceivedPeerData = { + type: 'initial-queue-data', + queue: data.queue || [], + currentClimbQueueItem: data.currentClimbQueueItem || null, + source: 'daemon', + }; + initialQueueDataRef.current = initialData; + console.log('[DEBUG] Stored initial queue data, notifying subscribers'); + + // Notify any existing subscribers + notifySubscribers(initialData); break; case 'user-joined': @@ -243,6 +284,9 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr [notifySubscribers], ); + // Keep ref updated + handleDaemonMessageRef.current = handleDaemonMessage; + const joinSession = useCallback(() => { if (wsRef.current && wsRef.current.readyState === WebSocket.OPEN && !hasJoinedSession.current) { const joinMessage = { @@ -262,6 +306,9 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr } }, [sessionId, pathname, username]); + // Keep ref updated + joinSessionRef.current = joinSession; + const connect = useCallback(() => { if (!daemonUrl) return; @@ -285,7 +332,7 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr connectionHealth.current = 'HEALTHY'; // Join session after connection - joinSession(); + joinSessionRef.current(); // Start heartbeat interval (every 10 seconds) if (heartbeatIntervalRef.current) { @@ -316,7 +363,22 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr // Handle daemon-specific messages if (isDaemonMessage(data)) { - handleDaemonMessage(data); + handleDaemonMessageRef.current(data); + return; + } + + // Handle queue state response - relay to subscribers as initial-queue-data + if (data.type === 'queue-state-response') { + console.log('[DEBUG] Received queue-state-response, queue length:', data.queue?.length); + const queueData: ReceivedPeerData = { + type: 'initial-queue-data', + queue: data.queue, + currentClimbQueueItem: data.currentClimbQueueItem, + source: 'daemon', + }; + // Update stored data + initialQueueDataRef.current = queueData; + notifySubscribersRef.current(queueData); return; } @@ -327,7 +389,7 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr }; if (isPeerData(dataWithSource)) { - notifySubscribers(dataWithSource); + notifySubscribersRef.current(dataWithSource); } else { console.warn('Received invalid peer data from daemon:', data.type); } @@ -374,7 +436,7 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr isConnecting: false, })); } - }, [daemonUrl, joinSession, handleDaemonMessage, notifySubscribers]); + }, [daemonUrl]); // Using refs for callbacks to avoid reconnecting when they change const disconnect = useCallback(() => { if (reconnectTimeoutRef.current) { @@ -421,7 +483,8 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr wsRef.current.close(1000, 'Component unmounting'); } }; - }, [daemonUrl, connect]); + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [daemonUrl]); // Only reconnect when daemonUrl changes // Send username update when it changes useEffect(() => { @@ -463,6 +526,7 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr sendData, subscribeToData, connectToPeer: () => {}, // No-op in daemon mode + requestQueueState, // Daemon-specific isDaemonMode: true, diff --git a/app/components/connection-manager/types.ts b/app/components/connection-manager/types.ts index 7dc35be4..6fed161b 100644 --- a/app/components/connection-manager/types.ts +++ b/app/components/connection-manager/types.ts @@ -54,6 +54,7 @@ export interface PeerContextType { currentLeader: PeerId; isLeader: boolean; initiateLeaderElection: () => void; + requestQueueState?: () => void; } export type PeerProviderProps = { @@ -71,6 +72,16 @@ interface RequestUpdateQueueData { type: 'request-update-queue'; } +interface RequestQueueStateData { + type: 'request-queue-state'; +} + +interface QueueStateResponseData { + type: 'queue-state-response'; + queue: ClimbQueue; + currentClimbQueueItem: ClimbQueueItem | null; +} + interface UpdateQueueData { type: 'update-queue' | 'initial-queue-data'; queue: ClimbQueue; @@ -166,6 +177,8 @@ interface ReconnectCoordinationData { // Union type of all possible message types export type PeerData = | RequestUpdateQueueData + | RequestQueueStateData + | QueueStateResponseData | UpdateQueueData | BroadcastOtherPeersData | NewConnectionData @@ -195,12 +208,14 @@ export function isPeerData(data: unknown): data is ReceivedPeerData { // Validate based on type switch (msg.type) { case 'request-update-queue': + case 'request-queue-state': case 'new-connection': return true; // No additional required fields case 'send-peer-info': return !!msg.username; case 'initial-queue-data': case 'update-queue': + case 'queue-state-response': return 'queue' in msg && 'currentClimbQueueItem' in msg; case 'broadcast-other-peers': return Array.isArray(msg.peers); diff --git a/app/components/queue-control/queue-context.tsx b/app/components/queue-control/queue-context.tsx index 00970e67..85514feb 100644 --- a/app/components/queue-control/queue-context.tsx +++ b/app/components/queue-control/queue-context.tsx @@ -1,7 +1,7 @@ // File: QueueContext.tsx 'use client'; -import React, { useContext, createContext, ReactNode, useEffect, useCallback } from 'react'; +import React, { useContext, createContext, ReactNode, useEffect, useCallback, useRef } from 'react'; import { useSearchParams, useRouter, usePathname } from 'next/navigation'; import { v4 as uuidv4 } from 'uuid'; import { useConnection } from '../connection-manager/use-connection'; @@ -43,8 +43,21 @@ export const QueueProvider = ({ parsedParams, children }: QueueContextProps) => const peerId = connection.peerId; const hostId = connection.hostId; const subscribeToData = connection.subscribeToData; + const requestQueueState = connection.requestQueueState; - // Set up queue update handler + // Track if we've already requested initial queue state + const hasRequestedQueueState = useRef(false); + // Track if we've received initial data to prevent replaying on re-subscription + const hasReceivedInitialData = useRef(false); + + // Use refs for values needed in handler to avoid re-subscription on state changes + const stateRef = useRef({ queue: state.queue, currentClimbQueueItem: state.currentClimbQueueItem }); + stateRef.current = { queue: state.queue, currentClimbQueueItem: state.currentClimbQueueItem }; + + const hostIdRef = useRef(hostId); + hostIdRef.current = hostId; + + // Set up queue update handler - use refs to avoid dependency changes const handlePeerData = useCallback( (data: ReceivedPeerData) => { switch (data.type) { @@ -52,18 +65,25 @@ export const QueueProvider = ({ parsedParams, children }: QueueContextProps) => sendData( { type: 'initial-queue-data', - queue: state.queue, - currentClimbQueueItem: state.currentClimbQueueItem, + queue: stateRef.current.queue, + currentClimbQueueItem: stateRef.current.currentClimbQueueItem, }, data.source, ); break; case 'initial-queue-data': + // Only accept initial data once to prevent replaying old state on re-subscription + if (hasReceivedInitialData.current) { + console.log('[DEBUG] Ignoring initial-queue-data, already received initial data'); + return; + } // Accept data from the host OR from the daemon (which is authoritative in daemon mode) - if (hostId !== data.source && data.source !== 'daemon') { - console.log(`Ignoring queue data from ${data.source} since it's not the host(${hostId}) or daemon.`); + if (hostIdRef.current !== data.source && data.source !== 'daemon') { + console.log(`Ignoring queue data from ${data.source} since it's not the host(${hostIdRef.current}) or daemon.`); return; } + hasReceivedInitialData.current = true; + console.log('[DEBUG] Processing initial-queue-data, queue length:', data.queue?.length); dispatch({ type: 'INITIAL_QUEUE_DATA', payload: { @@ -136,14 +156,24 @@ export const QueueProvider = ({ parsedParams, children }: QueueContextProps) => break; } }, - [sendData, state.queue, state.currentClimbQueueItem, hostId, dispatch], + [sendData, dispatch], // Stable dependencies only ); + // Subscribe once on mount - handler uses refs so it's stable useEffect(() => { const unsubscribe = subscribeToData(handlePeerData); return () => unsubscribe(); }, [subscribeToData, handlePeerData]); + // Request initial queue state once when requestQueueState becomes available + useEffect(() => { + if (requestQueueState && !hasRequestedQueueState.current) { + console.log('[DEBUG] QueueProvider requesting initial queue state'); + hasRequestedQueueState.current = true; + requestQueueState(); + } + }, [requestQueueState]); + const { climbSearchResults, suggestedClimbs, diff --git a/daemon/src/handlers/message.ts b/daemon/src/handlers/message.ts index db75f6a7..ce458815 100644 --- a/daemon/src/handlers/message.ts +++ b/daemon/src/handlers/message.ts @@ -1,7 +1,7 @@ import { WebSocket } from 'ws'; import { roomManager } from '../services/room-manager.js'; import { broadcastToSession, sendToClient } from '../services/broadcast.js'; -import { handleJoinSession, handleLeaveSession, handleUpdateUsername } from './room.js'; +import { handleJoinSession, handleLeaveSession, handleUpdateUsername, handleRequestQueueState } from './room.js'; import type { ClientMessage, AddQueueItemMessage, @@ -22,7 +22,10 @@ export async function handleMessage(ws: WebSocket, data: string): Promise try { const parsed = JSON.parse(data); + console.log('[DEBUG] Received message:', parsed.type, JSON.stringify(parsed).slice(0, 200)); + if (!isClientMessage(parsed)) { + console.warn('[DEBUG] Invalid message format - type not recognized:', parsed.type); console.warn('Invalid message format:', data); return; } @@ -38,6 +41,8 @@ export async function handleMessage(ws: WebSocket, data: string): Promise return; } + console.log('[DEBUG] Processing message:', message.type, 'from client:', client.clientId, 'session:', client.sessionId); + switch (message.type) { case 'join-session': await handleJoinSession(ws, message); @@ -51,6 +56,10 @@ export async function handleMessage(ws: WebSocket, data: string): Promise await handleUpdateUsername(ws, message); break; + case 'request-queue-state': + await handleRequestQueueState(ws); + break; + case 'heartbeat': handleHeartbeat(ws, message); break; diff --git a/daemon/src/handlers/room.ts b/daemon/src/handlers/room.ts index f0ae0e03..1eb87eb4 100644 --- a/daemon/src/handlers/room.ts +++ b/daemon/src/handlers/room.ts @@ -10,6 +10,7 @@ import type { LeaderChangedMessage, UpdateUsernameMessage, ErrorMessage, + QueueStateResponseMessage, } from '../types/messages.js'; export async function handleJoinSession( @@ -117,3 +118,25 @@ export async function handleUpdateUsername(ws: WebSocket, message: UpdateUsernam broadcastToSession(client.sessionId, userJoinedMessage, ws); } } + +export async function handleRequestQueueState(ws: WebSocket): Promise { + const client = roomManager.getClient(ws); + console.log('[DEBUG] handleRequestQueueState called, client:', client?.clientId, 'sessionId:', client?.sessionId); + + if (!client || !client.sessionId) { + console.log('[DEBUG] handleRequestQueueState: client not in session, ignoring'); + return; // Silently ignore if not in a session + } + + const queueState = await roomManager.getQueueState(client.sessionId); + console.log('[DEBUG] handleRequestQueueState: got queue state, queue length:', queueState.queue.length); + + const response: QueueStateResponseMessage = { + type: 'queue-state-response', + queue: queueState.queue, + currentClimbQueueItem: queueState.currentClimbQueueItem, + }; + + console.log('[DEBUG] handleRequestQueueState: sending response'); + sendToClient(ws, response); +} diff --git a/daemon/src/types/messages.ts b/daemon/src/types/messages.ts index a8b5dca2..ee891285 100644 --- a/daemon/src/types/messages.ts +++ b/daemon/src/types/messages.ts @@ -100,6 +100,10 @@ export interface HeartbeatMessage { timestamp: number; } +export interface RequestQueueStateMessage { + type: 'request-queue-state'; +} + // ============= Daemon -> Client Messages ============= export interface SessionJoinedMessage { @@ -145,6 +149,12 @@ export interface SessionEndedMessage { newPath?: string; } +export interface QueueStateResponseMessage { + type: 'queue-state-response'; + queue: ClimbQueueItem[]; + currentClimbQueueItem: ClimbQueueItem | null; +} + // Union types export type ClientMessage = | JoinSessionMessage @@ -157,7 +167,8 @@ export type ClientMessage = | UpdateCurrentClimbMessage | MirrorCurrentClimbMessage | ReplaceQueueItemMessage - | HeartbeatMessage; + | HeartbeatMessage + | RequestQueueStateMessage; export type DaemonMessage = | SessionJoinedMessage @@ -173,7 +184,8 @@ export type DaemonMessage = | UpdateQueueMessage | UpdateCurrentClimbMessage | MirrorCurrentClimbMessage - | ReplaceQueueItemMessage; + | ReplaceQueueItemMessage + | QueueStateResponseMessage; // Type guards export function isClientMessage(data: unknown): data is ClientMessage { @@ -193,6 +205,7 @@ export function isClientMessage(data: unknown): data is ClientMessage { 'mirror-current-climb', 'replace-queue-item', 'heartbeat', + 'request-queue-state', ]; return validTypes.includes(msg.type);