diff --git a/packages/agent/src/agent.ts b/packages/agent/src/agent.ts index d2adfd62c..88fc88941 100644 --- a/packages/agent/src/agent.ts +++ b/packages/agent/src/agent.ts @@ -175,7 +175,7 @@ export class Agent { async cleanup(): Promise { if (this.sessionLogWriter && this.taskRunId) { - await this.sessionLogWriter.flush(this.taskRunId); + await this.sessionLogWriter.flush(this.taskRunId, { coalesce: true }); } await this.acpConnection?.cleanup(); } diff --git a/packages/agent/src/server/agent-server.ts b/packages/agent/src/server/agent-server.ts index f87093f0a..1f7718907 100644 --- a/packages/agent/src/server/agent-server.ts +++ b/packages/agent/src/server/agent-server.ts @@ -523,7 +523,9 @@ export class AgentServer { // against async log persistence to object storage. let assistantMessage: string | undefined; try { - await this.session.logWriter.flush(this.session.payload.run_id); + await this.session.logWriter.flush(this.session.payload.run_id, { + coalesce: true, + }); assistantMessage = this.session.logWriter.getFullAgentResponse( this.session.payload.run_id, ); @@ -747,6 +749,9 @@ export class AgentServer { }); this.logger.info("Session initialized successfully"); + this.logger.info( + `Agent version: ${this.config.version ?? packageJson.version}`, + ); // Signal in_progress so the UI can start polling for updates this.posthogAPI @@ -1111,7 +1116,9 @@ Important: ): Promise { if (this.session?.payload.run_id === payload.run_id) { try { - await this.session.logWriter.flush(payload.run_id); + await this.session.logWriter.flush(payload.run_id, { + coalesce: true, + }); } catch (error) { this.logger.warn("Failed to flush session logs before completion", { taskId: payload.task_id, @@ -1270,7 +1277,7 @@ Important: } try { - await this.session.logWriter.flush(payload.run_id); + await this.session.logWriter.flush(payload.run_id, { coalesce: true }); } catch (error) { this.logger.warn("Failed to flush logs before Slack relay", { taskId: payload.task_id, @@ -1473,7 +1480,9 @@ Important: } try { - await this.session.logWriter.flush(this.session.payload.run_id); + await this.session.logWriter.flush(this.session.payload.run_id, { + coalesce: true, + }); } catch (error) { this.logger.error("Failed to flush session logs", error); } diff --git a/packages/agent/src/session-log-writer.test.ts b/packages/agent/src/session-log-writer.test.ts index b78e7f4e5..ddb1519eb 100644 --- a/packages/agent/src/session-log-writer.test.ts +++ b/packages/agent/src/session-log-writer.test.ts @@ -159,6 +159,222 @@ describe("SessionLogWriter", () => { }); }); + describe("_doFlush does not prematurely coalesce", () => { + it("does not coalesce buffered chunks during a timed flush", async () => { + const sessionId = "s1"; + logWriter.register(sessionId, { taskId: "t1", runId: sessionId }); + + // Buffer some chunks (no non-chunk event to trigger coalescing) + logWriter.appendRawLine( + sessionId, + makeSessionUpdate("agent_message_chunk", { + content: { type: "text", text: "Hello " }, + }), + ); + logWriter.appendRawLine( + sessionId, + makeSessionUpdate("agent_message_chunk", { + content: { type: "text", text: "world" }, + }), + ); + + // Flush without any non-chunk event arriving — simulates + // the 500ms debounce timer firing mid-stream + await logWriter.flush(sessionId); + + // No entries should have been sent — chunks are still buffered + expect(mockAppendLog).not.toHaveBeenCalled(); + + // Now a non-chunk event arrives, triggering natural coalescing + logWriter.appendRawLine( + sessionId, + makeSessionUpdate("usage_update", { used: 100 }), + ); + + await logWriter.flush(sessionId); + + expect(mockAppendLog).toHaveBeenCalledTimes(1); + const entries: StoredNotification[] = mockAppendLog.mock.calls[0][2]; + expect(entries).toHaveLength(2); // coalesced agent_message + usage_update + const coalesced = entries[0].notification; + expect(coalesced.params?.update).toEqual({ + sessionUpdate: "agent_message", + content: { type: "text", text: "Hello world" }, + }); + }); + }); + + describe("flushAll coalesces on shutdown", () => { + it("coalesces remaining chunks before flushing", async () => { + const sessionId = "s1"; + logWriter.register(sessionId, { taskId: "t1", runId: sessionId }); + + logWriter.appendRawLine( + sessionId, + makeSessionUpdate("agent_message_chunk", { + content: { type: "text", text: "partial response" }, + }), + ); + + await logWriter.flushAll(); + + expect(mockAppendLog).toHaveBeenCalledTimes(1); + const entries: StoredNotification[] = mockAppendLog.mock.calls[0][2]; + expect(entries).toHaveLength(1); + const coalesced = entries[0].notification; + expect(coalesced.params?.update).toEqual({ + sessionUpdate: "agent_message", + content: { type: "text", text: "partial response" }, + }); + }); + }); + + describe("flush with coalesce option", () => { + it("drains chunk buffer when coalesce is true", async () => { + const sessionId = "s1"; + logWriter.register(sessionId, { taskId: "t1", runId: sessionId }); + + logWriter.appendRawLine( + sessionId, + makeSessionUpdate("agent_message_chunk", { + content: { type: "text", text: "complete text" }, + }), + ); + + await logWriter.flush(sessionId, { coalesce: true }); + + expect(mockAppendLog).toHaveBeenCalledTimes(1); + const entries: StoredNotification[] = mockAppendLog.mock.calls[0][2]; + const coalesced = entries[0].notification; + expect(coalesced.params?.update).toEqual({ + sessionUpdate: "agent_message", + content: { type: "text", text: "complete text" }, + }); + }); + + it("does not coalesce when coalesce is false", async () => { + const sessionId = "s1"; + logWriter.register(sessionId, { taskId: "t1", runId: sessionId }); + + logWriter.appendRawLine( + sessionId, + makeSessionUpdate("agent_message_chunk", { + content: { type: "text", text: "buffered" }, + }), + ); + + await logWriter.flush(sessionId, { coalesce: false }); + + expect(mockAppendLog).not.toHaveBeenCalled(); + }); + }); + + describe("direct agent_message supersedes chunks", () => { + it("discards buffered chunks when a direct agent_message arrives", async () => { + const sessionId = "s1"; + logWriter.register(sessionId, { taskId: "t1", runId: sessionId }); + + // Buffer partial chunks + logWriter.appendRawLine( + sessionId, + makeSessionUpdate("agent_message_chunk", { + content: { type: "text", text: "partial " }, + }), + ); + logWriter.appendRawLine( + sessionId, + makeSessionUpdate("agent_message_chunk", { + content: { type: "text", text: "text" }, + }), + ); + + // Direct agent_message arrives — authoritative full text + logWriter.appendRawLine( + sessionId, + makeSessionUpdate("agent_message", { + content: { type: "text", text: "complete full response" }, + }), + ); + + await logWriter.flush(sessionId); + + expect(mockAppendLog).toHaveBeenCalledTimes(1); + const entries: StoredNotification[] = mockAppendLog.mock.calls[0][2]; + // Only the direct agent_message — no coalesced partial entry + expect(entries).toHaveLength(1); + const coalesced = entries[0].notification; + expect(coalesced.params?.update).toEqual({ + sessionUpdate: "agent_message", + content: { type: "text", text: "complete full response" }, + }); + expect(logWriter.getLastAgentMessage(sessionId)).toBe( + "complete full response", + ); + }); + + it("is additive with earlier coalesced text in multi-message turns", async () => { + const sessionId = "s1"; + logWriter.register(sessionId, { taskId: "t1", runId: sessionId }); + + // First assistant message: chunks coalesced by a tool_call event + logWriter.appendRawLine( + sessionId, + makeSessionUpdate("agent_message_chunk", { + content: { type: "text", text: "first message" }, + }), + ); + logWriter.appendRawLine( + sessionId, + makeSessionUpdate("tool_call", { toolCallId: "tc1" }), + ); + // "first message" is now coalesced into currentTurnMessages + + // Second assistant message arrives as direct agent_message + // (e.g., after tool result, no active chunk buffer) + logWriter.appendRawLine( + sessionId, + makeSessionUpdate("agent_message", { + content: { type: "text", text: "second message" }, + }), + ); + + const response = logWriter.getFullAgentResponse(sessionId); + // Both messages are preserved — direct message is additive + expect(response).toBe("first message\n\nsecond message"); + }); + + it("persisted log does not contain stale entries when chunks are superseded", async () => { + const sessionId = "s1"; + logWriter.register(sessionId, { taskId: "t1", runId: sessionId }); + + // Chunks buffered, then direct agent_message supersedes before coalescing + logWriter.appendRawLine( + sessionId, + makeSessionUpdate("agent_message_chunk", { + content: { type: "text", text: "partial" }, + }), + ); + logWriter.appendRawLine( + sessionId, + makeSessionUpdate("agent_message", { + content: { type: "text", text: "complete" }, + }), + ); + + await logWriter.flush(sessionId); + + expect(mockAppendLog).toHaveBeenCalledTimes(1); + const entries: StoredNotification[] = mockAppendLog.mock.calls[0][2]; + // Only the direct agent_message — no coalesced partial entry + expect(entries).toHaveLength(1); + const persisted = entries[0].notification; + expect(persisted.params?.update).toEqual({ + sessionUpdate: "agent_message", + content: { type: "text", text: "complete" }, + }); + }); + }); + describe("register", () => { it("does not re-register existing sessions", () => { const sessionId = "s1"; diff --git a/packages/agent/src/session-log-writer.ts b/packages/agent/src/session-log-writer.ts index a49890096..d585d98eb 100644 --- a/packages/agent/src/session-log-writer.ts +++ b/packages/agent/src/session-log-writer.ts @@ -54,9 +54,12 @@ export class SessionLogWriter { } async flushAll(): Promise { - const sessionIds = [...this.sessions.keys()]; + // Coalesce any in-progress chunk buffers before the final flush + // During normal operation, chunks are coalesced when the next non-chunk + // event arrives, but on shutdown there may be no subsequent event const flushPromises: Promise[] = []; - for (const sessionId of sessionIds) { + for (const [sessionId, session] of this.sessions) { + this.emitCoalescedMessage(sessionId, session); flushPromises.push(this.flush(sessionId)); } await Promise.all(flushPromises); @@ -123,8 +126,14 @@ export class SessionLogWriter { return; } - // Non-chunk event: flush any buffered chunks first - this.emitCoalescedMessage(sessionId, session); + // Non-chunk event: flush any buffered chunks first. + // If this is a direct agent_message AND there are buffered chunks, + // the direct message supersedes the partial chunks + if (this.isDirectAgentMessage(message) && session.chunkBuffer) { + session.chunkBuffer = undefined; + } else { + this.emitCoalescedMessage(sessionId, session); + } const nonChunkAgentText = this.extractAgentMessageText(message); if (nonChunkAgentText) { @@ -155,7 +164,17 @@ export class SessionLogWriter { } } - async flush(sessionId: string): Promise { + async flush( + sessionId: string, + { coalesce = false }: { coalesce?: boolean } = {}, + ): Promise { + if (coalesce) { + const session = this.sessions.get(sessionId); + if (session) { + this.emitCoalescedMessage(sessionId, session); + } + } + // Serialize flushes per session const prev = this.flushQueues.get(sessionId) ?? Promise.resolve(); const next = prev.catch(() => {}).then(() => this._doFlush(sessionId)); @@ -175,9 +194,6 @@ export class SessionLogWriter { return; } - // Emit any buffered chunks before flushing - this.emitCoalescedMessage(sessionId, session); - const pending = this.pendingEntries.get(sessionId); if (!this.posthogAPI || !pending?.length) { return; @@ -231,11 +247,21 @@ export class SessionLogWriter { } } - private isAgentMessageChunk(message: Record): boolean { - if (message.method !== "session/update") return false; + private getSessionUpdateType( + message: Record, + ): string | undefined { + if (message.method !== "session/update") return undefined; const params = message.params as Record | undefined; const update = params?.update as Record | undefined; - return update?.sessionUpdate === "agent_message_chunk"; + return update?.sessionUpdate as string | undefined; + } + + private isDirectAgentMessage(message: Record): boolean { + return this.getSessionUpdateType(message) === "agent_message"; + } + + private isAgentMessageChunk(message: Record): boolean { + return this.getSessionUpdateType(message) === "agent_message_chunk"; } private extractChunkText(message: Record): string { @@ -290,6 +316,17 @@ export class SessionLogWriter { getFullAgentResponse(sessionId: string): string | undefined { const session = this.sessions.get(sessionId); if (!session || session.currentTurnMessages.length === 0) return undefined; + + if (session.chunkBuffer) { + this.logger.warn( + "getFullAgentResponse called with non-empty chunk buffer", + { + sessionId, + bufferedLength: session.chunkBuffer.text.length, + }, + ); + } + return session.currentTurnMessages.join("\n\n"); }