Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions apps/sim/app/workspace/providers/socket-provider.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { createLogger } from '@sim/logger'
import { useParams } from 'next/navigation'
import { io, type Socket } from 'socket.io-client'
import { getEnv } from '@/lib/core/config/env'
import { useOperationQueueStore } from '@/stores/operation-queue/store'

const logger = createLogger('SocketContext')

Expand Down Expand Up @@ -138,6 +139,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
const [authFailed, setAuthFailed] = useState(false)
const initializedRef = useRef(false)
const socketRef = useRef<Socket | null>(null)
const triggerOfflineMode = useOperationQueueStore((state) => state.triggerOfflineMode)

const params = useParams()
const urlWorkflowId = params?.workflowId as string | undefined
Expand Down Expand Up @@ -341,9 +343,12 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
})
})

socketInstance.on('join-workflow-error', ({ error }) => {
socketInstance.on('join-workflow-error', ({ error, code }) => {
isRejoiningRef.current = false
logger.error('Failed to join workflow:', error)
logger.error('Failed to join workflow:', { error, code })
if (code === 'ROOM_MANAGER_UNAVAILABLE') {
triggerOfflineMode()
}
})

socketInstance.on('workflow-operation', (data) => {
Expand Down
93 changes: 62 additions & 31 deletions apps/sim/socket/handlers/operations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,39 +12,70 @@ import {
import { persistWorkflowOperation } from '@/socket/database/operations'
import type { AuthenticatedSocket } from '@/socket/middleware/auth'
import { checkRolePermission } from '@/socket/middleware/permissions'
import type { IRoomManager } from '@/socket/rooms'
import type { IRoomManager, UserSession } from '@/socket/rooms'
import { WorkflowOperationSchema } from '@/socket/validation/schemas'

const logger = createLogger('OperationsHandlers')

export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
socket.on('workflow-operation', async (data) => {
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id)
const emitOperationError = (
forbidden: { type: string; message: string; operation?: string; target?: string },
failed?: { error: string; retryable?: boolean }
) => {
socket.emit('operation-forbidden', forbidden)
if (failed && data?.operationId) {
socket.emit('operation-failed', { operationId: data.operationId, ...failed })
}
}

if (!roomManager.isReady()) {
emitOperationError(
{ type: 'ROOM_MANAGER_UNAVAILABLE', message: 'Realtime unavailable' },
{ error: 'Realtime unavailable', retryable: true }
)
return
}

let workflowId: string | null = null
let session: UserSession | null = null

try {
workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
session = await roomManager.getUserSession(socket.id)
} catch (error) {
logger.error('Error loading session for workflow operation:', error)
emitOperationError(
{ type: 'ROOM_MANAGER_UNAVAILABLE', message: 'Realtime unavailable' },
{ error: 'Realtime unavailable', retryable: true }
)
return
}

if (!workflowId || !session) {
socket.emit('operation-forbidden', {
type: 'SESSION_ERROR',
message: 'Session expired, please rejoin workflow',
})
if (data?.operationId) {
socket.emit('operation-failed', { operationId: data.operationId, error: 'Session expired' })
}
emitOperationError(
{ type: 'SESSION_ERROR', message: 'Session expired, please rejoin workflow' },
{ error: 'Session expired' }
)
return
}

const hasRoom = await roomManager.hasWorkflowRoom(workflowId)
let hasRoom = false
try {
hasRoom = await roomManager.hasWorkflowRoom(workflowId)
} catch (error) {
logger.error('Error checking workflow room:', error)
emitOperationError(
{ type: 'ROOM_MANAGER_UNAVAILABLE', message: 'Realtime unavailable' },
{ error: 'Realtime unavailable', retryable: true }
)
return
}
if (!hasRoom) {
socket.emit('operation-forbidden', {
type: 'ROOM_NOT_FOUND',
message: 'Workflow room not found',
})
if (data?.operationId) {
socket.emit('operation-failed', {
operationId: data.operationId,
error: 'Workflow room not found',
})
}
emitOperationError(
{ type: 'ROOM_NOT_FOUND', message: 'Workflow room not found' },
{ error: 'Workflow room not found' }
)
return
}

Expand Down Expand Up @@ -77,15 +108,15 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
// Check permissions from cached role for all other operations
if (!userPresence) {
logger.warn(`User presence not found for socket ${socket.id}`)
socket.emit('operation-forbidden', {
type: 'SESSION_ERROR',
message: 'User session not found',
operation,
target,
})
if (operationId) {
socket.emit('operation-failed', { operationId, error: 'User session not found' })
}
emitOperationError(
{
type: 'SESSION_ERROR',
message: 'User session not found',
operation,
target,
},
{ error: 'User session not found' }
)
return
}

Expand All @@ -97,7 +128,7 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
logger.warn(
`User ${session.userId} (role: ${userPresence.role}) forbidden from ${operation} on ${target}`
)
socket.emit('operation-forbidden', {
emitOperationError({
type: 'INSUFFICIENT_PERMISSIONS',
message: `${permissionCheck.reason} on '${target}'`,
operation,
Expand Down
15 changes: 15 additions & 0 deletions apps/sim/socket/handlers/subblocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,21 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager:
operationId,
} = data

if (!roomManager.isReady()) {
socket.emit('operation-forbidden', {
type: 'ROOM_MANAGER_UNAVAILABLE',
message: 'Realtime unavailable',
})
if (operationId) {
socket.emit('operation-failed', {
operationId,
error: 'Realtime unavailable',
retryable: true,
})
}
return
}

try {
const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id)
Expand Down
15 changes: 15 additions & 0 deletions apps/sim/socket/handlers/variables.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,21 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager:
socket.on('variable-update', async (data) => {
const { workflowId: payloadWorkflowId, variableId, field, value, timestamp, operationId } = data

if (!roomManager.isReady()) {
socket.emit('operation-forbidden', {
type: 'ROOM_MANAGER_UNAVAILABLE',
message: 'Realtime unavailable',
})
if (operationId) {
socket.emit('operation-failed', {
operationId,
error: 'Realtime unavailable',
retryable: true,
})
}
return
}

try {
const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id)
Expand Down
19 changes: 18 additions & 1 deletion apps/sim/socket/handlers/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:
return
}

if (!roomManager.isReady()) {
logger.warn(`Join workflow rejected: Room manager unavailable`)
socket.emit('join-workflow-error', {
error: 'Realtime unavailable',
code: 'ROOM_MANAGER_UNAVAILABLE',
})
return
}

logger.info(`Join workflow request from ${userId} (${userName}) for workflow ${workflowId}`)

// Verify workflow access
Expand Down Expand Up @@ -128,12 +137,20 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:
// Undo socket.join and room manager entry if any operation failed
socket.leave(workflowId)
await roomManager.removeUserFromRoom(socket.id)
socket.emit('join-workflow-error', { error: 'Failed to join workflow' })
const isReady = roomManager.isReady()
socket.emit('join-workflow-error', {
error: isReady ? 'Failed to join workflow' : 'Realtime unavailable',
code: isReady ? undefined : 'ROOM_MANAGER_UNAVAILABLE',
})
}
})

socket.on('leave-workflow', async () => {
try {
if (!roomManager.isReady()) {
return
}

const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id)

Expand Down
4 changes: 4 additions & 0 deletions apps/sim/socket/rooms/memory-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ export class MemoryRoomManager implements IRoomManager {
logger.info('MemoryRoomManager initialized (single-pod mode)')
}

isReady(): boolean {
return true
}

async shutdown(): Promise<void> {
this.workflowRooms.clear()
this.socketToWorkflow.clear()
Expand Down
20 changes: 9 additions & 11 deletions apps/sim/socket/rooms/redis-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,17 +96,6 @@ export class RedisRoomManager implements IRoomManager {
this._io = io
this.redis = createClient({
url: redisUrl,
socket: {
reconnectStrategy: (retries) => {
if (retries > 10) {
logger.error('Redis reconnection failed after 10 attempts')
return new Error('Redis reconnection failed')
}
const delay = Math.min(retries * 100, 3000)
logger.warn(`Redis reconnecting in ${delay}ms (attempt ${retries})`)
return delay
},
},
})

this.redis.on('error', (err) => {
Expand All @@ -122,12 +111,21 @@ export class RedisRoomManager implements IRoomManager {
logger.info('Redis client ready')
this.isConnected = true
})

this.redis.on('end', () => {
logger.warn('Redis client connection closed')
this.isConnected = false
})
}

get io(): Server {
return this._io
}

isReady(): boolean {
return this.isConnected
}

async initialize(): Promise<void> {
if (this.isConnected) return

Expand Down
5 changes: 5 additions & 0 deletions apps/sim/socket/rooms/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ export interface IRoomManager {
*/
initialize(): Promise<void>

/**
* Whether the room manager is ready to serve requests
*/
isReady(): boolean

/**
* Clean shutdown
*/
Expand Down
5 changes: 5 additions & 0 deletions apps/sim/socket/routes/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ export function createHttpHandler(roomManager: IRoomManager, logger: Logger) {
res.end(JSON.stringify({ error: authResult.error }))
return
}

if (!roomManager.isReady()) {
sendError(res, 'Room manager unavailable', 503)
return
}
}

// Handle workflow deletion notifications from the main API
Expand Down