Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/agent/src/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ export class Agent {

async cleanup(): Promise<void> {
if (this.sessionLogWriter && this.taskRunId) {
await this.sessionLogWriter.flush(this.taskRunId);
await this.sessionLogWriter.flush(this.taskRunId, { coalesce: true });
}
await this.acpConnection?.cleanup();
}
Expand Down
17 changes: 13 additions & 4 deletions packages/agent/src/server/agent-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,9 @@ export class AgentServer {
// against async log persistence to object storage.
let assistantMessage: string | undefined;
try {
await this.session.logWriter.flush(this.session.payload.run_id);
await this.session.logWriter.flush(this.session.payload.run_id, {
coalesce: true,
});
assistantMessage = this.session.logWriter.getFullAgentResponse(
this.session.payload.run_id,
);
Expand Down Expand Up @@ -747,6 +749,9 @@ export class AgentServer {
});

this.logger.info("Session initialized successfully");
this.logger.info(
`Agent version: ${this.config.version ?? packageJson.version}`,
);

// Signal in_progress so the UI can start polling for updates
this.posthogAPI
Expand Down Expand Up @@ -1111,7 +1116,9 @@ Important:
): Promise<void> {
if (this.session?.payload.run_id === payload.run_id) {
try {
await this.session.logWriter.flush(payload.run_id);
await this.session.logWriter.flush(payload.run_id, {
coalesce: true,
});
} catch (error) {
this.logger.warn("Failed to flush session logs before completion", {
taskId: payload.task_id,
Expand Down Expand Up @@ -1270,7 +1277,7 @@ Important:
}

try {
await this.session.logWriter.flush(payload.run_id);
await this.session.logWriter.flush(payload.run_id, { coalesce: true });
} catch (error) {
this.logger.warn("Failed to flush logs before Slack relay", {
taskId: payload.task_id,
Expand Down Expand Up @@ -1473,7 +1480,9 @@ Important:
}

try {
await this.session.logWriter.flush(this.session.payload.run_id);
await this.session.logWriter.flush(this.session.payload.run_id, {
coalesce: true,
});
} catch (error) {
this.logger.error("Failed to flush session logs", error);
}
Expand Down
216 changes: 216 additions & 0 deletions packages/agent/src/session-log-writer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,222 @@ describe("SessionLogWriter", () => {
});
});

describe("_doFlush does not prematurely coalesce", () => {
it("does not coalesce buffered chunks during a timed flush", async () => {
const sessionId = "s1";
logWriter.register(sessionId, { taskId: "t1", runId: sessionId });

// Buffer some chunks (no non-chunk event to trigger coalescing)
logWriter.appendRawLine(
sessionId,
makeSessionUpdate("agent_message_chunk", {
content: { type: "text", text: "Hello " },
}),
);
logWriter.appendRawLine(
sessionId,
makeSessionUpdate("agent_message_chunk", {
content: { type: "text", text: "world" },
}),
);

// Flush without any non-chunk event arriving — simulates
// the 500ms debounce timer firing mid-stream
await logWriter.flush(sessionId);

// No entries should have been sent — chunks are still buffered
expect(mockAppendLog).not.toHaveBeenCalled();

// Now a non-chunk event arrives, triggering natural coalescing
logWriter.appendRawLine(
sessionId,
makeSessionUpdate("usage_update", { used: 100 }),
);

await logWriter.flush(sessionId);

expect(mockAppendLog).toHaveBeenCalledTimes(1);
const entries: StoredNotification[] = mockAppendLog.mock.calls[0][2];
expect(entries).toHaveLength(2); // coalesced agent_message + usage_update
const coalesced = entries[0].notification;
expect(coalesced.params?.update).toEqual({
sessionUpdate: "agent_message",
content: { type: "text", text: "Hello world" },
});
});
});

describe("flushAll coalesces on shutdown", () => {
it("coalesces remaining chunks before flushing", async () => {
const sessionId = "s1";
logWriter.register(sessionId, { taskId: "t1", runId: sessionId });

logWriter.appendRawLine(
sessionId,
makeSessionUpdate("agent_message_chunk", {
content: { type: "text", text: "partial response" },
}),
);

await logWriter.flushAll();

expect(mockAppendLog).toHaveBeenCalledTimes(1);
const entries: StoredNotification[] = mockAppendLog.mock.calls[0][2];
expect(entries).toHaveLength(1);
const coalesced = entries[0].notification;
expect(coalesced.params?.update).toEqual({
sessionUpdate: "agent_message",
content: { type: "text", text: "partial response" },
});
});
});

describe("flush with coalesce option", () => {
it("drains chunk buffer when coalesce is true", async () => {
const sessionId = "s1";
logWriter.register(sessionId, { taskId: "t1", runId: sessionId });

logWriter.appendRawLine(
sessionId,
makeSessionUpdate("agent_message_chunk", {
content: { type: "text", text: "complete text" },
}),
);

await logWriter.flush(sessionId, { coalesce: true });

expect(mockAppendLog).toHaveBeenCalledTimes(1);
const entries: StoredNotification[] = mockAppendLog.mock.calls[0][2];
const coalesced = entries[0].notification;
expect(coalesced.params?.update).toEqual({
sessionUpdate: "agent_message",
content: { type: "text", text: "complete text" },
});
});

it("does not coalesce when coalesce is false", async () => {
const sessionId = "s1";
logWriter.register(sessionId, { taskId: "t1", runId: sessionId });

logWriter.appendRawLine(
sessionId,
makeSessionUpdate("agent_message_chunk", {
content: { type: "text", text: "buffered" },
}),
);

await logWriter.flush(sessionId, { coalesce: false });

expect(mockAppendLog).not.toHaveBeenCalled();
});
});

describe("direct agent_message supersedes chunks", () => {
it("discards buffered chunks when a direct agent_message arrives", async () => {
const sessionId = "s1";
logWriter.register(sessionId, { taskId: "t1", runId: sessionId });

// Buffer partial chunks
logWriter.appendRawLine(
sessionId,
makeSessionUpdate("agent_message_chunk", {
content: { type: "text", text: "partial " },
}),
);
logWriter.appendRawLine(
sessionId,
makeSessionUpdate("agent_message_chunk", {
content: { type: "text", text: "text" },
}),
);

// Direct agent_message arrives — authoritative full text
logWriter.appendRawLine(
sessionId,
makeSessionUpdate("agent_message", {
content: { type: "text", text: "complete full response" },
}),
);

await logWriter.flush(sessionId);

expect(mockAppendLog).toHaveBeenCalledTimes(1);
const entries: StoredNotification[] = mockAppendLog.mock.calls[0][2];
// Only the direct agent_message — no coalesced partial entry
expect(entries).toHaveLength(1);
const coalesced = entries[0].notification;
expect(coalesced.params?.update).toEqual({
sessionUpdate: "agent_message",
content: { type: "text", text: "complete full response" },
});
expect(logWriter.getLastAgentMessage(sessionId)).toBe(
"complete full response",
);
});

it("is additive with earlier coalesced text in multi-message turns", async () => {
const sessionId = "s1";
logWriter.register(sessionId, { taskId: "t1", runId: sessionId });

// First assistant message: chunks coalesced by a tool_call event
logWriter.appendRawLine(
sessionId,
makeSessionUpdate("agent_message_chunk", {
content: { type: "text", text: "first message" },
}),
);
logWriter.appendRawLine(
sessionId,
makeSessionUpdate("tool_call", { toolCallId: "tc1" }),
);
// "first message" is now coalesced into currentTurnMessages

// Second assistant message arrives as direct agent_message
// (e.g., after tool result, no active chunk buffer)
logWriter.appendRawLine(
sessionId,
makeSessionUpdate("agent_message", {
content: { type: "text", text: "second message" },
}),
);

const response = logWriter.getFullAgentResponse(sessionId);
// Both messages are preserved — direct message is additive
expect(response).toBe("first message\n\nsecond message");
});

it("persisted log does not contain stale entries when chunks are superseded", async () => {
const sessionId = "s1";
logWriter.register(sessionId, { taskId: "t1", runId: sessionId });

// Chunks buffered, then direct agent_message supersedes before coalescing
logWriter.appendRawLine(
sessionId,
makeSessionUpdate("agent_message_chunk", {
content: { type: "text", text: "partial" },
}),
);
logWriter.appendRawLine(
sessionId,
makeSessionUpdate("agent_message", {
content: { type: "text", text: "complete" },
}),
);

await logWriter.flush(sessionId);

expect(mockAppendLog).toHaveBeenCalledTimes(1);
const entries: StoredNotification[] = mockAppendLog.mock.calls[0][2];
// Only the direct agent_message — no coalesced partial entry
expect(entries).toHaveLength(1);
const persisted = entries[0].notification;
expect(persisted.params?.update).toEqual({
sessionUpdate: "agent_message",
content: { type: "text", text: "complete" },
});
});
});

describe("register", () => {
it("does not re-register existing sessions", () => {
const sessionId = "s1";
Expand Down
59 changes: 48 additions & 11 deletions packages/agent/src/session-log-writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,12 @@ export class SessionLogWriter {
}

async flushAll(): Promise<void> {
const sessionIds = [...this.sessions.keys()];
// Coalesce any in-progress chunk buffers before the final flush
// During normal operation, chunks are coalesced when the next non-chunk
// event arrives, but on shutdown there may be no subsequent event
const flushPromises: Promise<void>[] = [];
for (const sessionId of sessionIds) {
for (const [sessionId, session] of this.sessions) {
this.emitCoalescedMessage(sessionId, session);
flushPromises.push(this.flush(sessionId));
}
await Promise.all(flushPromises);
Expand Down Expand Up @@ -123,8 +126,14 @@ export class SessionLogWriter {
return;
}

// Non-chunk event: flush any buffered chunks first
this.emitCoalescedMessage(sessionId, session);
// Non-chunk event: flush any buffered chunks first.
// If this is a direct agent_message AND there are buffered chunks,
// the direct message supersedes the partial chunks
if (this.isDirectAgentMessage(message) && session.chunkBuffer) {
session.chunkBuffer = undefined;
} else {
this.emitCoalescedMessage(sessionId, session);
}

const nonChunkAgentText = this.extractAgentMessageText(message);
if (nonChunkAgentText) {
Expand Down Expand Up @@ -155,7 +164,17 @@ export class SessionLogWriter {
}
}

async flush(sessionId: string): Promise<void> {
async flush(
sessionId: string,
{ coalesce = false }: { coalesce?: boolean } = {},
): Promise<void> {
if (coalesce) {
const session = this.sessions.get(sessionId);
if (session) {
this.emitCoalescedMessage(sessionId, session);
}
}

// Serialize flushes per session
const prev = this.flushQueues.get(sessionId) ?? Promise.resolve();
const next = prev.catch(() => {}).then(() => this._doFlush(sessionId));
Expand All @@ -175,9 +194,6 @@ export class SessionLogWriter {
return;
}

// Emit any buffered chunks before flushing
this.emitCoalescedMessage(sessionId, session);

const pending = this.pendingEntries.get(sessionId);
if (!this.posthogAPI || !pending?.length) {
return;
Expand Down Expand Up @@ -231,11 +247,21 @@ export class SessionLogWriter {
}
}

private isAgentMessageChunk(message: Record<string, unknown>): boolean {
if (message.method !== "session/update") return false;
private getSessionUpdateType(
message: Record<string, unknown>,
): string | undefined {
if (message.method !== "session/update") return undefined;
const params = message.params as Record<string, unknown> | undefined;
const update = params?.update as Record<string, unknown> | undefined;
return update?.sessionUpdate === "agent_message_chunk";
return update?.sessionUpdate as string | undefined;
}

private isDirectAgentMessage(message: Record<string, unknown>): boolean {
return this.getSessionUpdateType(message) === "agent_message";
}

private isAgentMessageChunk(message: Record<string, unknown>): boolean {
return this.getSessionUpdateType(message) === "agent_message_chunk";
}

private extractChunkText(message: Record<string, unknown>): string {
Expand Down Expand Up @@ -290,6 +316,17 @@ export class SessionLogWriter {
getFullAgentResponse(sessionId: string): string | undefined {
const session = this.sessions.get(sessionId);
if (!session || session.currentTurnMessages.length === 0) return undefined;

if (session.chunkBuffer) {
this.logger.warn(
"getFullAgentResponse called with non-empty chunk buffer",
{
sessionId,
bufferedLength: session.chunkBuffer.text.length,
},
);
}

return session.currentTurnMessages.join("\n\n");
}

Expand Down
Loading