Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 78 additions & 14 deletions app/components/connection-manager/daemon-context.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ export interface DaemonContextType extends PeerContextType {
daemonUsers: DaemonUser[];
disconnect: () => void;
connectionError: string | null;
requestQueueState: () => void;
}

export const DaemonContext = createContext<DaemonContextType | undefined>(undefined);
Expand Down Expand Up @@ -123,14 +124,37 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr
const lastHeartbeatResponse = useRef<number>(0);
const connectionHealth = useRef<'HEALTHY' | 'DEGRADED' | 'POOR'>('HEALTHY');

// Use refs for callbacks to avoid reconnecting when they change
const handleDaemonMessageRef = useRef<(data: DaemonMessage) => void>(() => {});
const notifySubscribersRef = useRef<(data: ReceivedPeerData) => void>(() => {});
const joinSessionRef = useRef<() => void>(() => {});

// Store initial queue data to provide to late subscribers
const initialQueueDataRef = useRef<ReceivedPeerData | null>(null);

// Generate session ID from pathname
const sessionId = pathname.replace(/\//g, '-').slice(1) || 'default';

const subscribeToData = useCallback((callback: (data: ReceivedPeerData) => void) => {
const handlerId = uuidv4();
console.log('[DEBUG] New subscriber registered, id:', handlerId, 'total handlers:', dataHandlers.current.length + 1);
dataHandlers.current.push({ id: handlerId, callback });

// If we have initial queue data, immediately provide it to the new subscriber
if (initialQueueDataRef.current) {
const storedData = initialQueueDataRef.current as { queue?: unknown[] };
console.log('[DEBUG] Providing stored initial queue data to new subscriber, queue length:', storedData.queue?.length);
try {
callback(initialQueueDataRef.current);
} catch (error) {
console.error('Error providing initial queue data to subscriber:', error);
}
} else {
console.log('[DEBUG] No stored initial queue data yet for new subscriber');
}

return () => {
console.log('[DEBUG] Subscriber unregistered, id:', handlerId);
dataHandlers.current = dataHandlers.current.filter((handler) => handler.id !== handlerId);
};
}, []);
Expand All @@ -145,6 +169,9 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr
});
}, []);

// Keep ref updated
notifySubscribersRef.current = notifySubscribers;

const sendData = useCallback(
(data: PeerData, _connectionId?: string | null) => {
if (wsRef.current && wsRef.current.readyState === WebSocket.OPEN) {
Expand All @@ -161,10 +188,21 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr
[state.clientId],
);

const requestQueueState = useCallback(() => {
console.log('[DEBUG] requestQueueState called, sessionId:', state.sessionId, 'wsReady:', wsRef.current?.readyState === WebSocket.OPEN);
if (wsRef.current && wsRef.current.readyState === WebSocket.OPEN && state.sessionId) {
console.log('[DEBUG] Sending request-queue-state');
wsRef.current.send(JSON.stringify({ type: 'request-queue-state' }));
} else {
console.log('[DEBUG] NOT sending request-queue-state - conditions not met');
}
}, [state.sessionId]);

const handleDaemonMessage = useCallback(
(data: DaemonMessage) => {
switch (data.type) {
case 'session-joined':
console.log('[DEBUG] session-joined received, sessionId:', data.sessionId, 'queue length:', data.queue?.length);
setState((prev) => ({
...prev,
clientId: data.clientId,
Expand All @@ -174,15 +212,18 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr
leaderId: data.users.find((u) => u.isLeader)?.id || null,
}));

// Notify queue context about initial data
if (data.queue.length > 0 || data.currentClimbQueueItem) {
notifySubscribers({
type: 'initial-queue-data',
queue: data.queue,
currentClimbQueueItem: data.currentClimbQueueItem,
source: 'daemon',
});
}
// Store initial queue data, even if empty, so subscribers get it
const initialData: ReceivedPeerData = {
type: 'initial-queue-data',
queue: data.queue || [],
currentClimbQueueItem: data.currentClimbQueueItem || null,
source: 'daemon',
};
initialQueueDataRef.current = initialData;
console.log('[DEBUG] Stored initial queue data, notifying subscribers');

// Notify any existing subscribers
notifySubscribers(initialData);
break;

case 'user-joined':
Expand Down Expand Up @@ -243,6 +284,9 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr
[notifySubscribers],
);

// Keep ref updated
handleDaemonMessageRef.current = handleDaemonMessage;

const joinSession = useCallback(() => {
if (wsRef.current && wsRef.current.readyState === WebSocket.OPEN && !hasJoinedSession.current) {
const joinMessage = {
Expand All @@ -262,6 +306,9 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr
}
}, [sessionId, pathname, username]);

// Keep ref updated
joinSessionRef.current = joinSession;

const connect = useCallback(() => {
if (!daemonUrl) return;

Expand All @@ -285,7 +332,7 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr
connectionHealth.current = 'HEALTHY';

// Join session after connection
joinSession();
joinSessionRef.current();

// Start heartbeat interval (every 10 seconds)
if (heartbeatIntervalRef.current) {
Expand Down Expand Up @@ -316,7 +363,22 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr

// Handle daemon-specific messages
if (isDaemonMessage(data)) {
handleDaemonMessage(data);
handleDaemonMessageRef.current(data);
return;
}

// Handle queue state response - relay to subscribers as initial-queue-data
if (data.type === 'queue-state-response') {
console.log('[DEBUG] Received queue-state-response, queue length:', data.queue?.length);
const queueData: ReceivedPeerData = {
type: 'initial-queue-data',
queue: data.queue,
currentClimbQueueItem: data.currentClimbQueueItem,
source: 'daemon',
};
// Update stored data
initialQueueDataRef.current = queueData;
notifySubscribersRef.current(queueData);
return;
}

Expand All @@ -327,7 +389,7 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr
};

if (isPeerData(dataWithSource)) {
notifySubscribers(dataWithSource);
notifySubscribersRef.current(dataWithSource);
} else {
console.warn('Received invalid peer data from daemon:', data.type);
}
Expand Down Expand Up @@ -374,7 +436,7 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr
isConnecting: false,
}));
}
}, [daemonUrl, joinSession, handleDaemonMessage, notifySubscribers]);
}, [daemonUrl]); // Using refs for callbacks to avoid reconnecting when they change

