Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions apps/server/src/server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
MessageId,
ExternalLauncherError,
type OrchestrationThreadShell,
type OrchestrationThread,
TerminalNotRunningError,
type OrchestrationCommand,
type OrchestrationEvent,
Expand Down Expand Up @@ -44,12 +45,14 @@ import * as Deferred from "effect/Deferred";
import * as DateTime from "effect/DateTime";
import * as Duration from "effect/Duration";
import * as Effect from "effect/Effect";
import * as Fiber from "effect/Fiber";
import * as FileSystem from "effect/FileSystem";
import * as Layer from "effect/Layer";
import * as ManagedRuntime from "effect/ManagedRuntime";
import * as Option from "effect/Option";
import * as Path from "effect/Path";
import * as PubSub from "effect/PubSub";
import * as Queue from "effect/Queue";
import * as Stream from "effect/Stream";
import * as TestClock from "effect/testing/TestClock";
import { ChildProcessSpawner } from "effect/unstable/process";
Expand Down Expand Up @@ -5401,6 +5404,96 @@ it.layer(NodeServices.layer)("server router seam", (it) => {
}).pipe(Effect.provide(NodeHttpServer.layerTest)),
);

it.effect("buffers thread detail events emitted while loading the initial thread snapshot", () =>
Effect.gen(function* () {
const now = "2026-01-01T00:00:00.000Z";
const threadId = ThreadId.make("thread-racy-subscribe");
const messageId = MessageId.make("message-racy-first-user");
const snapshotGate = yield* Deferred.make<void>();
const events = yield* Queue.unbounded<OrchestrationEvent>();
const thread = {
id: threadId,
projectId: defaultProjectId,
title: "Racy Subscribe",
modelSelection: defaultModelSelection,
interactionMode: "default" as const,
runtimeMode: "full-access" as const,
branch: null,
worktreePath: null,
createdAt: now,
updatedAt: now,
archivedAt: null,
latestTurn: null,
messages: [],
session: null,
activities: [],
proposedPlans: [],
checkpoints: [],
deletedAt: null,
} satisfies OrchestrationThread;
const userMessageEvent = {
sequence: 2,
eventId: EventId.make("event-racy-first-user"),
aggregateKind: "thread" as const,
aggregateId: threadId,
occurredAt: now,
commandId: CommandId.make("cmd-racy-first-user"),
causationEventId: null,
correlationId: null,
metadata: {},
type: "thread.message-sent" as const,
payload: {
threadId,
messageId,
role: "user" as const,
text: "hi",
attachments: [],
turnId: null,
streaming: false,
createdAt: now,
updatedAt: now,
},
} satisfies OrchestrationEvent;

yield* buildAppUnderTest({
layers: {
projectionSnapshotQuery: {
getThreadDetailById: () =>
Deferred.await(snapshotGate).pipe(Effect.as(Option.some(thread))),
getSnapshotSequence: () => Effect.succeed({ snapshotSequence: 1 }),
},
orchestrationEngine: {
streamDomainEvents: Stream.fromQueue(events),
},
},
});

const wsUrl = yield* getWsServerUrl("/ws");
const items = yield* Effect.scoped(
withWsRpcClient(wsUrl, (client) =>
Effect.gen(function* () {
const fiber = yield* client[ORCHESTRATION_WS_METHODS.subscribeThread]({
threadId,
}).pipe(Stream.take(2), Stream.runCollect, Effect.forkScoped);
yield* Queue.offer(events, userMessageEvent);
yield* Deferred.succeed(snapshotGate, undefined);
return Array.from(yield* Fiber.join(fiber));
}),
),
);

assert.equal(items[0]?.kind, "snapshot");
assert.equal(items[1]?.kind, "event");
if (items[1]?.kind === "event") {
const event = items[1].event;
assert.equal(event.type, "thread.message-sent");
if (event.type === "thread.message-sent") {
assert.equal(event.payload.text, "hi");
}
}
}).pipe(Effect.provide(NodeHttpServer.layerTest)),
);

it.effect("routes websocket rpc orchestration shell snapshot errors", () =>
Effect.gen(function* () {
const projectionError = new PersistenceSqlError({
Expand Down
40 changes: 29 additions & 11 deletions apps/server/src/ws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,24 @@ const makeWsRpcLayer = (currentSession: AuthenticatedSession) =>
const enrichOrchestrationEvents = (events: ReadonlyArray<OrchestrationEvent>) =>
Effect.forEach(events, enrichProjectEvent, { concurrency: 4 });

const streamThreadDetailEventsAfterSnapshot = (
threadId: ThreadId,
snapshotSequence: number,
) =>
orchestrationEngine.streamDomainEvents.pipe(
Stream.filter(
(event) =>
event.sequence > snapshotSequence &&
event.aggregateKind === "thread" &&
event.aggregateId === threadId &&
isThreadDetailEvent(event),
),
Stream.map((event) => ({
kind: "event" as const,
event,
})),
);

const toShellStreamEvent = (
event: OrchestrationEvent,
): Effect.Effect<Option.Option<OrchestrationShellStreamEvent>, never, never> => {
Expand Down Expand Up @@ -971,6 +989,15 @@ const makeWsRpcLayer = (currentSession: AuthenticatedSession) =>
observeRpcStreamEffect(
ORCHESTRATION_WS_METHODS.subscribeThread,
Effect.gen(function* () {
const liveEventQueue = yield* Queue.unbounded<{
readonly kind: "event";
readonly event: OrchestrationEvent;
}>();
yield* streamThreadDetailEventsAfterSnapshot(input.threadId, 0).pipe(
Stream.runForEach((item) => Queue.offer(liveEventQueue, item).pipe(Effect.asVoid)),
Effect.forkScoped,
);

const [threadDetail, snapshotSequence] = yield* Effect.all([
projectionSnapshotQuery.getThreadDetailById(input.threadId).pipe(
Effect.mapError(
Expand Down Expand Up @@ -1000,17 +1027,8 @@ const makeWsRpcLayer = (currentSession: AuthenticatedSession) =>
});
}

const liveStream = orchestrationEngine.streamDomainEvents.pipe(
Stream.filter(
(event) =>
event.aggregateKind === "thread" &&
event.aggregateId === input.threadId &&
isThreadDetailEvent(event),
),
Stream.map((event) => ({
kind: "event" as const,
event,
})),
const liveStream = Stream.fromQueue(liveEventQueue).pipe(
Stream.filter((item) => item.event.sequence > snapshotSequence),
);

return Stream.concat(
Expand Down
Loading