Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
768 changes: 600 additions & 168 deletions tegg/core/agent-runtime/src/AgentRuntime.ts

Large diffs are not rendered by default.

28 changes: 28 additions & 0 deletions tegg/core/agent-runtime/src/AgentStoreUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,31 @@ export function newThreadId(): string {
export function newRunId(): string {
return `run_${crypto.randomUUID()}`;
}

/**
* Upper bound for 13-digit millisecond timestamps. The time complement
* `TS_MAX_MS - ms` sorts newest-first in ascending key order.
*/
export const TS_MAX_MS = 9_999_999_999_999;

const REV_MS_WIDTH = String(TS_MAX_MS).length;

/**
* Encode a millisecond timestamp so ascending key order is newest-first.
*/
export function reverseMs(ms: number): string {
if (!Number.isInteger(ms) || ms < 0 || ms > TS_MAX_MS) {
throw new RangeError(`reverseMs: ms must be an integer in [0, ${TS_MAX_MS}], got ${ms}`);
}
return String(TS_MAX_MS - ms).padStart(REV_MS_WIDTH, '0');
}

/**
* Format a Unix-millisecond timestamp as a UTC `YYYY-MM-DD` bucket.
*/
export function dateBucket(ms: number): string {
if (!Number.isInteger(ms) || ms < 0) {
throw new RangeError(`dateBucket: ms must be a nonnegative integer Unix-millisecond timestamp, got ${ms}`);
}
return new Date(ms).toISOString().slice(0, 10);
}
6 changes: 6 additions & 0 deletions tegg/core/agent-runtime/src/HttpSSEWriter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ export class HttpSSEWriter implements SSEWriter {
this.res.write(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`);
}

writeComment(text: string): void {
if (this._closed) return;
this.ensureHeaders();
this.res.write(`: ${text}\n\n`);
}

get closed(): boolean {
return this._closed;
}
Expand Down
132 changes: 30 additions & 102 deletions tegg/core/agent-runtime/src/MessageConverter.ts
Original file line number Diff line number Diff line change
@@ -1,129 +1,57 @@
import type {
CreateRunInput,
MessageObject,
MessageContentBlock,
AgentStreamMessage,
AgentStreamMessagePayload,
} from '@eggjs/tegg-types/agent-runtime';
import { AgentObjectType, MessageRole, MessageStatus, ContentBlockType } from '@eggjs/tegg-types/agent-runtime';
import type { AgentMessage, InputMessage, SDKResultMessage } from '@eggjs/tegg-types/agent-runtime';

import { nowUnix, newMsgId } from './AgentStoreUtils.ts';
import type { RunUsage } from './RunBuilder.ts';

export class MessageConverter {
/**
* Convert an AgentStreamMessage's message payload into OpenAI MessageContentBlock[].
* Extract accumulated usage from AgentMessage objects.
* Only `result` type messages carry usage information.
*/
static toContentBlocks(msg: AgentStreamMessagePayload): MessageContentBlock[] {
if (!msg) return [];
const content = msg.content;
if (typeof content === 'string') {
return [{ type: ContentBlockType.Text, text: { value: content, annotations: [] } }];
}
if (Array.isArray(content)) {
return content
.filter((part) => part.type === ContentBlockType.Text)
.map((part) => ({ type: ContentBlockType.Text, text: { value: part.text, annotations: [] } }));
}
return [];
}

/**
* Build a completed MessageObject from an AgentStreamMessage payload.
*/
static toMessageObject(msg: AgentStreamMessagePayload, runId?: string): MessageObject {
return {
id: newMsgId(),
object: AgentObjectType.ThreadMessage,
createdAt: nowUnix(),
runId,
role: MessageRole.Assistant,
status: MessageStatus.Completed,
content: MessageConverter.toContentBlocks(msg),
};
}

/**
* Extract MessageObjects and accumulated usage from AgentStreamMessage objects.
*/
static extractFromStreamMessages(
messages: AgentStreamMessage[],
runId?: string,
): {
output: MessageObject[];
usage?: RunUsage;
} {
const output: MessageObject[] = [];
static extractUsage(messages: AgentMessage[]): RunUsage | undefined {
let promptTokens = 0;
let completionTokens = 0;
let hasUsage = false;

for (const msg of messages) {
if (msg.message) {
output.push(MessageConverter.toMessageObject(msg.message, runId));
if (msg.type === 'result') {
const resultMsg = msg as SDKResultMessage;
if (resultMsg.usage) {
hasUsage = true;
promptTokens += resultMsg.usage.input_tokens ?? 0;
completionTokens += resultMsg.usage.output_tokens ?? 0;
}
}
if (msg.usage) {
hasUsage = true;
promptTokens += msg.usage.promptTokens ?? 0;
completionTokens += msg.usage.completionTokens ?? 0;
}
}

let usage: RunUsage | undefined;
if (hasUsage) {
usage = {
promptTokens,
completionTokens,
totalTokens: promptTokens + completionTokens,
};
}

return { output, usage };
}

/**
* Produce a completed copy of a streaming MessageObject with final content.
*/
static completeMessage(msg: MessageObject, content: MessageContentBlock[]): MessageObject {
return { ...msg, status: MessageStatus.Completed, content };
if (!hasUsage) return undefined;
return {
promptTokens,
completionTokens,
totalTokens: promptTokens + completionTokens,
};
}

/**
* Create an in-progress MessageObject for streaming (before content is known).
* Filter out stream_event messages before persisting to thread storage.
* Stream events are incremental deltas (one per token) only useful during
* real-time streaming; the final assistant message already contains the
* complete response.
*/
static createStreamMessage(msgId: string, runId: string): MessageObject {
return {
id: msgId,
object: AgentObjectType.ThreadMessage,
createdAt: nowUnix(),
runId,
role: MessageRole.Assistant,
status: MessageStatus.InProgress,
content: [],
};
static filterForStorage(messages: AgentMessage[]): AgentMessage[] {
return messages.filter((m) => m.type !== 'stream_event');
}

/**
* Convert input messages to MessageObjects for thread history.
* System messages are filtered out — they are transient instructions, not conversation history.
* Convert input messages to AgentMessage format for thread history.
* System messages are filtered out — they are transient instructions,
* not conversation history.
*/
static toInputMessageObjects(messages: CreateRunInput['input']['messages'], threadId?: string): MessageObject[] {
static toAgentMessages(messages: InputMessage[]): AgentMessage[] {
return messages
.filter(
(m): m is typeof m & { role: Exclude<typeof m.role, typeof MessageRole.System> } =>
m.role !== MessageRole.System,
)
.filter((m) => m.role !== 'system')
.map((m) => ({
id: newMsgId(),
object: AgentObjectType.ThreadMessage,
createdAt: nowUnix(),
threadId,
role: m.role,
status: MessageStatus.Completed,
content:
typeof m.content === 'string'
? [{ type: ContentBlockType.Text, text: { value: m.content, annotations: [] } }]
: m.content.map((p) => ({ type: ContentBlockType.Text, text: { value: p.text, annotations: [] } })),
type: m.role as 'user' | 'assistant',
message: { role: m.role, content: m.content },
}));
}
}
Loading
Loading