diff --git a/apps/server/src/server.test.ts b/apps/server/src/server.test.ts index 205833289ea..f29734cab38 100644 --- a/apps/server/src/server.test.ts +++ b/apps/server/src/server.test.ts @@ -16,6 +16,7 @@ import { MessageId, ExternalLauncherError, type OrchestrationThreadShell, + type OrchestrationThread, TerminalNotRunningError, type OrchestrationCommand, type OrchestrationEvent, @@ -44,12 +45,14 @@ import * as Deferred from "effect/Deferred"; import * as DateTime from "effect/DateTime"; import * as Duration from "effect/Duration"; import * as Effect from "effect/Effect"; +import * as Fiber from "effect/Fiber"; import * as FileSystem from "effect/FileSystem"; import * as Layer from "effect/Layer"; import * as ManagedRuntime from "effect/ManagedRuntime"; import * as Option from "effect/Option"; import * as Path from "effect/Path"; import * as PubSub from "effect/PubSub"; +import * as Queue from "effect/Queue"; import * as Stream from "effect/Stream"; import * as TestClock from "effect/testing/TestClock"; import { ChildProcessSpawner } from "effect/unstable/process"; @@ -5401,6 +5404,96 @@ it.layer(NodeServices.layer)("server router seam", (it) => { }).pipe(Effect.provide(NodeHttpServer.layerTest)), ); + it.effect("buffers thread detail events emitted while loading the initial thread snapshot", () => + Effect.gen(function* () { + const now = "2026-01-01T00:00:00.000Z"; + const threadId = ThreadId.make("thread-racy-subscribe"); + const messageId = MessageId.make("message-racy-first-user"); + const snapshotGate = yield* Deferred.make(); + const events = yield* Queue.unbounded(); + const thread = { + id: threadId, + projectId: defaultProjectId, + title: "Racy Subscribe", + modelSelection: defaultModelSelection, + interactionMode: "default" as const, + runtimeMode: "full-access" as const, + branch: null, + worktreePath: null, + createdAt: now, + updatedAt: now, + archivedAt: null, + latestTurn: null, + messages: [], + session: null, + activities: [], + proposedPlans: [], + checkpoints: [], + deletedAt: null, + } satisfies OrchestrationThread; + const userMessageEvent = { + sequence: 2, + eventId: EventId.make("event-racy-first-user"), + aggregateKind: "thread" as const, + aggregateId: threadId, + occurredAt: now, + commandId: CommandId.make("cmd-racy-first-user"), + causationEventId: null, + correlationId: null, + metadata: {}, + type: "thread.message-sent" as const, + payload: { + threadId, + messageId, + role: "user" as const, + text: "hi", + attachments: [], + turnId: null, + streaming: false, + createdAt: now, + updatedAt: now, + }, + } satisfies OrchestrationEvent; + + yield* buildAppUnderTest({ + layers: { + projectionSnapshotQuery: { + getThreadDetailById: () => + Deferred.await(snapshotGate).pipe(Effect.as(Option.some(thread))), + getSnapshotSequence: () => Effect.succeed({ snapshotSequence: 1 }), + }, + orchestrationEngine: { + streamDomainEvents: Stream.fromQueue(events), + }, + }, + }); + + const wsUrl = yield* getWsServerUrl("/ws"); + const items = yield* Effect.scoped( + withWsRpcClient(wsUrl, (client) => + Effect.gen(function* () { + const fiber = yield* client[ORCHESTRATION_WS_METHODS.subscribeThread]({ + threadId, + }).pipe(Stream.take(2), Stream.runCollect, Effect.forkScoped); + yield* Queue.offer(events, userMessageEvent); + yield* Deferred.succeed(snapshotGate, undefined); + return Array.from(yield* Fiber.join(fiber)); + }), + ), + ); + + assert.equal(items[0]?.kind, "snapshot"); + assert.equal(items[1]?.kind, "event"); + if (items[1]?.kind === "event") { + const event = items[1].event; + assert.equal(event.type, "thread.message-sent"); + if (event.type === "thread.message-sent") { + assert.equal(event.payload.text, "hi"); + } + } + }).pipe(Effect.provide(NodeHttpServer.layerTest)), + ); + it.effect("routes websocket rpc orchestration shell snapshot errors", () => Effect.gen(function* () { const projectionError = new PersistenceSqlError({ diff --git a/apps/server/src/ws.ts b/apps/server/src/ws.ts index 1ad37e7c49b..b09f9cf7ce5 100644 --- a/apps/server/src/ws.ts +++ b/apps/server/src/ws.ts @@ -471,6 +471,24 @@ const makeWsRpcLayer = (currentSession: AuthenticatedSession) => const enrichOrchestrationEvents = (events: ReadonlyArray) => Effect.forEach(events, enrichProjectEvent, { concurrency: 4 }); + const streamThreadDetailEventsAfterSnapshot = ( + threadId: ThreadId, + snapshotSequence: number, + ) => + orchestrationEngine.streamDomainEvents.pipe( + Stream.filter( + (event) => + event.sequence > snapshotSequence && + event.aggregateKind === "thread" && + event.aggregateId === threadId && + isThreadDetailEvent(event), + ), + Stream.map((event) => ({ + kind: "event" as const, + event, + })), + ); + const toShellStreamEvent = ( event: OrchestrationEvent, ): Effect.Effect, never, never> => { @@ -971,6 +989,15 @@ const makeWsRpcLayer = (currentSession: AuthenticatedSession) => observeRpcStreamEffect( ORCHESTRATION_WS_METHODS.subscribeThread, Effect.gen(function* () { + const liveEventQueue = yield* Queue.unbounded<{ + readonly kind: "event"; + readonly event: OrchestrationEvent; + }>(); + yield* streamThreadDetailEventsAfterSnapshot(input.threadId, 0).pipe( + Stream.runForEach((item) => Queue.offer(liveEventQueue, item).pipe(Effect.asVoid)), + Effect.forkScoped, + ); + const [threadDetail, snapshotSequence] = yield* Effect.all([ projectionSnapshotQuery.getThreadDetailById(input.threadId).pipe( Effect.mapError( @@ -1000,17 +1027,8 @@ const makeWsRpcLayer = (currentSession: AuthenticatedSession) => }); } - const liveStream = orchestrationEngine.streamDomainEvents.pipe( - Stream.filter( - (event) => - event.aggregateKind === "thread" && - event.aggregateId === input.threadId && - isThreadDetailEvent(event), - ), - Stream.map((event) => ({ - kind: "event" as const, - event, - })), + const liveStream = Stream.fromQueue(liveEventQueue).pipe( + Stream.filter((item) => item.event.sequence > snapshotSequence), ); return Stream.concat(