const disconnect = useCallback(() => {
if (reconnectTimeoutRef.current) {
Expand Down Expand Up @@ -421,7 +483,8 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr
wsRef.current.close(1000, 'Component unmounting');
}
};
}, [daemonUrl, connect]);
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [daemonUrl]); // Only reconnect when daemonUrl changes

// Send username update when it changes
useEffect(() => {
Expand Down Expand Up @@ -463,6 +526,7 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr
sendData,
subscribeToData,
connectToPeer: () => {}, // No-op in daemon mode
requestQueueState,

// Daemon-specific
isDaemonMode: true,
Expand Down
15 changes: 15 additions & 0 deletions app/components/connection-manager/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ export interface PeerContextType {
currentLeader: PeerId;
isLeader: boolean;
initiateLeaderElection: () => void;
requestQueueState?: () => void;
}

export type PeerProviderProps = {
Expand All @@ -71,6 +72,16 @@ interface RequestUpdateQueueData {
type: 'request-update-queue';
}

interface RequestQueueStateData {
type: 'request-queue-state';
}

interface QueueStateResponseData {
type: 'queue-state-response';
queue: ClimbQueue;
currentClimbQueueItem: ClimbQueueItem | null;
}

interface UpdateQueueData {
type: 'update-queue' | 'initial-queue-data';
queue: ClimbQueue;
Expand Down Expand Up @@ -166,6 +177,8 @@ interface ReconnectCoordinationData {
// Union type of all possible message types
export type PeerData =
| RequestUpdateQueueData
| RequestQueueStateData
| QueueStateResponseData
| UpdateQueueData
| BroadcastOtherPeersData
| NewConnectionData
Expand Down Expand Up @@ -195,12 +208,14 @@ export function isPeerData(data: unknown): data is ReceivedPeerData {
// Validate based on type
switch (msg.type) {
case 'request-update-queue':
case 'request-queue-state':
case 'new-connection':
return true; // No additional required fields
case 'send-peer-info':
return !!msg.username;
case 'initial-queue-data':
case 'update-queue':
case 'queue-state-response':
return 'queue' in msg && 'currentClimbQueueItem' in msg;
case 'broadcast-other-peers':
return Array.isArray(msg.peers);
Expand Down
44 changes: 37 additions & 7 deletions app/components/queue-control/queue-context.tsx
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// File: QueueContext.tsx
'use client';

import React, { useContext, createContext, ReactNode, useEffect, useCallback } from 'react';
import React, { useContext, createContext, ReactNode, useEffect, useCallback, useRef } from 'react';
import { useSearchParams, useRouter, usePathname } from 'next/navigation';
import { v4 as uuidv4 } from 'uuid';
import { useConnection } from '../connection-manager/use-connection';
Expand Down Expand Up @@ -43,27 +43,47 @@ export const QueueProvider = ({ parsedParams, children }: QueueContextProps) =>
const peerId = connection.peerId;
const hostId = connection.hostId;
const subscribeToData = connection.subscribeToData;
const requestQueueState = connection.requestQueueState;

// Set up queue update handler
// Track if we've already requested initial queue state
const hasRequestedQueueState = useRef(false);
// Track if we've received initial data to prevent replaying on re-subscription
const hasReceivedInitialData = useRef(false);

// Use refs for values needed in handler to avoid re-subscription on state changes
const stateRef = useRef({ queue: state.queue, currentClimbQueueItem: state.currentClimbQueueItem });
stateRef.current = { queue: state.queue, currentClimbQueueItem: state.currentClimbQueueItem };

const hostIdRef = useRef(hostId);
hostIdRef.current = hostId;

// Set up queue update handler - use refs to avoid dependency changes
const handlePeerData = useCallback(
(data: ReceivedPeerData) => {
switch (data.type) {
case 'new-connection':
sendData(
{
type: 'initial-queue-data',
queue: state.queue,
currentClimbQueueItem: state.currentClimbQueueItem,
queue: stateRef.current.queue,
currentClimbQueueItem: stateRef.current.currentClimbQueueItem,
},
data.source,
);
break;
case 'initial-queue-data':
// Only accept initial data once to prevent replaying old state on re-subscription
if (hasReceivedInitialData.current) {
console.log('[DEBUG] Ignoring initial-queue-data, already received initial data');
return;
}
// Accept data from the host OR from the daemon (which is authoritative in daemon mode)
if (hostId !== data.source && data.source !== 'daemon') {
console.log(`Ignoring queue data from ${data.source} since it's not the host(${hostId}) or daemon.`);
if (hostIdRef.current !== data.source && data.source !== 'daemon') {
console.log(`Ignoring queue data from ${data.source} since it's not the host(${hostIdRef.current}) or daemon.`);
return;
}
hasReceivedInitialData.current = true;
console.log('[DEBUG] Processing initial-queue-data, queue length:', data.queue?.length);
dispatch({
type: 'INITIAL_QUEUE_DATA',
payload: {
Expand Down Expand Up @@ -136,14 +156,24 @@ export const QueueProvider = ({ parsedParams, children }: QueueContextProps) =>
break;
}
},
[sendData, state.queue, state.currentClimbQueueItem, hostId, dispatch],
[sendData, dispatch], // Stable dependencies only
);

// Subscribe once on mount - handler uses refs so it's stable
useEffect(() => {
const unsubscribe = subscribeToData(handlePeerData);
return () => unsubscribe();
}, [subscribeToData, handlePeerData]);

// Request initial queue state once when requestQueueState becomes available
useEffect(() => {
if (requestQueueState && !hasRequestedQueueState.current) {
console.log('[DEBUG] QueueProvider requesting initial queue state');
hasRequestedQueueState.current = true;
requestQueueState();
}
}, [requestQueueState]);

const {
climbSearchResults,
suggestedClimbs,
Expand Down
11 changes: 10 additions & 1 deletion daemon/src/handlers/message.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { WebSocket } from 'ws';
import { roomManager } from '../services/room-manager.js';
import { broadcastToSession, sendToClient } from '../services/broadcast.js';
import { handleJoinSession, handleLeaveSession, handleUpdateUsername } from './room.js';
import { handleJoinSession, handleLeaveSession, handleUpdateUsername, handleRequestQueueState } from './room.js';
import type {
ClientMessage,
AddQueueItemMessage,
Expand All @@ -22,7 +22,10 @@ export async function handleMessage(ws: WebSocket, data: string): Promise<void>

try {
const parsed = JSON.parse(data);
console.log('[DEBUG] Received message:', parsed.type, JSON.stringify(parsed).slice(0, 200));

if (!isClientMessage(parsed)) {
console.warn('[DEBUG] Invalid message format - type not recognized:', parsed.type);
console.warn('Invalid message format:', data);
return;
}
Expand All @@ -38,6 +41,8 @@ export async function handleMessage(ws: WebSocket, data: string): Promise<void>
return;
}

console.log('[DEBUG] Processing message:', message.type, 'from client:', client.clientId, 'session:', client.sessionId);

switch (message.type) {
case 'join-session':
await handleJoinSession(ws, message);
Expand All @@ -51,6 +56,10 @@ export async function handleMessage(ws: WebSocket, data: string): Promise<void>
await handleUpdateUsername(ws, message);
break;

case 'request-queue-state':
await handleRequestQueueState(ws);
break;

case 'heartbeat':
handleHeartbeat(ws, message);
break;
Expand Down
Loading
Loading