feat(agent): sync AgentController capability from tegg#5968
Conversation
Replace egg's earlier AgentController implementation with tegg's current implementation as the source of truth (egg's was the initial March port; tegg has since evolved the message model and runtime). - types(agent-runtime): switch to the SDK-aligned AgentMessage model (replaces the OpenAI-style MessageObject surface), add latestRunId / getLatestRunId, thread metadata, GetThreadOptions and AgentTimeoutError - agent-runtime: port AgentRuntime/OSSAgentStore/MessageConverter/RunBuilder/ SSEWriter to the current behavior (per-thread creation-time index, persist-on-abort, session-committed cancel gating, run/thread metadata, getRunStream / getLatestRunId) - controller-decorator: add getLatestRunId (GET /threads/:id/latest-run) and getRunStream (GET /runs/:id/stream) routes, multi-param routing, and the isSessionCommitted hook on AgentHandler - controller-plugin: wire the getLatestRunId delegate and getRunStream SSE delegate in AgentControllerObject - tegg: re-export SDK-aligned agent message types from the barrel - tests: port agent-runtime + controller-decorator agent tests and update the @eggjs/tegg-types export snapshot Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
📝 WalkthroughWalkthroughThis PR substantially refactors the agent runtime to add executor commit awareness, persistent JSONL-backed SSE streaming with reconnection support, thread metadata merging, and enhanced cancellation semantics. All changes maintain backward compatibility at the public method level while replacing the underlying execution and streaming infrastructure. ChangesAgent Runtime Commit-Aware Refactor with Persistent SSE Streaming
🎯 4 (Complex) | ⏱️ ~45 minutes Possibly Related PRs
Suggested Reviewers
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## next #5968 +/- ##
==========================================
+ Coverage 85.32% 85.50% +0.18%
==========================================
Files 670 669 -1
Lines 19553 19825 +272
Branches 3864 3917 +53
==========================================
+ Hits 16683 16952 +269
- Misses 2479 2481 +2
- Partials 391 392 +1 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
Deploying egg with
|
| Latest commit: |
13c1446
|
| Status: | ✅ Deploy successful! |
| Preview URL: | https://26cba2ad.egg-cci.pages.dev |
| Branch Preview URL: | https://sync-agent-controller-from-t.egg-cci.pages.dev |
There was a problem hiding this comment.
Code Review
This pull request introduces significant enhancements to the AgentRuntime and OSSAgentStore to support session persistence, robust run cancellation, and improved event streaming. Key changes include adding an isSessionCommitted hook for executors, implementing a background event buffer for streaming runs, and introducing an activity-time index for threads. The review feedback correctly identifies several critical issues: the use of synchronous file I/O in hot paths, potential memory leaks in heartbeat timers, and the need for robust error normalization when handling unknown caught exceptions. Additionally, the reviewer highlights the risk of unbounded disk usage from event logs and suggests improvements for the exclusive write lock mechanism in the store.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| private pushEvent(buffer: RunEventBuffer, type: string, data: unknown): void { | ||
| const event: StreamEvent = { | ||
| seq: ++buffer.lastSeq, | ||
| type, | ||
| data, | ||
| ts: Date.now(), | ||
| }; | ||
| appendFileSync(buffer.filePath, JSON.stringify(event) + '\n'); | ||
| buffer.emitter.emit('event', event); | ||
| } |
There was a problem hiding this comment.
Using synchronous file system operations like appendFileSync inside the hot path of a streaming API (which can be called dozens of times per second per active stream for token deltas) blocks the single-threaded Node.js event loop. This can severely degrade the performance and throughput of the entire application under concurrent load.
Consider using a non-blocking fs.WriteStream to write events. You can create the write stream when initializing the RunEventBuffer and close it when the stream finishes.
private pushEvent(buffer: RunEventBuffer, type: string, data: unknown): void {
const event: StreamEvent = {
seq: ++buffer.lastSeq,
type,
data,
ts: Date.now(),
};
if (buffer.writeStream) {
buffer.writeStream.write(JSON.stringify(event) + '\n');
} else {
appendFileSync(buffer.filePath, JSON.stringify(event) + '\n');
}
buffer.emitter.emit('event', event);
}| const waitForEvent = () => | ||
| new Promise<'event' | 'heartbeat'>((resolve) => { | ||
| waitResolve = () => resolve('event'); | ||
| setTimeout(() => resolve('heartbeat'), HEARTBEAT_INTERVAL_MS); | ||
| }); |
There was a problem hiding this comment.
In waitForEvent, the setTimeout timer is not cleared when the promise resolves via waitResolve (which happens when a real-time event is received). This causes a resource leak where active timers and their closures accumulate in the event loop until they eventually fire after 10 seconds. Under high event frequency, this can lead to significant memory overhead and CPU churn.
Clear the timeout when waitResolve is called to prevent this leak.
const waitForEvent = () =>
new Promise<'event' | 'heartbeat'>((resolve) => {
const timer = setTimeout(() => resolve('heartbeat'), HEARTBEAT_INTERVAL_MS);
waitResolve = () => {
clearTimeout(timer);
resolve('event');
};
});| try { | ||
| await this.store.updateRun(run.id, rb.fail(err as Error)); | ||
| const currentRun = await this.store.getRun(runId); | ||
| if (currentRun.status !== RunStatus.Cancelling && currentRun.status !== RunStatus.Cancelled) { | ||
| await this.store.updateRun(runId, rb.fail(err as Error)); | ||
| } | ||
| } catch (storeErr) { | ||
| this.logger.error('[AgentRuntime] failed to update run status after error:', storeErr); | ||
| } | ||
|
|
||
| // event: thread.run.failed | ||
| if (!writer.closed) { | ||
| writer.writeEvent(AgentSSEEvent.ThreadRunFailed, rb.snapshot()); | ||
| } | ||
| this.pushEvent(buffer, 'error', { message: (err as Error).message, runId }); |
There was a problem hiding this comment.
When handling caught exceptions of type unknown, casting err as Error and directly accessing (err as Error).message can lead to runtime errors or undefined values if the thrown value is not an instance of Error (e.g., a string or a plain object).
Following the repository's general guidelines, we should check if the caught value is an Error instance before accessing its properties, and use String(err) as a fallback to ensure a meaningful error message.
Let's normalize err to an Error instance first.
| try { | |
| await this.store.updateRun(run.id, rb.fail(err as Error)); | |
| const currentRun = await this.store.getRun(runId); | |
| if (currentRun.status !== RunStatus.Cancelling && currentRun.status !== RunStatus.Cancelled) { | |
| await this.store.updateRun(runId, rb.fail(err as Error)); | |
| } | |
| } catch (storeErr) { | |
| this.logger.error('[AgentRuntime] failed to update run status after error:', storeErr); | |
| } | |
| // event: thread.run.failed | |
| if (!writer.closed) { | |
| writer.writeEvent(AgentSSEEvent.ThreadRunFailed, rb.snapshot()); | |
| } | |
| this.pushEvent(buffer, 'error', { message: (err as Error).message, runId }); | |
| const errorInstance = err instanceof Error ? err : new Error(String(err)); | |
| try { | |
| const currentRun = await this.store.getRun(runId); | |
| if (currentRun.status !== RunStatus.Cancelling && currentRun.status !== RunStatus.Cancelled) { | |
| await this.store.updateRun(runId, rb.fail(errorInstance)); | |
| } | |
| } catch (storeErr) { | |
| this.logger.error('[AgentRuntime] failed to update run status after error:', storeErr); | |
| } | |
| this.pushEvent(buffer, 'error', { message: errorInstance.message, runId }); |
References
- When creating a new Error from a caught exception, check if the caught value is an
Errorinstance before accessing itsmessageproperty. UseString(err)as a fallback for non-Error values to ensure a meaningful error message.
| try { | ||
| await this.store.updateRun(run.id, rb.fail(err as Error)); | ||
| } catch (storeErr) { | ||
| this.logger.error('[AgentRuntime] failed to update run status after syncRun error:', storeErr); | ||
| } | ||
| throw err; |
There was a problem hiding this comment.
To ensure safe error handling when err is of type unknown, normalize it to an Error instance before passing it to rb.fail. This prevents potential runtime issues if a non-Error value is thrown.
| try { | |
| await this.store.updateRun(run.id, rb.fail(err as Error)); | |
| } catch (storeErr) { | |
| this.logger.error('[AgentRuntime] failed to update run status after syncRun error:', storeErr); | |
| } | |
| throw err; | |
| try { | |
| const errorInstance = err instanceof Error ? err : new Error(String(err)); | |
| await this.store.updateRun(run.id, rb.fail(errorInstance)); | |
| } catch (storeErr) { | |
| this.logger.error('[AgentRuntime] failed to update run status after syncRun error:', storeErr); | |
| } | |
| throw err; |
References
- When creating a new Error from a caught exception, check if the caught value is an
Errorinstance before accessing itsmessageproperty. UseString(err)as a fallback for non-Error values to ensure a meaningful error message.
| try { | ||
| const currentRun = await this.store.getRun(run.id); | ||
| if (currentRun.status !== RunStatus.Cancelling && currentRun.status !== RunStatus.Cancelled) { | ||
| await this.store.updateRun(run.id, rb.fail(err as Error)); | ||
| } | ||
| } catch (storeErr) { | ||
| // TODO: need a background expiry mechanism to clean up runs stuck in non-terminal states | ||
| // (e.g. in_progress or cancelling) when store writes fail persistently. | ||
| this.logger.error('[AgentRuntime] failed to update run status after error:', storeErr); | ||
| } |
There was a problem hiding this comment.
Normalize err to an Error instance before passing it to rb.fail to ensure robust error handling for non-Error thrown values.
| try { | |
| const currentRun = await this.store.getRun(run.id); | |
| if (currentRun.status !== RunStatus.Cancelling && currentRun.status !== RunStatus.Cancelled) { | |
| await this.store.updateRun(run.id, rb.fail(err as Error)); | |
| } | |
| } catch (storeErr) { | |
| // TODO: need a background expiry mechanism to clean up runs stuck in non-terminal states | |
| // (e.g. in_progress or cancelling) when store writes fail persistently. | |
| this.logger.error('[AgentRuntime] failed to update run status after error:', storeErr); | |
| } | |
| try { | |
| const currentRun = await this.store.getRun(run.id); | |
| if (currentRun.status !== RunStatus.Cancelling && currentRun.status !== RunStatus.Cancelled) { | |
| const errorInstance = err instanceof Error ? err : new Error(String(err)); | |
| await this.store.updateRun(run.id, rb.fail(errorInstance)); | |
| } | |
| } catch (storeErr) { | |
| this.logger.error('[AgentRuntime] failed to update run status after error:', storeErr); | |
| } |
References
- When creating a new Error from a caught exception, check if the caught value is an
Errorinstance before accessing itsmessageproperty. UseString(err)as a fallback for non-Error values to ensure a meaningful error message.
| private async runExclusiveThreadMetaWrite<T>(threadId: string, fn: () => Promise<T>): Promise<T> { | ||
| const previous = this.threadMetaWriteTails.get(threadId) ?? Promise.resolve(); | ||
| let release!: () => void; | ||
| const current = new Promise<void>((resolve) => { | ||
| release = resolve; | ||
| }); | ||
| const tail = previous.then(() => current); |
There was a problem hiding this comment.
In runExclusiveThreadMetaWrite, the promise chain is constructed using previous.then(() => current). If any promise in the chain rejects (e.g., due to an unhandled rejection or an unexpected error in a previous operation), the rejection will propagate down the chain, causing all subsequent write operations for that thread to reject and fail permanently.
To make the lock chain robust against unexpected rejections, catch any errors from the previous promise before chaining the next one.
| private async runExclusiveThreadMetaWrite<T>(threadId: string, fn: () => Promise<T>): Promise<T> { | |
| const previous = this.threadMetaWriteTails.get(threadId) ?? Promise.resolve(); | |
| let release!: () => void; | |
| const current = new Promise<void>((resolve) => { | |
| release = resolve; | |
| }); | |
| const tail = previous.then(() => current); | |
| private async runExclusiveThreadMetaWrite<T>(threadId: string, fn: () => Promise<T>): Promise<T> { | |
| const previous = this.threadMetaWriteTails.get(threadId) ?? Promise.resolve(); | |
| let release!: () => void; | |
| const current = new Promise<void>((resolve) => { | |
| release = resolve; | |
| }); | |
| const tail = previous.catch(() => {}).then(() => current); |
| const EVENT_DIR = join(tmpdir(), 'agent-runtime-events'); | ||
| const DEFAULT_CANCEL_COMMIT_TIMEOUT_MS = 30_000; |
There was a problem hiding this comment.
The EVENT_DIR directory is created in the system's temporary directory (tmpdir()), and a new .jsonl file is created for every streaming run. Since these files are never deleted or cleaned up, they will accumulate indefinitely over the lifetime of the server, potentially leading to disk space exhaustion.
Consider implementing a background cleanup job or a retention policy (e.g., deleting files older than 24 hours) to prevent unbounded disk space usage.
Deploying egg-v3 with
|
| Latest commit: |
13c1446
|
| Status: | ✅ Deploy successful! |
| Preview URL: | https://07a08c49.egg-v3.pages.dev |
| Branch Preview URL: | https://sync-agent-controller-from-t.egg-v3.pages.dev |
There was a problem hiding this comment.
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
tegg/core/agent-runtime/src/AgentRuntime.ts (1)
131-138:⚠️ Potential issue | 🟠 Major | ⚡ Quick winValidate thread metadata on the public
createThread()path too.
ensureThread()rejects non-object metadata, butcreateThread()forwardsoptions?.metadatastraight to the store. That letsnull/arrays reach persisted thread metadata through the thread-creation API even though the runtime now treats thread metadata as object-shaped.Suggested fix
async createThread(options?: CreateThreadOptions): Promise<ThreadObject> { - const thread = await this.store.createThread(options?.metadata); + const thread = await this.store.createThread(validateMetadata(options?.metadata)); return { id: thread.id, object: AgentObjectType.Thread,🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tegg/core/agent-runtime/src/AgentRuntime.ts` around lines 131 - 138, createThread forwards options?.metadata straight to this.store.createThread allowing null/arrays to persist; call the existing ensureThread validator to validate/normalize metadata before persisting. In AgentRuntime.createThread, run const metadata = ensureThread(options?.metadata) (or similar) and pass that validated metadata to this.store.createThread, and use that same metadata in the returned ThreadObject so persisted and returned thread metadata are object-shaped and consistent with ensureThread's rules.
🧹 Nitpick comments (4)
tegg/core/controller-decorator/src/decorator/agent/AgentController.ts (2)
17-21: ⚡ Quick winConsider using a discriminated union for type-safe parameter metadata.
The current interface allows
nameto be optional for all param types, butpathParamandqueryalways requirenamewhilebodynever uses it. This forces non-null assertions at lines 156 and 159. A discriminated union would make this contract compile-time safe:type AgentRouteParam = | { index: number; type: 'body' } | { index: number; type: 'pathParam'; name: string } | { index: number; type: 'query'; name: string };This eliminates the
!assertions and prevents accidental misuse (e.g., forgetting to addnamefor a path parameter).♻️ Refactored type definition
-interface AgentRouteParam { - index: number; - type: 'body' | 'pathParam' | 'query'; - name?: string; -} +type AgentRouteParam = + | { index: number; type: 'body' } + | { index: number; type: 'pathParam'; name: string } + | { index: number; type: 'query'; name: string };Then at lines 156 and 159, remove the
!:- HTTPInfoUtil.setHTTPMethodParamName(param.name!, param.index, constructor, route.methodName); + HTTPInfoUtil.setHTTPMethodParamName(param.name, param.index, constructor, route.methodName);🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tegg/core/controller-decorator/src/decorator/agent/AgentController.ts` around lines 17 - 21, Change the loose AgentRouteParam interface into a discriminated union so parameter metadata is type-safe: replace the current interface AgentRouteParam with a union of { index: number; type: 'body' } | { index: number; type: 'pathParam'; name: string } | { index: number; type: 'query'; name: string }; then update all code that accesses AgentRouteParam (notably the logic in AgentController that currently uses non-null assertions on param.name) to rely on the discriminant (param.type) and remove the `!` assertions, e.g., when param.type === 'pathParam' or 'query' safely access param.name and when 'body' avoid using name. Ensure any helper functions or type guards reflect the new union.
34-53: ⚡ Quick winStub generator only supports up to 2 parameters.
The
paramCount >= 2branch creates a function with exactly 2 parameters, so routes with 3+ parameters would get a stub whosefunction.lengthdoesn't matchparamCount, breaking the framework's param validation contract (line 31 comment).Currently no routes exceed 2 parameters, but this limits future extensibility. Consider either:
- Adding a constraint (e.g.,
if (paramCount > 2) throw new Error(...)) to fail fast- Using rest parameters:
fn = async function (..._args: unknown[]) { ... }(though this makesfunction.lengthalways 0)- Generating the function dynamically to match the exact parameter count
♻️ Option 1: Add constraint to fail fast
function createNotImplemented(methodName: string, paramCount: number) { + if (paramCount > 2) { + throw new Error(`createNotImplemented does not support routes with more than 2 parameters (${methodName} has ${paramCount})`); + } let fn; if (paramCount >= 2) {🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tegg/core/controller-decorator/src/decorator/agent/AgentController.ts` around lines 34 - 53, createNotImplemented currently maps any paramCount >= 2 to a 2-arg stub, which breaks the framework's param validation for routes with >2 params; update createNotImplemented to fail fast by throwing an Error when paramCount > 2 (e.g. at the start of createNotImplemented check if paramCount > 2 and throw a clear error mentioning methodName and paramCount), then keep existing branches for 0,1,2 parameters and continue to call AgentInfoUtil.setNotImplemented(fn).tegg/plugin/controller/src/lib/AgentControllerObject.ts (1)
267-267: 💤 Low valueUnnecessary type assertion in parseInt call.
The
as stringassertion is redundant sincelastSeqis already typed asstring | undefinedin the function signature.parseInt(undefined, 10)returnsNaN, andNaN || 0correctly returns0, so the code works but the assertion reduces clarity.♻️ Cleaner alternatives
Option 1: Use nullish coalescing:
- const seq = parseInt(lastSeq as string, 10) || 0; + const seq = parseInt(lastSeq ?? '', 10) || 0;Option 2: Explicit check:
- const seq = parseInt(lastSeq as string, 10) || 0; + const seq = lastSeq ? parseInt(lastSeq, 10) || 0 : 0;🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tegg/plugin/controller/src/lib/AgentControllerObject.ts` at line 267, The parseInt call in AgentControllerObject (where const seq = parseInt(lastSeq as string, 10) || 0;) uses an unnecessary type assertion; remove the "as string" and either call parseInt(lastSeq ?? undefined, 10) with the existing || 0 fallback or explicitly handle undefined (e.g., const seq = parseInt(lastSeq ?? '', 10) || 0) so the code is clearer—update the line in AgentControllerObject accordingly without changing surrounding logic.tegg/core/agent-runtime/test/OSSObjectStorageClient.test.ts (1)
8-73: ⚡ Quick winRemove dead code in the first section of
mockFn().Lines 9–39 define
fnand attach mock methods to it, but thisfnis never returned. Instead, lines 41–72 redefinewrappedFnwith the same logic, and line 72 returnswrappedFn. The first section (lines 9–39) is unreachable dead code.♻️ Proposed cleanup
/** Simple mock function helper for mocha tests. */ function mockFn() { const calls: any[][] = []; let nextResults: Array<{ type: 'resolve' | 'reject'; value: any }> = []; - const fn = (...args: any[]) => { - calls.push(args); - const result = nextResults.shift(); - if (result) { - return result.type === 'resolve' ? Promise.resolve(result.value) : Promise.reject(result.value); - } - return Promise.resolve({}); - }; - fn.mock = { calls }; - fn.mockResolvedValue = (val: any) => { - nextResults = []; - fn.mockResolvedValueOnce(val); - nextResults = nextResults.map(() => ({ type: 'resolve' as const, value: val })); - (fn as any)._defaultResult = { type: 'resolve', value: val }; - return fn; - }; - fn.mockResolvedValueOnce = (val: any) => { - nextResults.push({ type: 'resolve', value: val }); - return fn; - }; - fn.mockRejectedValue = (val: any) => { - (fn as any)._defaultResult = { type: 'reject', value: val }; - return fn; - }; - fn.mockRejectedValueOnce = (val: any) => { - nextResults.push({ type: 'reject', value: val }); - return fn; - }; - - // Override fn to use default result when nextResults is empty const wrappedFn: any = (...args: any[]) => { calls.push(args); const result = nextResults.shift();🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tegg/core/agent-runtime/test/OSSObjectStorageClient.test.ts` around lines 8 - 73, The code defines two implementations inside mockFn(): the unused local fn and the returned wrappedFn, causing dead code; remove the entire first implementation (the const fn = ... block and its mock* assignments that reference fn) and keep only the returned wrappedFn implementation that uses nextResults and calls, ensuring mockResolvedValue/mockRejectedValue and their "...Once" variants are attached to wrappedFn and that any references to _defaultResult remain on wrappedFn; update any internal references if necessary so mockFn returns the single working wrappedFn.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@tegg/core/agent-runtime/src/AgentRuntime.ts`:
- Around line 28-30: Update the local ESM imports in AgentRuntime by changing
the specifiers to use .js extensions instead of .ts for the imported modules:
replace imports for MessageConverter, RunBuilder, and SSEWriter so their
specifiers end with .js; ensure the same .js extension convention is applied
consistently for these symbols in other touched tegg TS files in this cohort.
- Around line 457-465: pushEvent currently uses blocking appendFileSync which
blocks the event loop when called from
executeStreamBackground/this.executor.execRun; change pushEvent to perform
non-blocking writes: either make pushEvent async and use fs.promises.appendFile,
or create and reuse a fs.createWriteStream on RunEventBuffer (e.g., a
buffer.stream or buffer.writeStream) and call write() with proper drain handling
to preserve event order and backpressure. Ensure you update call sites
(executeStreamBackground and any other callers) to await the async pushEvent or
handle write stream errors, and keep emitting buffer.emitter.emit('event',
event') immediately after scheduling the async write so consumers still receive
events in order.
In `@tegg/core/agent-runtime/src/OSSAgentStore.ts`:
- Line 13: The import in OSSAgentStore.ts is using a .ts extension which
violates ESM import-extension rules; update the import statement that brings in
dateBucket, newRunId, newThreadId, nowUnix, reverseMs from
'./AgentStoreUtils.ts' to use the .js extension (e.g., './AgentStoreUtils.js')
so the runtime can resolve the module correctly; ensure you update the import
only (preserve the named symbols) and run a quick build to confirm no other
imports use .ts extensions.
- Around line 88-116: writeThreadActivityIndex currently calls
this.client.put(...) directly so a synchronous throw escapes; wrap the call so
any synchronous exception becomes a rejected promise (e.g. create tracked via
Promise.resolve().then(() => this.client.put(indexKey, indexBody))) and then
attach the existing .catch(...) and .finally(...) handlers, and add tracked to
pendingIndexWrites; this ensures both sync and async failures are handled and
pendingIndexWrites.delete(tracked) always runs in finally.
In `@tegg/core/types/src/agent-runtime/AgentRuntime.ts`:
- Around line 1-2: Update the local ESM type import specifiers to use .js
extensions: in the imports that pull in AgentMessage, AgentRunConfig, and
RunStatus (the import statements shown in AgentRuntime.ts and AgentStore.ts),
change the source specifiers from './AgentMessage.ts' and './AgentStore.ts' (or
any other local .ts paths) to './AgentMessage.js' and './AgentStore.js'
respectively so they comply with the tegg ESM import-extension guideline.
---
Outside diff comments:
In `@tegg/core/agent-runtime/src/AgentRuntime.ts`:
- Around line 131-138: createThread forwards options?.metadata straight to
this.store.createThread allowing null/arrays to persist; call the existing
ensureThread validator to validate/normalize metadata before persisting. In
AgentRuntime.createThread, run const metadata = ensureThread(options?.metadata)
(or similar) and pass that validated metadata to this.store.createThread, and
use that same metadata in the returned ThreadObject so persisted and returned
thread metadata are object-shaped and consistent with ensureThread's rules.
---
Nitpick comments:
In `@tegg/core/agent-runtime/test/OSSObjectStorageClient.test.ts`:
- Around line 8-73: The code defines two implementations inside mockFn(): the
unused local fn and the returned wrappedFn, causing dead code; remove the entire
first implementation (the const fn = ... block and its mock* assignments that
reference fn) and keep only the returned wrappedFn implementation that uses
nextResults and calls, ensuring mockResolvedValue/mockRejectedValue and their
"...Once" variants are attached to wrappedFn and that any references to
_defaultResult remain on wrappedFn; update any internal references if necessary
so mockFn returns the single working wrappedFn.
In `@tegg/core/controller-decorator/src/decorator/agent/AgentController.ts`:
- Around line 17-21: Change the loose AgentRouteParam interface into a
discriminated union so parameter metadata is type-safe: replace the current
interface AgentRouteParam with a union of { index: number; type: 'body' } | {
index: number; type: 'pathParam'; name: string } | { index: number; type:
'query'; name: string }; then update all code that accesses AgentRouteParam
(notably the logic in AgentController that currently uses non-null assertions on
param.name) to rely on the discriminant (param.type) and remove the `!`
assertions, e.g., when param.type === 'pathParam' or 'query' safely access
param.name and when 'body' avoid using name. Ensure any helper functions or type
guards reflect the new union.
- Around line 34-53: createNotImplemented currently maps any paramCount >= 2 to
a 2-arg stub, which breaks the framework's param validation for routes with >2
params; update createNotImplemented to fail fast by throwing an Error when
paramCount > 2 (e.g. at the start of createNotImplemented check if paramCount >
2 and throw a clear error mentioning methodName and paramCount), then keep
existing branches for 0,1,2 parameters and continue to call
AgentInfoUtil.setNotImplemented(fn).
In `@tegg/plugin/controller/src/lib/AgentControllerObject.ts`:
- Line 267: The parseInt call in AgentControllerObject (where const seq =
parseInt(lastSeq as string, 10) || 0;) uses an unnecessary type assertion;
remove the "as string" and either call parseInt(lastSeq ?? undefined, 10) with
the existing || 0 fallback or explicitly handle undefined (e.g., const seq =
parseInt(lastSeq ?? '', 10) || 0) so the code is clearer—update the line in
AgentControllerObject accordingly without changing surrounding logic.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: ab0fddf1-6035-450e-a4a0-1db7f8ba9653
⛔ Files ignored due to path filters (1)
tegg/core/types/test/__snapshots__/index.test.ts.snapis excluded by!**/*.snap
📒 Files selected for processing (26)
tegg/core/agent-runtime/src/AgentRuntime.tstegg/core/agent-runtime/src/AgentStoreUtils.tstegg/core/agent-runtime/src/HttpSSEWriter.tstegg/core/agent-runtime/src/MessageConverter.tstegg/core/agent-runtime/src/OSSAgentStore.tstegg/core/agent-runtime/src/RunBuilder.tstegg/core/agent-runtime/src/SSEWriter.tstegg/core/agent-runtime/test/AgentRuntime.metadata.test.tstegg/core/agent-runtime/test/AgentRuntime.test.tstegg/core/agent-runtime/test/AgentStoreUtils.test.tstegg/core/agent-runtime/test/HttpSSEWriter.test.tstegg/core/agent-runtime/test/MessageConverter.test.tstegg/core/agent-runtime/test/OSSAgentStore.test.tstegg/core/agent-runtime/test/OSSObjectStorageClient.test.tstegg/core/agent-runtime/test/RunBuilder.test.tstegg/core/agent-runtime/test/helpers.tstegg/core/controller-decorator/src/decorator/agent/AgentController.tstegg/core/controller-decorator/src/decorator/agent/AgentHandler.tstegg/core/controller-decorator/test/AgentController.test.tstegg/core/controller-decorator/test/fixtures/AgentFooController.tstegg/core/tegg/src/agent.tstegg/core/types/src/agent-runtime/AgentMessage.tstegg/core/types/src/agent-runtime/AgentRuntime.tstegg/core/types/src/agent-runtime/AgentStore.tstegg/core/types/src/agent-runtime/errors.tstegg/plugin/controller/src/lib/AgentControllerObject.ts
| import { MessageConverter } from './MessageConverter.ts'; | ||
| import { RunBuilder } from './RunBuilder.ts'; | ||
| import type { RunUsage } from './RunBuilder.ts'; | ||
| import type { SSEWriter } from './SSEWriter.ts'; |
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win
Use .js specifiers for local ESM imports in tegg TS files.
These touched imports still use .ts extensions. Please switch them to .js here, and keep the same rule for the other changed tegg TS files in this cohort. As per coding guidelines, tegg/**/*.{ts,tsx} files require: “All imports should use .js extensions for ESM files”.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tegg/core/agent-runtime/src/AgentRuntime.ts` around lines 28 - 30, Update the
local ESM imports in AgentRuntime by changing the specifiers to use .js
extensions instead of .ts for the imported modules: replace imports for
MessageConverter, RunBuilder, and SSEWriter so their specifiers end with .js;
ensure the same .js extension convention is applied consistently for these
symbols in other touched tegg TS files in this cohort.
Source: Coding guidelines
| private pushEvent(buffer: RunEventBuffer, type: string, data: unknown): void { | ||
| const event: StreamEvent = { | ||
| seq: ++buffer.lastSeq, | ||
| type, | ||
| data, | ||
| ts: Date.now(), | ||
| }; | ||
| appendFileSync(buffer.filePath, JSON.stringify(event) + '\n'); | ||
| buffer.emitter.emit('event', event); |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
sed -n '457,505p' tegg/core/agent-runtime/src/AgentRuntime.tsRepository: eggjs/egg
Length of output: 1735
Avoid synchronous fs writes in the per-message stream hot path.
pushEvent() uses appendFileSync(...), and executeStreamBackground() calls this.pushEvent(...) inside for await (const msg of this.executor.execRun(...)), so every yielded msg triggers a synchronous disk write. This blocks the Node event loop during token-heavy streaming and can throttle other concurrent work.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tegg/core/agent-runtime/src/AgentRuntime.ts` around lines 457 - 465,
pushEvent currently uses blocking appendFileSync which blocks the event loop
when called from executeStreamBackground/this.executor.execRun; change pushEvent
to perform non-blocking writes: either make pushEvent async and use
fs.promises.appendFile, or create and reuse a fs.createWriteStream on
RunEventBuffer (e.g., a buffer.stream or buffer.writeStream) and call write()
with proper drain handling to preserve event order and backpressure. Ensure you
update call sites (executeStreamBackground and any other callers) to await the
async pushEvent or handle write stream errors, and keep emitting
buffer.emitter.emit('event', event') immediately after scheduling the async
write so consumers still receive events in order.
| import { AgentObjectType, RunStatus, AgentNotFoundError } from '@eggjs/tegg-types/agent-runtime'; | ||
|
|
||
| import { nowUnix, newThreadId, newRunId } from './AgentStoreUtils.ts'; | ||
| import { dateBucket, newRunId, newThreadId, nowUnix, reverseMs } from './AgentStoreUtils.ts'; |
There was a problem hiding this comment.
Use .js extension for the local ESM import.
./AgentStoreUtils.ts violates the tegg ESM import-extension rule and can break emitted-runtime resolution.
Suggested fix
-import { dateBucket, newRunId, newThreadId, nowUnix, reverseMs } from './AgentStoreUtils.ts';
+import { dateBucket, newRunId, newThreadId, nowUnix, reverseMs } from './AgentStoreUtils.js';As per coding guidelines, “All imports should use .js extensions for ESM files”.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| import { dateBucket, newRunId, newThreadId, nowUnix, reverseMs } from './AgentStoreUtils.ts'; | |
| import { dateBucket, newRunId, newThreadId, nowUnix, reverseMs } from './AgentStoreUtils.js'; |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tegg/core/agent-runtime/src/OSSAgentStore.ts` at line 13, The import in
OSSAgentStore.ts is using a .ts extension which violates ESM import-extension
rules; update the import statement that brings in dateBucket, newRunId,
newThreadId, nowUnix, reverseMs from './AgentStoreUtils.ts' to use the .js
extension (e.g., './AgentStoreUtils.js') so the runtime can resolve the module
correctly; ensure you update the import only (preserve the named symbols) and
run a quick build to confirm no other imports use .ts extensions.
Source: Coding guidelines
| private writeThreadActivityIndex( | ||
| threadId: string, | ||
| createdAt: number, | ||
| updatedAtMs: number, | ||
| metadata: Record<string, unknown>, | ||
| ): void { | ||
| const indexKey = this.threadActivityIndexKey(updatedAtMs, threadId); | ||
| const indexBody = JSON.stringify({ | ||
| threadId, | ||
| createdAt, | ||
| updatedAt: Math.floor(updatedAtMs / 1000), | ||
| metadata, | ||
| }); | ||
| const tracked: Promise<void> = this.client | ||
| .put(indexKey, indexBody) | ||
| .catch((err: unknown) => { | ||
| const errForLog: Error = err instanceof Error ? err : new Error(String(err)); | ||
| this.logger.warn( | ||
| '[OSSAgentStore] failed to write thread activity index threadId=%s key=%s', | ||
| threadId, | ||
| indexKey, | ||
| errForLog, | ||
| ); | ||
| }) | ||
| .finally(() => { | ||
| this.pendingIndexWrites.delete(tracked); | ||
| }); | ||
| this.pendingIndexWrites.add(tracked); | ||
| } |
There was a problem hiding this comment.
Prevent best-effort index writes from leaking synchronous put() failures.
writeThreadActivityIndex only catches rejected promises. If client.put() throws before returning a promise, this path propagates and can fail create/update calls unexpectedly.
Suggested fix
- const tracked: Promise<void> = this.client
- .put(indexKey, indexBody)
+ const tracked: Promise<void> = (async () => {
+ await this.client.put(indexKey, indexBody);
+ })()
.catch((err: unknown) => {
const errForLog: Error = err instanceof Error ? err : new Error(String(err));
this.logger.warn(
'[OSSAgentStore] failed to write thread activity index threadId=%s key=%s',
threadId,
indexKey,
errForLog,
);
})🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tegg/core/agent-runtime/src/OSSAgentStore.ts` around lines 88 - 116,
writeThreadActivityIndex currently calls this.client.put(...) directly so a
synchronous throw escapes; wrap the call so any synchronous exception becomes a
rejected promise (e.g. create tracked via Promise.resolve().then(() =>
this.client.put(indexKey, indexBody))) and then attach the existing .catch(...)
and .finally(...) handlers, and add tracked to pendingIndexWrites; this ensures
both sync and async failures are handled and pendingIndexWrites.delete(tracked)
always runs in finally.
| import type { AgentMessage } from './AgentMessage.ts'; | ||
| import type { AgentRunConfig, RunStatus } from './AgentStore.ts'; |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Verify .ts import specifiers in touched files.
rg -nP --type=ts "from\\s+['\"][^'\"]+\\.ts['\"]" \
tegg/core/types/src/agent-runtime/AgentRuntime.ts \
tegg/core/types/src/agent-runtime/AgentStore.ts
# Inspect compiler/module settings relevant to extension handling.
fd -a 'tsconfig*.json' | while read -r f; do
echo "== $f =="
jq '.compilerOptions | {module, moduleResolution, allowImportingTsExtensions, verbatimModuleSyntax}' "$f"
doneRepository: eggjs/egg
Length of output: 838
Fix ESM local import specifiers in tegg core types to use .js instead of .ts.
tegg/core/types/src/agent-runtime/AgentRuntime.ts and tegg/core/types/src/agent-runtime/AgentStore.ts currently import local types with ./*.ts extensions, which violates the tegg ESM import-extension guideline.
Suggested patch
--- a/tegg/core/types/src/agent-runtime/AgentRuntime.ts
+++ b/tegg/core/types/src/agent-runtime/AgentRuntime.ts
@@
-import type { AgentMessage } from './AgentMessage.ts';
-import type { AgentRunConfig, RunStatus } from './AgentStore.ts';
+import type { AgentMessage } from './AgentMessage.js';
+import type { AgentRunConfig, RunStatus } from './AgentStore.js';--- a/tegg/core/types/src/agent-runtime/AgentStore.ts
+++ b/tegg/core/types/src/agent-runtime/AgentStore.ts
@@
-import type { AgentMessage, InputMessage } from './AgentMessage.ts';
-import type { GetThreadOptions } from './AgentRuntime.ts';
+import type { AgentMessage, InputMessage } from './AgentMessage.js';
+import type { GetThreadOptions } from './AgentRuntime.js';📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| import type { AgentMessage } from './AgentMessage.ts'; | |
| import type { AgentRunConfig, RunStatus } from './AgentStore.ts'; | |
| import type { AgentMessage } from './AgentMessage.js'; | |
| import type { AgentRunConfig, RunStatus } from './AgentStore.js'; |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tegg/core/types/src/agent-runtime/AgentRuntime.ts` around lines 1 - 2, Update
the local ESM type import specifiers to use .js extensions: in the imports that
pull in AgentMessage, AgentRunConfig, and RunStatus (the import statements shown
in AgentRuntime.ts and AgentStore.ts), change the source specifiers from
'./AgentMessage.ts' and './AgentStore.ts' (or any other local .ts paths) to
'./AgentMessage.js' and './AgentStore.js' respectively so they comply with the
tegg ESM import-extension guideline.
Source: Coding guidelines
Motivation
egg's
@AgentControllercapability was the initial port (#5827, March 2026). Since then all active development happened on the standalone tegg repo, where the agent message model and runtime evolved significantly. This PR brings egg'snextbranch up to tegg's current implementation as the source of truth, replacing egg's older implementation.The agent surface in egg is self-contained — the changed types (
MessageObject/AgentStreamMessage→ SDK-alignedAgentMessage) are consumed only by the agent-runtime + controller agent files, so this has no impact outside the agent capability (verified: no external consumers of the removed symbols).Scope
AgentMessagemodel (replaces the OpenAI-styleMessageObjectsurface); addlatestRunId/getLatestRunId, thread metadata,GetThreadOptions,AgentTimeoutError.AgentRuntime/OSSAgentStore/MessageConverter/RunBuilder/SSEWriterto current behavior: per-thread creation-time index, persist-on-abort, session-committed cancel gating, run/thread metadata,getRunStream/getLatestRunId.getLatestRunId(GET /threads/:id/latest-run) andgetRunStream(GET /runs/:id/stream) routes, multi-param routing, and theisSessionCommittedhook onAgentHandler.getLatestRunIddelegate andgetRunStreamSSE delegate inAgentControllerObject.@eggjs/tegg-typesexport snapshot.Convention adaptation applied during the port: ESM
.tsimport suffixes, named util imports, mocha→vitest test hooks. No package renames were needed (the agent surface only uses same-named packages).Test evidence
Run locally with Node 22.22 (repo requires >= 22.18):
oxfmt --check— all 26 changed files passoxlint --type-aware --type-check --quiet— exit 0 (no errors)tsgo --noEmit(typecheck) — clean for@eggjs/tegg-types,@eggjs/agent-runtime,@eggjs/controller-decorator,@eggjs/controller-plugin,@eggjs/teggvitest runover the 5 affected packages — 42 files / 323 tests passed (with CI flags--testTimeout 20000)🤖 Generated with Claude Code
Summary by CodeRabbit
Release Notes
New Features
Improvements