Skip to content

Commit ad7e163

Browse files
authored
🤖 fix: prevent compaction summary loss from race condition (#1007)
## Problem When compaction completes, there's a race condition that can cause the compaction summary to be lost from context: 1. StreamManager emits `stream-end` then continues to `deletePartial`/`updateHistory` 2. CompactionHandler runs (async), clears history, appends summary 3. `sendQueuedMessages` triggers `commitToHistory` 4. `commitToHistory` finds stale `partial.json` (not yet deleted by StreamManager) 5. The stale partial has the OLD historySequence from pre-compaction 6. `commitToHistory` appends this stale message to history, corrupting context ## Solution CompactionHandler now deletes `partial.json` BEFORE clearing history. This ensures any concurrent `commitToHistory` calls find no partial and become no-ops. ## Changes - `src/node/services/compactionHandler.ts` - Delete partial before compaction - `src/node/services/agentSession.ts` - Pass partialService to CompactionHandler - `src/node/services/compactionHandler.test.ts` - Test for the fix _Generated with `mux`_
1 parent efd8a16 commit ad7e163

File tree

3 files changed

+58
-0
lines changed

3 files changed

+58
-0
lines changed

src/node/services/agentSession.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ export class AgentSession {
110110
this.compactionHandler = new CompactionHandler({
111111
workspaceId: this.workspaceId,
112112
historyService: this.historyService,
113+
partialService: this.partialService,
113114
emitter: this.emitter,
114115
});
115116

src/node/services/compactionHandler.test.ts

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { describe, it, expect, beforeEach, mock } from "bun:test";
22
import { CompactionHandler } from "./compactionHandler";
33
import type { HistoryService } from "./historyService";
4+
import type { PartialService } from "./partialService";
45
import type { EventEmitter } from "events";
56
import { createMuxMessage, type MuxMessage } from "@/common/types/message";
67
import type { StreamEndEvent } from "@/common/types/stream";
@@ -48,6 +49,26 @@ const createMockHistoryService = () => {
4849
};
4950
};
5051

52+
const createMockPartialService = () => {
53+
let deletePartialResult: Result<void, string> = Ok(undefined);
54+
55+
const deletePartial = mock((_) => Promise.resolve(deletePartialResult));
56+
const readPartial = mock((_) => Promise.resolve(null));
57+
const writePartial = mock((_, __) => Promise.resolve(Ok(undefined)));
58+
const commitToHistory = mock((_) => Promise.resolve(Ok(undefined)));
59+
60+
return {
61+
deletePartial,
62+
readPartial,
63+
writePartial,
64+
commitToHistory,
65+
// Allow setting mock return values
66+
mockDeletePartial: (result: Result<void, string>) => {
67+
deletePartialResult = result;
68+
},
69+
};
70+
};
71+
5172
const createMockEmitter = (): { emitter: EventEmitter; events: EmittedEvent[] } => {
5273
const events: EmittedEvent[] = [];
5374
const emitter = {
@@ -112,6 +133,7 @@ const setupSuccessfulCompaction = (
112133
describe("CompactionHandler", () => {
113134
let handler: CompactionHandler;
114135
let mockHistoryService: ReturnType<typeof createMockHistoryService>;
136+
let mockPartialService: ReturnType<typeof createMockPartialService>;
115137
let mockEmitter: EventEmitter;
116138
let emittedEvents: EmittedEvent[];
117139
const workspaceId = "test-workspace";
@@ -122,10 +144,12 @@ describe("CompactionHandler", () => {
122144
emittedEvents = events;
123145

124146
mockHistoryService = createMockHistoryService();
147+
mockPartialService = createMockPartialService();
125148

126149
handler = new CompactionHandler({
127150
workspaceId,
128151
historyService: mockHistoryService as unknown as HistoryService,
152+
partialService: mockPartialService as unknown as PartialService,
129153
emitter: mockEmitter,
130154
});
131155
});
@@ -209,6 +233,23 @@ describe("CompactionHandler", () => {
209233
);
210234
});
211235

236+
it("should delete partial.json before clearing history (race condition fix)", async () => {
237+
const compactionReq = createCompactionRequest();
238+
mockHistoryService.mockGetHistory(Ok([compactionReq]));
239+
mockHistoryService.mockClearHistory(Ok([0]));
240+
mockHistoryService.mockAppendToHistory(Ok(undefined));
241+
242+
const event = createStreamEndEvent("Summary");
243+
await handler.handleCompletion(event);
244+
245+
// deletePartial should be called once before clearHistory
246+
expect(mockPartialService.deletePartial.mock.calls).toHaveLength(1);
247+
expect(mockPartialService.deletePartial.mock.calls[0][0]).toBe(workspaceId);
248+
249+
// Verify deletePartial was called (we can't easily verify order without more complex mocking,
250+
// but the important thing is that it IS called during compaction)
251+
});
252+
212253
it("should call clearHistory() and appendToHistory()", async () => {
213254
const compactionReq = createCompactionRequest();
214255
mockHistoryService.mockGetHistory(Ok([compactionReq]));

src/node/services/compactionHandler.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import type { EventEmitter } from "events";
22
import type { HistoryService } from "./historyService";
3+
import type { PartialService } from "./partialService";
34
import type { StreamEndEvent } from "@/common/types/stream";
45
import type { WorkspaceChatMessage, DeleteMessage } from "@/common/orpc/types";
56
import type { Result } from "@/common/types/result";
@@ -13,6 +14,7 @@ import { log } from "@/node/services/log";
1314
interface CompactionHandlerOptions {
1415
workspaceId: string;
1516
historyService: HistoryService;
17+
partialService: PartialService;
1618
emitter: EventEmitter;
1719
}
1820

@@ -27,12 +29,14 @@ interface CompactionHandlerOptions {
2729
export class CompactionHandler {
2830
private readonly workspaceId: string;
2931
private readonly historyService: HistoryService;
32+
private readonly partialService: PartialService;
3033
private readonly emitter: EventEmitter;
3134
private readonly processedCompactionRequestIds: Set<string> = new Set<string>();
3235

3336
constructor(options: CompactionHandlerOptions) {
3437
this.workspaceId = options.workspaceId;
3538
this.historyService = options.historyService;
39+
this.partialService = options.partialService;
3640
this.emitter = options.emitter;
3741
}
3842

@@ -106,6 +110,18 @@ export class CompactionHandler {
106110

107111
const historicalUsage = usageHistory.length > 0 ? sumUsageHistory(usageHistory) : undefined;
108112

113+
// CRITICAL: Delete partial.json BEFORE clearing history
114+
// This prevents a race condition where:
115+
// 1. CompactionHandler clears history and appends summary
116+
// 2. sendQueuedMessages triggers commitToHistory
117+
// 3. commitToHistory finds stale partial.json and appends it to history
118+
// By deleting partial first, commitToHistory becomes a no-op
119+
const deletePartialResult = await this.partialService.deletePartial(this.workspaceId);
120+
if (!deletePartialResult.success) {
121+
log.warn(`Failed to delete partial before compaction: ${deletePartialResult.error}`);
122+
// Continue anyway - the partial may not exist, which is fine
123+
}
124+
109125
// Clear entire history and get deleted sequences
110126
const clearResult = await this.historyService.clearHistory(this.workspaceId);
111127
if (!clearResult.success) {

0 commit comments

Comments
 (0)