diff --git a/cli/builtin-proxy.js b/cli/builtin-proxy.js index 157e79c3..dcf11fee 100644 --- a/cli/builtin-proxy.js +++ b/cli/builtin-proxy.js @@ -139,35 +139,11 @@ function createBuiltinProxyRuntimeController(deps = {}) { } } - function shouldFallbackFromUpstreamResponses(status, bodyText) { - if (!Number.isFinite(status)) return false; - if (status === 404 || status === 405 || status === 501) return true; - const text = String(bodyText || ''); - if (!text) return false; - if (/not implemented/i.test(text)) return true; - if (/convert_request_failed/i.test(text)) return true; - try { - const parsed = JSON.parse(text); - const code = parsed && parsed.error && typeof parsed.error.code === 'string' ? parsed.error.code : ''; - const msg = parsed && parsed.error && typeof parsed.error.message === 'string' ? parsed.error.message : ''; - if (code === 'convert_request_failed') return true; - if (/not implemented/i.test(msg)) return true; - } catch (_) {} - return false; - } - - function shouldFallbackFromUpstreamResponsesFailure(error) { - const text = String(error || '').trim(); - if (!text) return false; - if (/timeout/i.test(text)) return true; - if (/socket hang up/i.test(text)) return true; - if (/ECONNRESET/i.test(text)) return true; - return false; - } function isTransientNetworkError(error) { const text = String(error || '').trim(); if (!text) return false; + if (/timeout/i.test(text)) return true; if (/socket hang up/i.test(text)) return true; if (/ECONNRESET|ECONNREFUSED|EPIPE|EPROTO|ETIMEDOUT/i.test(text)) return true; if (/EAI_AGAIN/i.test(text)) return true; @@ -980,17 +956,6 @@ function createBuiltinProxyRuntimeController(deps = {}) { return chatBody; } - function normalizeResponsesPayloadForUpstream(payload, stream) { - const source = isRecord(payload) ? payload : {}; - const normalized = { ...source, stream }; - if (isRecord(source.reasoning)) { - const include = Array.isArray(source.include) ? source.include.filter((item) => typeof item === 'string') : []; - if (!include.includes('reasoning.encrypted_content')) { - normalized.include = [...include, 'reasoning.encrypted_content']; - } - } - return normalized; - } function ensureResponseMetadata(payload) { const base = payload && typeof payload === 'object' && !Array.isArray(payload) ? payload : {}; @@ -1089,6 +1054,28 @@ function createBuiltinProxyRuntimeController(deps = {}) { }); } + function emitResponsesReasoningPartAdded(res, itemId, outputIndex, summaryIndex, nextSeq) { + writeSse(res, 'response.reasoning_summary_part.added', { + type: 'response.reasoning_summary_part.added', + item_id: itemId, + output_index: outputIndex, + summary_index: summaryIndex, + part: { type: 'summary_text', text: '' }, + sequence_number: nextSeq() + }); + } + + function emitResponsesReasoningPartDone(res, itemId, outputIndex, summaryIndex, text, nextSeq) { + writeSse(res, 'response.reasoning_summary_part.done', { + type: 'response.reasoning_summary_part.done', + item_id: itemId, + output_index: outputIndex, + summary_index: summaryIndex, + part: { type: 'summary_text', text: typeof text === 'string' ? text : '' }, + sequence_number: nextSeq() + }); + } + function emitResponsesToolArgumentEvents(res, item, outputIndex, nextSeq) { const eventType = item.type === 'custom_tool_call' ? 'response.custom_tool_call_input.delta' @@ -1123,7 +1110,7 @@ function createBuiltinProxyRuntimeController(deps = {}) { } } - function sendResponsesSse(res, responsePayload) { + function sendResponsesSse(res, responsePayload, options = {}) { const response = ensureResponseMetadata(responsePayload); const responseId = response.id; const model = response.model; @@ -1133,14 +1120,18 @@ function createBuiltinProxyRuntimeController(deps = {}) { return sequence; }; - writeSse(res, 'response.created', { - type: 'response.created', - response: { - id: responseId, - model, - created_at: response.created_at - } - }); + if (!options.skipCreated) { + writeSse(res, 'response.created', { + type: 'response.created', + response: { + id: responseId, + model, + created_at: response.created_at, + status: response.status || 'in_progress', + output: Array.isArray(response.output) ? response.output : [] + } + }); + } const output = Array.isArray(response.output) ? response.output : []; for (let outputIndex = 0; outputIndex < output.length; outputIndex += 1) { @@ -1228,7 +1219,7 @@ function createBuiltinProxyRuntimeController(deps = {}) { } function appendChatStreamToolCall(target, toolCall) { - if (!toolCall || typeof toolCall !== 'object') return; + if (!toolCall || typeof toolCall !== 'object') return null; const index = Number.isFinite(toolCall.index) ? toolCall.index : target.length; if (!target[index]) { target[index] = { @@ -1245,13 +1236,275 @@ function createBuiltinProxyRuntimeController(deps = {}) { if (typeof fn.name === 'string' && fn.name) current.function.name = fn.name; if (typeof fn.arguments === 'string') current.function.arguments += fn.arguments; } + return { index, current, argumentDelta: fn && typeof fn.arguments === 'string' ? fn.arguments : '' }; + } + + const DSML_TOOL_CALLS_START = '<|DSML|tool_calls>'; + const DSML_TOOL_CALLS_END = ''; + + function longestDsmlStartPrefixSuffix(text) { + if (typeof text !== 'string' || !text) return 0; + const max = Math.min(text.length, DSML_TOOL_CALLS_START.length - 1); + for (let len = max; len > 0; len -= 1) { + if (DSML_TOOL_CALLS_START.startsWith(text.slice(text.length - len))) return len; + } + return 0; + } + + function parseDsmlAttributes(text) { + const attrs = {}; + if (typeof text !== 'string') return attrs; + const attrRe = /([A-Za-z_][A-Za-z0-9_-]*)="([^"]*)"/g; + let match; + while ((match = attrRe.exec(text))) { + attrs[match[1]] = match[2]; + } + return attrs; + } + + function normalizeDsmlParameterValue(rawValue, attrs, paramName = '') { + const text = typeof rawValue === 'string' ? rawValue.trim() : ''; + if (attrs && attrs.string === 'true') { + if (/path$/i.test(paramName)) return text.replace(/\r?\n\s*/g, ''); + return text; + } + if (text === 'true') return true; + if (text === 'false') return false; + if (text === 'null') return null; + if (/^-?\d+(?:\.\d+)?$/.test(text)) return Number(text); + const parsed = parseJsonValueOrNull(text); + return parsed === null && text !== 'null' ? text : parsed; + } + + function parseDsmlToolCalls(blockText) { + const calls = []; + if (typeof blockText !== 'string' || !blockText) return calls; + const invokeRe = /<|DSML|invoke\b([^>]*)>([\s\S]*?)<\/|DSML|invoke>/g; + let invokeMatch; + while ((invokeMatch = invokeRe.exec(blockText))) { + const invokeAttrs = parseDsmlAttributes(invokeMatch[1]); + const name = asTrimmedString(invokeAttrs.name); + if (!name) continue; + const args = {}; + const body = invokeMatch[2]; + const paramRe = /<|DSML|parameter\b([^>]*)>([\s\S]*?)<\/|DSML|parameter>/g; + let paramMatch; + while ((paramMatch = paramRe.exec(body))) { + const paramAttrs = parseDsmlAttributes(paramMatch[1]); + const paramName = asTrimmedString(paramAttrs.name); + if (!paramName) continue; + args[paramName] = normalizeDsmlParameterValue(paramMatch[2], paramAttrs, paramName); + } + calls.push({ name, arguments: args }); + } + return calls; + } + + function closeChatStreamReasoningItem(state) { + if (!state || !state.reasoningItem || state.reasoningDone) return; + const outputIndex = state.output.indexOf(state.reasoningItem); + const text = state.reasoningText; + writeSse(state.res, 'response.reasoning_summary_text.done', { + type: 'response.reasoning_summary_text.done', + item_id: state.reasoningItem.id, + output_index: outputIndex, + summary_index: 0, + text, + sequence_number: state.nextSeq() + }); + emitResponsesReasoningPartDone(state.res, state.reasoningItem.id, outputIndex, 0, text, state.nextSeq); + state.reasoningItem.status = 'completed'; + state.reasoningItem.summary = [{ type: 'summary_text', text }]; + writeSse(state.res, 'response.output_item.done', { + type: 'response.output_item.done', + output_index: outputIndex, + item: buildResponsesSseItem(state.reasoningItem, state.reasoningItem.id), + sequence_number: state.nextSeq() + }); + state.reasoningDone = true; + } + + function ensureChatStreamReasoningItem(state) { + if (state.reasoningItem) return state.output.indexOf(state.reasoningItem); + state.reasoningItem = { + id: `rs_${crypto.randomBytes(8).toString('hex')}`, + type: 'reasoning', + status: 'in_progress', + summary: [{ type: 'summary_text', text: '' }] + }; + state.output.push(state.reasoningItem); + state.outputStarted = true; + const outputIndex = state.output.length - 1; + writeSse(state.res, 'response.output_item.added', { + type: 'response.output_item.added', + output_index: outputIndex, + item: buildResponsesSseItem(state.reasoningItem, state.reasoningItem.id), + sequence_number: state.nextSeq() + }); + emitResponsesReasoningPartAdded(state.res, state.reasoningItem.id, outputIndex, 0, state.nextSeq); + return outputIndex; + } + + function ensureChatStreamMessageItem(state) { + if (state.messageItem) return state.output.indexOf(state.messageItem); + state.messageItem = { + id: `msg_${crypto.randomBytes(8).toString('hex')}`, + type: 'message', + role: 'assistant', + status: 'in_progress', + content: [{ type: 'output_text', text: '' }] + }; + state.output.push(state.messageItem); + state.outputStarted = true; + const outputIndex = state.output.length - 1; + writeSse(state.res, 'response.output_item.added', { + type: 'response.output_item.added', + output_index: outputIndex, + item: buildResponsesSseItem(state.messageItem, state.messageItem.id), + sequence_number: state.nextSeq() + }); + emitResponsesTextPartAdded(state.res, state.messageItem.id, outputIndex, 0, state.nextSeq); + return outputIndex; + } + + function buildChatStreamToolItem(toolCall, toolTypesByName = {}) { + if (!toolCall || !toolCall.function) return null; + const responseType = toolTypesByName && toolTypesByName[toolCall.function.name] ? toolTypesByName[toolCall.function.name] : 'function_call'; + const callId = asTrimmedString(toolCall.id) || `call_${crypto.randomBytes(8).toString('hex')}`; + if (responseType === 'custom_tool_call') { + return { type: 'custom_tool_call', id: callId, call_id: callId, name: asTrimmedString(toolCall.function.name), input: '' }; + } + if (responseType === 'local_shell_call') { + return { type: 'local_shell_call', id: callId, call_id: callId, name: asTrimmedString(toolCall.function.name), action: {} }; + } + return { type: 'function_call', id: callId, call_id: callId, name: asTrimmedString(toolCall.function.name), arguments: '', status: 'in_progress' }; + } + + function ensureChatStreamToolItem(state, toolIndex, toolCall) { + if (!state.toolItems) state.toolItems = []; + if (state.toolItems[toolIndex]) return state.toolItems[toolIndex]; + closeChatStreamReasoningItem(state); + const item = buildChatStreamToolItem(toolCall, state.toolTypesByName || {}); + if (!item) return null; + const outputIndex = state.output.length; + state.output.push(item); + state.outputStarted = true; + const entry = { item, outputIndex, done: false }; + state.toolItems[toolIndex] = entry; + writeSse(state.res, 'response.output_item.added', { + type: 'response.output_item.added', + output_index: outputIndex, + item: buildResponsesSseItem(item, item.id || item.call_id), + sequence_number: state.nextSeq() + }); + return entry; + } + + function emitChatStreamTextDelta(state, text) { + if (!state || typeof text !== 'string' || !text) return; + closeChatStreamReasoningItem(state); + const outputIndex = ensureChatStreamMessageItem(state); + state.messageText += text; + state.messageItem.content[0].text = state.messageText; + writeSse(state.res, 'response.output_text.delta', { + type: 'response.output_text.delta', + item_id: state.messageItem.id, + output_index: outputIndex, + content_index: 0, + delta: text, + sequence_number: state.nextSeq() + }); + } + + function emitChatStreamToolCallDelta(state, toolCall) { + const appended = appendChatStreamToolCall(state.toolCalls, toolCall); + if (!appended) return; + const entry = ensureChatStreamToolItem(state, appended.index, appended.current); + if (!entry) return; + const item = entry.item; + item.call_id = asTrimmedString(appended.current.id) || item.call_id; + item.name = asTrimmedString(appended.current.function && appended.current.function.name) || item.name; + if (item.type === 'function_call') { + item.arguments = appended.current.function.arguments; + } + if (!appended.argumentDelta) return; + const wireItem = buildResponsesSseItem(item, item.id || item.call_id); + if (item.type === 'function_call') { + writeSse(state.res, 'response.function_call_arguments.delta', { + type: 'response.function_call_arguments.delta', + item_id: wireItem.id, + output_index: entry.outputIndex, + call_id: wireItem.call_id, + name: wireItem.name, + delta: appended.argumentDelta, + sequence_number: state.nextSeq() + }); + } + } + + function emitDsmlToolCallsAsResponsesSse(state, blockText) { + const calls = parseDsmlToolCalls(blockText); + for (const call of calls) { + const toolCall = { + index: state.toolCalls.length, + id: `call_${crypto.randomBytes(8).toString('hex')}`, + type: 'function', + function: { + name: call.name, + arguments: JSON.stringify(call.arguments || {}) + } + }; + emitChatStreamToolCallDelta(state, toolCall); + } + } + + function flushChatStreamContentBuffer(state, force = false) { + if (!state || typeof state.contentBuffer !== 'string' || !state.contentBuffer) return; + let buffer = state.contentBuffer; + state.contentBuffer = ''; + while (buffer) { + const start = buffer.indexOf(DSML_TOOL_CALLS_START); + if (start < 0) { + const keep = force ? 0 : longestDsmlStartPrefixSuffix(buffer); + const text = keep > 0 ? buffer.slice(0, buffer.length - keep) : buffer; + emitChatStreamTextDelta(state, text); + state.contentBuffer = keep > 0 ? buffer.slice(buffer.length - keep) : ''; + return; + } + if (start > 0) { + emitChatStreamTextDelta(state, buffer.slice(0, start)); + buffer = buffer.slice(start); + } + const end = buffer.indexOf(DSML_TOOL_CALLS_END, DSML_TOOL_CALLS_START.length); + if (end < 0) { + if (force) { + state.contentBuffer = ''; + } else { + state.contentBuffer = buffer; + } + return; + } + const blockText = buffer.slice(DSML_TOOL_CALLS_START.length, end); + emitDsmlToolCallsAsResponsesSse(state, blockText); + buffer = buffer.slice(end + DSML_TOOL_CALLS_END.length); + } + } + + function writeChatStreamContentSegment(state, segment) { + if (!state || typeof segment !== 'string' || !segment) return; + state.contentBuffer = (typeof state.contentBuffer === 'string' ? state.contentBuffer : '') + segment; + flushChatStreamContentBuffer(state, false); } function writeChatCompletionChunkAsResponsesSse(state, chunk) { if (!chunk || typeof chunk !== 'object') return; + if (!state.started && typeof chunk.id === 'string' && chunk.id) { + state.responseId = chunk.id; + } if (typeof chunk.model === 'string' && chunk.model) { state.model = chunk.model; } + beginChatStreamResponsesSse(state); const choices = Array.isArray(chunk.choices) ? chunk.choices : []; for (const choice of choices) { const delta = choice && choice.delta && typeof choice.delta === 'object' ? choice.delta : null; @@ -1263,28 +1516,13 @@ function createBuiltinProxyRuntimeController(deps = {}) { ? delta.reasoning : (typeof delta.reasoning_text === 'string' ? delta.reasoning_text : '')); if (reasoningSegment) { - if (!state.reasoningItem) { - state.reasoningItem = { - id: `rs_${crypto.randomBytes(8).toString('hex')}`, - type: 'reasoning', - summary: [{ type: 'summary_text', text: '' }] - }; - state.output.push(state.reasoningItem); - state.outputStarted = true; - beginChatStreamResponsesSse(state); - writeSse(state.res, 'response.output_item.added', { - type: 'response.output_item.added', - output_index: state.output.length - 1, - item: buildResponsesSseItem(state.reasoningItem, state.reasoningItem.id), - sequence_number: state.nextSeq() - }); - } + const outputIndex = ensureChatStreamReasoningItem(state); state.reasoningText += reasoningSegment; state.reasoningItem.summary[0].text = state.reasoningText; writeSse(state.res, 'response.reasoning_summary_text.delta', { type: 'response.reasoning_summary_text.delta', item_id: state.reasoningItem.id, - output_index: state.output.indexOf(state.reasoningItem), + output_index: outputIndex, summary_index: 0, delta: reasoningSegment, sequence_number: state.nextSeq() @@ -1292,39 +1530,12 @@ function createBuiltinProxyRuntimeController(deps = {}) { } if (typeof delta.content === 'string' && delta.content) { - if (!state.messageItem) { - state.messageItem = { - id: `msg_${crypto.randomBytes(8).toString('hex')}`, - type: 'message', - role: 'assistant', - content: [{ type: 'output_text', text: '' }] - }; - state.output.push(state.messageItem); - state.outputStarted = true; - beginChatStreamResponsesSse(state); - writeSse(state.res, 'response.output_item.added', { - type: 'response.output_item.added', - output_index: state.output.length - 1, - item: buildResponsesSseItem(state.messageItem, state.messageItem.id), - sequence_number: state.nextSeq() - }); - emitResponsesTextPartAdded(state.res, state.messageItem.id, state.output.length - 1, 0, state.nextSeq); - } - state.messageText += delta.content; - state.messageItem.content[0].text = state.messageText; - writeSse(state.res, 'response.output_text.delta', { - type: 'response.output_text.delta', - item_id: state.messageItem.id, - output_index: state.output.length - 1, - content_index: 0, - delta: delta.content, - sequence_number: state.nextSeq() - }); + writeChatStreamContentSegment(state, delta.content); } if (Array.isArray(delta.tool_calls)) { for (const toolCall of delta.tool_calls) { - appendChatStreamToolCall(state.toolCalls, toolCall); + emitChatStreamToolCallDelta(state, toolCall); } } } @@ -1349,7 +1560,7 @@ function createBuiltinProxyRuntimeController(deps = {}) { return; } try { target.write(': keepalive\n\n'); } catch (_) {} - }, 15000); + }, 5000); if (typeof timer.unref === 'function') timer.unref(); state.heartbeatTimer = timer; } @@ -1359,21 +1570,20 @@ function createBuiltinProxyRuntimeController(deps = {}) { beginChatStreamResponsesSse(state); state.finished = true; stopChatStreamHeartbeat(state); + flushChatStreamContentBuffer(state, true); - if (state.reasoningItem) { - const outputIndex = state.output.indexOf(state.reasoningItem); - writeSse(state.res, 'response.reasoning_summary_text.done', { - type: 'response.reasoning_summary_text.done', - item_id: state.reasoningItem.id, - output_index: outputIndex, - summary_index: 0, - text: state.reasoningText, - sequence_number: state.nextSeq() - }); - writeSse(state.res, 'response.output_item.done', { - type: 'response.output_item.done', + closeChatStreamReasoningItem(state); + + if (!state.messageItem && state.messageText === '' && state.reasoningText.trim() && state.toolCalls.filter(Boolean).length === 0) { + const outputIndex = ensureChatStreamMessageItem(state); + state.messageText = state.reasoningText; + state.messageItem.content[0].text = state.messageText; + writeSse(state.res, 'response.output_text.delta', { + type: 'response.output_text.delta', + item_id: state.messageItem.id, output_index: outputIndex, - item: buildResponsesSseItem(state.reasoningItem, state.reasoningItem.id), + content_index: 0, + delta: state.messageText, sequence_number: state.nextSeq() }); } @@ -1389,6 +1599,7 @@ function createBuiltinProxyRuntimeController(deps = {}) { sequence_number: state.nextSeq() }); emitResponsesTextPartDone(state.res, state.messageItem.id, outputIndex, 0, state.messageText, state.nextSeq); + state.messageItem.status = 'completed'; writeSse(state.res, 'response.output_item.done', { type: 'response.output_item.done', output_index: outputIndex, @@ -1397,32 +1608,46 @@ function createBuiltinProxyRuntimeController(deps = {}) { }); } - for (const toolCall of state.toolCalls) { + for (let toolIndex = 0; toolIndex < state.toolCalls.length; toolIndex += 1) { + const toolCall = state.toolCalls[toolIndex]; if (!toolCall) continue; - const item = buildResponsesToolCallItemFromChatToolCall(toolCall, state.toolTypesByName || {}); + let item = buildResponsesToolCallItemFromChatToolCall(toolCall, state.toolTypesByName || {}); if (!item) continue; - const outputIndex = state.output.length; - state.output.push(item); - state.outputStarted = true; - writeSse(state.res, 'response.output_item.added', { - type: 'response.output_item.added', - output_index: outputIndex, - item: buildResponsesSseItem(item, item.call_id), - sequence_number: state.nextSeq() - }); - emitResponsesToolArgumentEvents(state.res, buildResponsesSseItem(item, item.call_id), outputIndex, state.nextSeq); + if (item.type === 'function_call') { + item = { ...item, arguments: typeof item.arguments === 'string' && item.arguments.trim() ? item.arguments : '{}', status: 'completed' }; + } + const entry = state.toolItems && state.toolItems[toolIndex] + ? state.toolItems[toolIndex] + : ensureChatStreamToolItem(state, toolIndex, toolCall); + if (!entry || entry.done) continue; + const outputIndex = entry.outputIndex; + state.output[outputIndex] = item; + const wireItem = buildResponsesSseItem(item, item.id || item.call_id); + if (item.type === 'function_call') { + writeSse(state.res, 'response.function_call_arguments.done', { + type: 'response.function_call_arguments.done', + item_id: wireItem.id, + output_index: outputIndex, + call_id: wireItem.call_id, + name: wireItem.name, + arguments: wireItem.arguments, + sequence_number: state.nextSeq() + }); + } else if (item.type === 'custom_tool_call') { + emitResponsesToolArgumentEvents(state.res, wireItem, outputIndex, state.nextSeq); + } writeSse(state.res, 'response.output_item.done', { type: 'response.output_item.done', output_index: outputIndex, - item: buildResponsesSseItem(item, item.call_id), + item: wireItem, sequence_number: state.nextSeq() }); + entry.done = true; } const response = ensureResponseMetadata({ id: state.responseId, model: state.model, - created_at: state.createdAt, status: 'completed', output: state.output }); @@ -1440,9 +1665,9 @@ function createBuiltinProxyRuntimeController(deps = {}) { } catch (_) {} } - function beginChatStreamResponsesSse(state) { - if (!state || state.started) return; - state.started = true; + function openChatStreamResponsesSseTransport(state) { + if (!state || state.transportStarted) return; + state.transportStarted = true; const res = state.res; if (!res.headersSent) { res.writeHead(200, { @@ -1451,24 +1676,43 @@ function createBuiltinProxyRuntimeController(deps = {}) { 'Connection': 'keep-alive', 'X-Accel-Buffering': 'no' }); + if (typeof res.flushHeaders === 'function') { + try { res.flushHeaders(); } catch (_) {} + } } startChatStreamHeartbeat(state); if (typeof res.on === 'function' && !state.closeListenerAttached) { state.closeListenerAttached = true; res.on('close', () => { + state.clientClosed = true; stopChatStreamHeartbeat(state); if (!state.finished && state.upstreamReq) { try { state.upstreamReq.destroy(new Error('client aborted')); } catch (_) {} } }); } + if (!state.transportKeepaliveSent && !res.writableEnded && !res.destroyed) { + state.transportKeepaliveSent = true; + try { res.write(': keepalive\n\n'); } catch (_) {} + } + } + + function beginChatStreamResponsesSse(state) { + if (!state) return; + openChatStreamResponsesSseTransport(state); + if (state.started) return; + state.started = true; + const res = state.res; + const inProgressResponse = { + id: state.responseId, + object: 'response', + model: state.model, + status: 'in_progress', + output: [] + }; writeSse(res, 'response.created', { type: 'response.created', - response: { - id: state.responseId, - model: state.model, - created_at: state.createdAt - } + response: inProgressResponse }); } @@ -1487,18 +1731,23 @@ function createBuiltinProxyRuntimeController(deps = {}) { upstreamReq: null, responseId: `resp_${crypto.randomBytes(10).toString('hex')}`, model: typeof model === 'string' ? model : '', - createdAt: Math.floor(Date.now() / 1000), output: [], messageItem: null, messageText: '', + contentBuffer: '', reasoningItem: null, reasoningText: '', + reasoningDone: false, toolCalls: [], + toolItems: [], toolTypesByName: options.toolTypesByName || {}, finished: false, started: false, + transportStarted: false, + transportKeepaliveSent: false, outputStarted: false, closeListenerAttached: false, + clientClosed: false, nextSeq: () => { sequence += 1; return sequence; @@ -1564,6 +1813,10 @@ function createBuiltinProxyRuntimeController(deps = {}) { finish({ ok: false, retryTransient: true, error: reason || 'upstream stream failed' }); return; } + if (sharedState && sharedState.started && !sharedState.outputStarted) { + finish({ ok: false, retryTransient: true, error: reason || 'upstream stream failed' }); + return; + } if (res.headersSent) { failResponsesSseRaw(res, reason); finish({ ok: true }); @@ -1599,12 +1852,14 @@ function createBuiltinProxyRuntimeController(deps = {}) { upstreamRes.on('end', () => { const text = chunks.length ? Buffer.concat(chunks).toString('utf-8') : ''; const parsedJson = parseJsonOrError(text); - res.writeHead(200, { - 'Content-Type': 'text/event-stream; charset=utf-8', - 'Cache-Control': 'no-cache', - 'Connection': 'keep-alive', - 'X-Accel-Buffering': 'no' - }); + if (!res.headersSent) { + res.writeHead(200, { + 'Content-Type': 'text/event-stream; charset=utf-8', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + 'X-Accel-Buffering': 'no' + }); + } if (parsedJson.error) { writeSse(res, 'response.failed', { type: 'response.failed', error: `invalid upstream response: ${parsedJson.error}` }); writeSse(res, 'done', '[DONE]'); @@ -1614,7 +1869,7 @@ function createBuiltinProxyRuntimeController(deps = {}) { } sendResponsesSse(res, buildResponsesPayloadFromChatCompletion(parsedJson.value, model, { toolTypesByName: options.toolTypesByName || {} - })); + }), { skipCreated: !!(sharedState && sharedState.started) }); res.end(); finish({ ok: true }); }); @@ -1625,7 +1880,7 @@ function createBuiltinProxyRuntimeController(deps = {}) { state.upstreamReq = req; if (!state.model && model) state.model = model; streamState = state; - beginChatStreamResponsesSse(state); + openChatStreamResponsesSseTransport(state); let buffer = ''; const handleEventBlock = (block) => { @@ -1643,7 +1898,6 @@ function createBuiltinProxyRuntimeController(deps = {}) { } const parsedChunk = parseJsonOrError(data); if (!parsedChunk.error) { - beginChatStreamResponsesSse(state); writeChatCompletionChunkAsResponsesSse(state, parsedChunk.value); } }; @@ -1665,137 +1919,23 @@ function createBuiltinProxyRuntimeController(deps = {}) { finish({ ok: true }); }); }); + sharedState.upstreamReq = req; req.setTimeout(timeoutMs, () => { if (streamAccepted) return; try { req.destroy(new Error('timeout')); } catch (_) {} finish({ ok: false, error: 'timeout' }); }); req.on('error', (err) => finish({ ok: false, error: err && err.message ? err.message : 'request failed' })); + if (sharedState.clientClosed || res.writableEnded || res.destroyed) { + try { req.destroy(new Error('client aborted')); } catch (_) {} + finish({ ok: true }); + return; + } if (bodyText) req.write(bodyText); req.end(); }); } - function streamResponsesSse(targetUrl, options = {}) { - const parsed = new URL(targetUrl); - const transport = parsed.protocol === 'https:' ? https : http; - const bodyText = options.body ? JSON.stringify(options.body) : ''; - const headers = { - 'Accept': 'text/event-stream', - ...(options.body ? { 'Content-Type': 'application/json' } : {}), - ...(options.headers || {}) - }; - if (options.body) { - headers['Content-Length'] = Buffer.byteLength(bodyText, 'utf-8'); - } - const timeoutMs = Number.isFinite(options.timeoutMs) - ? Math.max(1000, Number(options.timeoutMs)) - : 30000; - const res = options.res; - - return new Promise((resolve) => { - let settled = false; - let streamAccepted = false; - const finish = (value) => { - if (settled) return; - settled = true; - resolve(value); - }; - const req = transport.request({ - protocol: parsed.protocol, - hostname: parsed.hostname, - port: parsed.port || (parsed.protocol === 'https:' ? 443 : 80), - method: options.method || 'POST', - path: `${parsed.pathname}${parsed.search}`, - headers, - agent: parsed.protocol === 'https:' ? HTTPS_KEEP_ALIVE_AGENT : HTTP_KEEP_ALIVE_AGENT - }, (upstreamRes) => { - const status = upstreamRes.statusCode || 0; - const chunks = []; - const contentType = String(upstreamRes.headers && upstreamRes.headers['content-type'] || ''); - - const collectBody = (done) => { - upstreamRes.on('data', (chunk) => chunk && chunks.push(chunk)); - upstreamRes.on('end', () => done(chunks.length ? Buffer.concat(chunks).toString('utf-8') : '')); - }; - - upstreamRes.on('error', (err) => finish({ ok: false, error: err && err.message ? err.message : 'upstream stream failed' })); - upstreamRes.on('aborted', () => finish({ ok: false, retryTransient: true, error: 'upstream stream aborted' })); - - if (status === 404 || status === 405) { - collectBody((bodyTextResult) => finish({ retry: true, status, bodyText: bodyTextResult })); - return; - } - - if (status >= 400) { - collectBody((bodyTextResult) => finish({ ok: false, status, bodyText: bodyTextResult })); - return; - } - - if (/text\/event-stream/i.test(contentType)) { - streamAccepted = true; - req.setTimeout(0); - res.writeHead(200, { - 'Content-Type': 'text/event-stream; charset=utf-8', - 'Cache-Control': 'no-cache', - 'Connection': 'keep-alive', - 'X-Accel-Buffering': 'no' - }); - upstreamRes.on('data', (chunk) => { - if (chunk && !res.writableEnded) res.write(chunk); - }); - upstreamRes.on('end', () => { - if (!res.writableEnded) res.end(); - finish({ ok: true }); - }); - return; - } - - collectBody((text) => { - const parsedJson = parseJsonOrError(text); - res.writeHead(200, { - 'Content-Type': 'text/event-stream; charset=utf-8', - 'Cache-Control': 'no-cache', - 'Connection': 'keep-alive', - 'X-Accel-Buffering': 'no' - }); - if (parsedJson.error) { - writeSse(res, 'response.failed', { type: 'response.failed', error: `invalid upstream response: ${parsedJson.error}` }); - writeSse(res, 'done', '[DONE]'); - res.end(); - finish({ ok: true }); - return; - } - sendResponsesSse(res, ensureResponseMetadata(parsedJson.value)); - res.end(); - finish({ ok: true }); - }); - }); - req.setTimeout(timeoutMs, () => { - if (streamAccepted) return; - try { req.destroy(new Error('timeout')); } catch (_) {} - finish({ ok: false, error: 'timeout' }); - }); - req.on('error', (err) => finish({ ok: false, error: err && err.message ? err.message : 'request failed' })); - if (bodyText) req.write(bodyText); - req.end(); - }); - } - - async function streamResponsesSseWithFallbackUrls(baseUrl, pathSuffix, options = {}) { - const urls = buildUpstreamUrlCandidates(baseUrl, pathSuffix); - if (urls.length === 0) { - return { ok: false, error: 'failed to build upstream URL' }; - } - let lastResult = null; - for (const url of urls) { - const result = await retryTransientRequest(() => streamResponsesSse(url, options)); - lastResult = result; - if (result && result.retry) continue; - return result; - } - return lastResult || { ok: false, error: 'failed to build upstream URL' }; - } async function streamChatCompletionsAsResponsesSseWithFallbackUrls(baseUrl, pathSuffix, options = {}) { const urls = buildUpstreamUrlCandidates(baseUrl, pathSuffix); @@ -2161,10 +2301,11 @@ function createBuiltinProxyRuntimeController(deps = {}) { return; } - // Responses shim: - // - Codex CLI 默认走 /v1/responses(含 SSE) - // - SSE/streaming 任务优先保持 /v1/responses,避免 Codex-only upstream 把 fallback 后的 chat/completions 链路误判为非 Codex 客户端 - // - 仅在上游明确不支持 Responses 时,才转换到 chat/completions 并回包为 responses。 + // Codex built-in conversion follows the sub2api hot path: /v1/responses + // requests are converted directly to /v1/chat/completions, then converted + // back to Responses on the way out. Do not probe upstream /responses first: + // broken or half-supported Responses endpoints add latency, hang streaming + // sessions, and are exactly the extra abstraction this proxy is meant to avoid. if ((incomingPath === '/v1/responses' || incomingPath === '/v1/responses/') && (req.method || 'GET').toUpperCase() === 'POST') { void (async () => { const { body, error } = await readRequestBody(req, 10 * 1024 * 1024); @@ -2190,31 +2331,6 @@ function createBuiltinProxyRuntimeController(deps = {}) { const toolTypesByName = collectResponsesToolTypesByName(payload.tools); if (wantsStream) { - const streamedResponses = await streamResponsesSseWithFallbackUrls(upstream.baseUrl, 'responses', { - method: 'POST', - headers: commonHeaders, - timeoutMs, - body: normalizeResponsesPayloadForUpstream(payload, true), - res, - model - }); - if (streamedResponses.ok) return; - const canFallbackToChat = streamedResponses.status - ? shouldFallbackFromUpstreamResponses(streamedResponses.status, streamedResponses.bodyText) - : false; - if (!canFallbackToChat) { - if (!res.headersSent) { - const status = streamedResponses.status && streamedResponses.status >= 400 ? streamedResponses.status : 502; - res.writeHead(status, { 'Content-Type': 'application/json; charset=utf-8' }); - res.end(streamedResponses.bodyText || JSON.stringify({ error: streamedResponses.error || 'proxy request failed' })); - } else if (!res.writableEnded) { - writeSse(res, 'response.failed', { type: 'response.failed', error: streamedResponses.error || streamedResponses.bodyText || 'proxy request failed' }); - writeSse(res, 'done', '[DONE]'); - res.end(); - } - return; - } - const streamingChatBody = { ...chatBody, stream: true }; const streamed = await streamChatCompletionsAsResponsesSseWithFallbackUrls(upstream.baseUrl, 'chat/completions', { method: 'POST', @@ -2238,57 +2354,6 @@ function createBuiltinProxyRuntimeController(deps = {}) { return; } - const upstreamResponses = await proxyRequestJsonWithFallbackUrls(upstream.baseUrl, 'responses', { - method: 'POST', - headers: commonHeaders, - timeoutMs, - body: normalizeResponsesPayloadForUpstream(payload, false) - }); - - // 优先走上游 /responses(如果支持)。若上游报错且不是“端点不支持”,则直接透传错误。 - if (upstreamResponses.ok && upstreamResponses.status >= 200 && upstreamResponses.status < 300) { - const json = parseJsonOrError(upstreamResponses.bodyText); - if (json.error) { - res.writeHead(502, { 'Content-Type': 'application/json; charset=utf-8' }); - res.end(JSON.stringify({ error: `Upstream JSON parse failed: ${json.error}` })); - return; - } - const responsesPayload = ensureResponseMetadata(json.value); - if (wantsStream) { - res.writeHead(200, { - 'Content-Type': 'text/event-stream; charset=utf-8', - 'Cache-Control': 'no-cache', - 'Connection': 'keep-alive', - 'X-Accel-Buffering': 'no' - }); - sendResponsesSse(res, responsesPayload); - res.end(); - return; - } - res.writeHead(200, { 'Content-Type': 'application/json; charset=utf-8' }); - res.end(JSON.stringify(responsesPayload)); - return; - } - - if (upstreamResponses.ok && upstreamResponses.status >= 400) { - if (!shouldFallbackFromUpstreamResponses(upstreamResponses.status, upstreamResponses.bodyText)) { - res.writeHead(upstreamResponses.status, { 'Content-Type': 'application/json; charset=utf-8' }); - res.end(upstreamResponses.bodyText || JSON.stringify({ error: 'Upstream error' })); - return; - } - // fallthrough to chat/completions conversion - } - - if (!upstreamResponses.ok) { - if (!shouldFallbackFromUpstreamResponsesFailure(upstreamResponses.error)) { - res.writeHead(502, { 'Content-Type': 'application/json; charset=utf-8' }); - res.end(JSON.stringify({ error: upstreamResponses.error || 'Upstream request failed' })); - return; - } - // Some OpenAI-compatible gateways accept /responses but never complete it. - // Treat that as an unsupported Responses endpoint and try the chat fallback. - } - const upstreamChat = await proxyRequestJsonWithFallbackUrls(upstream.baseUrl, 'chat/completions', { method: 'POST', headers: commonHeaders, diff --git a/cli/openai-bridge.js b/cli/openai-bridge.js index 33f62fa4..c9a0a37d 100644 --- a/cli/openai-bridge.js +++ b/cli/openai-bridge.js @@ -1129,7 +1129,38 @@ function sendResponsesSse(res, responsePayload) { writeSse(res, 'response.completed', { type: 'response.completed', response }); writeSse(res, 'done', '[DONE]'); - } +} + +function sendResponsesCompletedErrorSse(res, error) { + if (!res || res.writableEnded || res.destroyed) return; + if (!res.headersSent) { + res.writeHead(200, { + 'Content-Type': 'text/event-stream; charset=utf-8', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + 'X-Accel-Buffering': 'no' + }); + if (typeof res.flushHeaders === 'function') res.flushHeaders(); + } + const message = asTrimmedString(error) || 'Upstream request failed'; + const itemId = `msg_${crypto.randomBytes(8).toString('hex')}`; + sendResponsesSse(res, { + id: `resp_${crypto.randomBytes(10).toString('hex')}`, + model: '', + status: 'completed', + output: [{ + id: itemId, + type: 'message', + role: 'assistant', + content: [{ + type: 'output_text', + text: `Upstream provider failed after retries: ${message}` + }] + }], + output_text: `Upstream provider failed after retries: ${message}` + }); + res.end(); +} function extractResponsesOutputText(payload) { if (!payload || typeof payload !== 'object') return ''; @@ -1232,6 +1263,7 @@ function isLoopbackAddress(address) { function isTransientNetworkError(error) { const text = String(error || '').trim(); if (!text) return false; + if (/timeout/i.test(text)) return true; if (/socket hang up/i.test(text)) return true; if (/ECONNRESET|ECONNREFUSED|EPIPE|EPROTO|ETIMEDOUT/i.test(text)) return true; if (/EAI_AGAIN/i.test(text)) return true; @@ -1240,9 +1272,23 @@ function isTransientNetworkError(error) { return false; } +function isRetryableUpstreamStatus(status, bodyText = '') { + if (!Number.isFinite(status)) return false; + if (status === 408 || status === 409 || status === 425 || status === 429) return true; + if (status >= 500 && status <= 599) return true; + const text = String(bodyText || ''); + // Some OpenAI-compatible provider pools occasionally surface quota/balance + // errors from one upstream account even though a later attempt can succeed. + // Retry this class inside the bridge so Codex does not enter its slow + // user-visible reconnect loop for a one-off bad upstream shard. + if (status === 403 && /insufficient\s+balance|quota|rate.?limit|temporar|try again|overload/i.test(text)) return true; + return false; +} + const TRANSIENT_RETRY_DELAYS_MS = [200, 600]; -async function retryTransientRequest(executor) { +async function retryTransientRequest(executor, options = {}) { + const retryStatus = typeof options.retryStatus === 'function' ? options.retryStatus : null; let lastResult = null; for (let attempt = 0; attempt <= TRANSIENT_RETRY_DELAYS_MS.length; attempt += 1) { if (attempt > 0) { @@ -1259,8 +1305,12 @@ async function retryTransientRequest(executor) { if (!result) return result; if (result.ok) return result; if (result.retry) return result; - if (result.status && result.status > 0) return result; - if (!isTransientNetworkError(result.error)) return result; + if (result.status && result.status > 0) { + const shouldRetryStatus = retryStatus ? retryStatus(result.status, result.bodyText || result.error || '') : false; + if (!shouldRetryStatus) return result; + } else if (!isTransientNetworkError(result.error)) { + return result; + } } return lastResult; } @@ -1373,7 +1423,7 @@ function emitResponsesToolArgumentEvents(res, item, outputIndex, nextSeq) { } function appendChatStreamToolCall(target, toolCall) { - if (!toolCall || typeof toolCall !== 'object') return; + if (!toolCall || typeof toolCall !== 'object') return null; const index = Number.isFinite(toolCall.index) ? toolCall.index : target.length; if (!target[index]) { target[index] = { @@ -1390,6 +1440,150 @@ function appendChatStreamToolCall(target, toolCall) { if (typeof fn.name === 'string' && fn.name) current.function.name = fn.name; if (typeof fn.arguments === 'string') current.function.arguments += fn.arguments; } + return { index, current }; +} + +const DSML_TOOL_CALLS_START = '<|DSML|tool_calls>'; +const DSML_TOOL_CALLS_END = ''; + +function longestDsmlStartPrefixSuffix(text) { + if (typeof text !== 'string' || !text) return 0; + const max = Math.min(text.length, DSML_TOOL_CALLS_START.length - 1); + for (let len = max; len > 0; len -= 1) { + if (DSML_TOOL_CALLS_START.startsWith(text.slice(text.length - len))) return len; + } + return 0; +} + +function parseDsmlAttributes(text) { + const attrs = {}; + if (typeof text !== 'string') return attrs; + const attrRe = /([A-Za-z_][A-Za-z0-9_-]*)="([^"]*)"/g; + let match; + while ((match = attrRe.exec(text))) { + attrs[match[1]] = match[2]; + } + return attrs; +} + +function normalizeDsmlParameterValue(rawValue, attrs, paramName = '') { + const text = typeof rawValue === 'string' ? rawValue.trim() : ''; + if (attrs && attrs.string === 'true') { + if (/path$/i.test(paramName)) return text.replace(/\r?\n\s*/g, ''); + return text; + } + if (text === 'true') return true; + if (text === 'false') return false; + if (text === 'null') return null; + if (/^-?\d+(?:\.\d+)?$/.test(text)) return Number(text); + const parsed = parseJsonValueOrNull(text); + return parsed === null && text !== 'null' ? text : parsed; +} + +function parseDsmlToolCalls(blockText) { + const calls = []; + if (typeof blockText !== 'string' || !blockText) return calls; + const invokeRe = /<|DSML|invoke\b([^>]*)>([\s\S]*?)<\/|DSML|invoke>/g; + let invokeMatch; + while ((invokeMatch = invokeRe.exec(blockText))) { + const invokeAttrs = parseDsmlAttributes(invokeMatch[1]); + const name = asTrimmedString(invokeAttrs.name); + if (!name) continue; + const args = {}; + const body = invokeMatch[2]; + const paramRe = /<|DSML|parameter\b([^>]*)>([\s\S]*?)<\/|DSML|parameter>/g; + let paramMatch; + while ((paramMatch = paramRe.exec(body))) { + const paramAttrs = parseDsmlAttributes(paramMatch[1]); + const paramName = asTrimmedString(paramAttrs.name); + if (!paramName) continue; + args[paramName] = normalizeDsmlParameterValue(paramMatch[2], paramAttrs, paramName); + } + calls.push({ name, arguments: args }); + } + return calls; +} + +function emitChatStreamTextDelta(state, text) { + if (!state || typeof text !== 'string' || !text) return; + if (!state.messageItem) { + state.messageItem = { + id: `msg_${crypto.randomBytes(8).toString('hex')}`, + type: 'message', + role: 'assistant', + content: [{ type: 'output_text', text: '' }] + }; + state.output.push(state.messageItem); + writeSse(state.res, 'response.output_item.added', { + type: 'response.output_item.added', + output_index: state.output.length - 1, + item: buildResponsesSseItem(state.messageItem, state.messageItem.id) + }); + emitResponsesTextPartAdded(state.res, state.messageItem.id, state.output.length - 1, 0, state.nextSeq); + } + state.messageText += text; + state.messageItem.content[0].text = state.messageText; + writeSse(state.res, 'response.output_text.delta', { + type: 'response.output_text.delta', + item_id: state.messageItem.id, + output_index: state.output.length - 1, + content_index: 0, + delta: text, + sequence_number: state.nextSeq() + }); +} + +function appendDsmlToolCallsToChatStreamState(state, blockText) { + const calls = parseDsmlToolCalls(blockText); + for (const call of calls) { + appendChatStreamToolCall(state.toolCalls, { + index: state.toolCalls.length, + id: `call_${crypto.randomBytes(8).toString('hex')}`, + type: 'function', + function: { + name: call.name, + arguments: JSON.stringify(call.arguments || {}) + } + }); + } +} + +function flushChatStreamContentBuffer(state, force = false) { + if (!state || typeof state.contentBuffer !== 'string' || !state.contentBuffer) return; + let buffer = state.contentBuffer; + state.contentBuffer = ''; + while (buffer) { + const start = buffer.indexOf(DSML_TOOL_CALLS_START); + if (start < 0) { + const keep = force ? 0 : longestDsmlStartPrefixSuffix(buffer); + const text = keep > 0 ? buffer.slice(0, buffer.length - keep) : buffer; + emitChatStreamTextDelta(state, text); + state.contentBuffer = keep > 0 ? buffer.slice(buffer.length - keep) : ''; + return; + } + if (start > 0) { + emitChatStreamTextDelta(state, buffer.slice(0, start)); + buffer = buffer.slice(start); + } + const end = buffer.indexOf(DSML_TOOL_CALLS_END, DSML_TOOL_CALLS_START.length); + if (end < 0) { + if (force) { + state.contentBuffer = ''; + } else { + state.contentBuffer = buffer; + } + return; + } + const blockText = buffer.slice(DSML_TOOL_CALLS_START.length, end); + appendDsmlToolCallsToChatStreamState(state, blockText); + buffer = buffer.slice(end + DSML_TOOL_CALLS_END.length); + } +} + +function writeChatStreamContentSegment(state, segment) { + if (!state || typeof segment !== 'string' || !segment) return; + state.contentBuffer = (typeof state.contentBuffer === 'string' ? state.contentBuffer : '') + segment; + flushChatStreamContentBuffer(state, false); } function writeChatCompletionChunkAsResponsesSse(state, chunk) { @@ -1433,40 +1627,12 @@ function writeChatCompletionChunkAsResponsesSse(state, chunk) { }); } - const segments = []; // DeepSeek-style OpenAI-compatible streams may emit private reasoning in // `reasoning_content` before the final answer. Responses `output_text` // must stay user-visible answer text only; forwarding reasoning here // pollutes Codex output and breaks exact-answer prompts. if (typeof delta.content === 'string' && delta.content) { - segments.push(delta.content); - } - for (const seg of segments) { - if (!state.messageItem) { - state.messageItem = { - id: `msg_${crypto.randomBytes(8).toString('hex')}`, - type: 'message', - role: 'assistant', - content: [{ type: 'output_text', text: '' }] - }; - state.output.push(state.messageItem); - writeSse(state.res, 'response.output_item.added', { - type: 'response.output_item.added', - output_index: state.output.length - 1, - item: buildResponsesSseItem(state.messageItem, state.messageItem.id) - }); - emitResponsesTextPartAdded(state.res, state.messageItem.id, state.output.length - 1, 0, state.nextSeq); - } - state.messageText += seg; - state.messageItem.content[0].text = state.messageText; - writeSse(state.res, 'response.output_text.delta', { - type: 'response.output_text.delta', - item_id: state.messageItem.id, - output_index: state.output.length - 1, - content_index: 0, - delta: seg, - sequence_number: state.nextSeq() - }); + writeChatStreamContentSegment(state, delta.content); } if (Array.isArray(delta.tool_calls)) { @@ -1484,6 +1650,7 @@ function writeChatCompletionChunkAsResponsesSse(state, chunk) { function finishChatStreamResponsesSse(state) { if (!state || state.finished) return; state.finished = true; + flushChatStreamContentBuffer(state, true); if (state.reasoningItem) { const outputIndex = state.output.indexOf(state.reasoningItem); @@ -1559,23 +1726,11 @@ function finishChatStreamResponsesSse(state) { function failChatStreamResponsesSse(state, errorMessage) { if (!state || state.finished) return; - state.finished = true; - writeSse(state.res, 'response.failed', { - type: 'response.failed', - response: ensureResponseMetadata({ - id: state.responseId, - model: state.model, - created_at: state.createdAt, - status: 'failed', - output: state.output, - output_text: state.messageText - }), - error: String(errorMessage || 'upstream stream failed') - }); - writeSse(state.res, 'done', '[DONE]'); - if (!state.res.writableEnded && !state.res.destroyed) { - state.res.end(); - } + flushChatStreamContentBuffer(state, true); + const message = asTrimmedString(errorMessage) || 'upstream stream failed'; + const prefix = state.messageText ? '\n\n' : ''; + emitChatStreamTextDelta(state, `${prefix}Upstream provider failed: ${message}`); + finishChatStreamResponsesSse(state); } function formatUpstreamStreamError(errorValue) { @@ -1612,6 +1767,8 @@ function streamChatCompletionsAsResponsesSse(targetUrl, options = {}) { return new Promise((resolve) => { let settled = false; let upstreamReq = null; + let outputStarted = false; + let activeStreamState = null; const finish = (value) => { if (settled) return; settled = true; @@ -1675,6 +1832,7 @@ function streamChatCompletionsAsResponsesSse(targetUrl, options = {}) { }); if (typeof res.flushHeaders === 'function') res.flushHeaders(); } + outputStarted = true; if (!/text\/event-stream/i.test(contentType)) { upstreamRes.on('data', collectChunk); @@ -1682,9 +1840,7 @@ function streamChatCompletionsAsResponsesSse(targetUrl, options = {}) { const text = chunks.length ? Buffer.concat(chunks).toString('utf-8') : ''; const parsedJson = parseJsonOrError(text); if (parsedJson.error) { - writeSse(res, 'response.failed', { type: 'response.failed', error: `invalid upstream response: ${parsedJson.error}` }); - writeSse(res, 'done', '[DONE]'); - if (!res.writableEnded && !res.destroyed) res.end(); + sendResponsesCompletedErrorSse(res, `invalid upstream response: ${parsedJson.error}`); finish({ ok: true }); return; } @@ -1707,6 +1863,7 @@ function streamChatCompletionsAsResponsesSse(targetUrl, options = {}) { output: [], messageItem: null, messageText: '', + contentBuffer: '', reasoningItem: null, reasoningText: '', toolCalls: [], @@ -1719,6 +1876,7 @@ function streamChatCompletionsAsResponsesSse(targetUrl, options = {}) { return sequence; } }; + activeStreamState = state; writeSse(res, 'response.created', { type: 'response.created', response: { @@ -1770,11 +1928,6 @@ function streamChatCompletionsAsResponsesSse(targetUrl, options = {}) { upstreamRes.on('end', () => { buffer += utf8Decoder.end(); if (buffer.trim()) handleEventBlock(buffer); - if (!state.finished && !state.sawDone && !state.sawFinishReason) { - failChatStreamResponsesSse(state, 'upstream stream ended before [DONE]'); - finish({ ok: true }); - return; - } finishChatStreamResponsesSse(state); finish({ ok: true }); }); @@ -1789,9 +1942,30 @@ function streamChatCompletionsAsResponsesSse(targetUrl, options = {}) { }); upstreamReq.setTimeout(timeoutMs, () => { try { upstreamReq.destroy(new Error('timeout')); } catch (_) {} + if (outputStarted) { + if (activeStreamState && !activeStreamState.finished) { + failChatStreamResponsesSse(activeStreamState, 'timeout'); + } else if (!res.writableEnded && !res.destroyed) { + sendResponsesCompletedErrorSse(res, 'timeout'); + } + finish({ ok: true }); + return; + } finish({ ok: false, error: 'timeout' }); }); - upstreamReq.on('error', (err) => finish({ ok: false, error: err && err.message ? err.message : 'request failed' })); + upstreamReq.on('error', (err) => { + const message = err && err.message ? err.message : 'request failed'; + if (outputStarted) { + if (activeStreamState && !activeStreamState.finished) { + failChatStreamResponsesSse(activeStreamState, message); + } else if (!res.writableEnded && !res.destroyed) { + sendResponsesCompletedErrorSse(res, message); + } + finish({ ok: true }); + return; + } + finish({ ok: false, error: message }); + }); if (bodyText) upstreamReq.write(bodyText); upstreamReq.end(); }); @@ -2230,47 +2404,11 @@ function createOpenaiBridgeHttpHandler(options = {}) { const wantsSse = /text\/event-stream/i.test(String(acceptHeader || '')); if (streamRequested && wantsSse) { - const upstreamResponsesUrl = joinApiUrl(upstream.baseUrl, 'responses'); - const skipResponsesProbe = isResponsesKnownUnsupported(upstream.baseUrl); - const streamedResponses = skipResponsesProbe - ? { ok: false, status: 404, bodyText: '' } - : await retryTransientRequest(() => streamResponsesSse(upstreamResponsesUrl, { - method: 'POST', - body: normalizeResponsesPayloadForUpstream(responsesRequest, true), - headers: codexHeaders, - timeoutMs: streamTimeoutMs, - maxBytes: maxUpstreamBytes, - httpAgent, - httpsAgent, - res - })); - - if (streamedResponses.ok) { - clearResponsesUnsupported(upstream.baseUrl); - return; - } - - const canFallbackToChat = streamedResponses.status - ? shouldFallbackFromUpstreamResponses(streamedResponses.status, streamedResponses.bodyText) - : false; - if (!canFallbackToChat) { - if (res.writableEnded || res.destroyed) return; - const status = streamedResponses.status && streamedResponses.status >= 400 ? streamedResponses.status : 502; - if (!res.headersSent) { - res.writeHead(status, { 'Content-Type': 'application/json; charset=utf-8' }); - res.end(streamedResponses.bodyText || JSON.stringify({ error: streamedResponses.error || 'Upstream request failed' })); - } else { - writeSse(res, 'response.failed', { type: 'response.failed', error: streamedResponses.error || streamedResponses.bodyText || 'Upstream request failed' }); - writeSse(res, 'done', '[DONE]'); - res.end(); - } - return; - } - - if (!skipResponsesProbe && isResponsesEndpointUnsupported(streamedResponses.status, streamedResponses.bodyText)) { - markResponsesUnsupported(upstream.baseUrl); - } - + // Codex-compatible OpenAI-style providers commonly expose reliable + // /chat/completions but either do not implement /responses or emit + // non-terminal Responses SSE. Match dedicated Codex proxy behavior: + // translate Codex Responses requests to Chat Completions and let the + // bridge own the terminal response.completed + [DONE] contract. const converted = convertResponsesRequestToChatCompletions(responsesRequest); if (converted.error) { res.writeHead(400, { 'Content-Type': 'application/json; charset=utf-8' }); @@ -2290,19 +2428,9 @@ function createOpenaiBridgeHttpHandler(options = {}) { res, model: typeof chatBody.model === 'string' ? chatBody.model : '', toolTypesByName: converted.toolTypesByName || {} - })); + }), { retryStatus: isRetryableUpstreamStatus }); if (!streamed.ok) { - if (res.writableEnded || res.destroyed) { - return; - } - if (!res.headersSent) { - res.writeHead(streamed.status && streamed.status >= 400 ? streamed.status : 502, { 'Content-Type': 'application/json; charset=utf-8' }); - res.end(streamed.bodyText || JSON.stringify({ error: streamed.error || 'Upstream request failed' })); - } else if (!res.writableEnded && !res.destroyed) { - writeSse(res, 'response.failed', { type: 'response.failed', error: streamed.error || streamed.bodyText || 'Upstream request failed' }); - writeSse(res, 'done', '[DONE]'); - res.end(); - } + sendResponsesCompletedErrorSse(res, streamed.error || streamed.bodyText || 'Upstream request failed'); } return; } diff --git a/tests/e2e/run.js b/tests/e2e/run.js index 49e712c7..3944669c 100644 --- a/tests/e2e/run.js +++ b/tests/e2e/run.js @@ -16,6 +16,7 @@ const testSetup = require('./test-setup'); const testConfig = require('./test-config'); const testClaude = require('./test-claude'); const testClaudeProxy = require('./test-claude-proxy'); +const testBuiltinProxyCodexConversion = require('./test-builtin-proxy-codex-conversion'); const testSessionSearch = require('./test-session-search'); const testSessions = require('./test-sessions'); const testSessionConvertDerived = require('./test-session-convert-derived'); @@ -151,6 +152,7 @@ async function main() { await testConfig(ctx); await testClaude(ctx); await testClaudeProxy(ctx); + await testBuiltinProxyCodexConversion(ctx); await testSessionSearch(ctx); await testSessions(ctx); await testSessionConvertDerived(ctx); diff --git a/tests/e2e/test-builtin-proxy-codex-conversion.js b/tests/e2e/test-builtin-proxy-codex-conversion.js new file mode 100644 index 00000000..65d7471b --- /dev/null +++ b/tests/e2e/test-builtin-proxy-codex-conversion.js @@ -0,0 +1,420 @@ +const http = require('http'); +const { assert, closeServer } = require('./helpers'); + +function requestRaw(port, pathname, options = {}) { + return new Promise((resolve, reject) => { + const body = options.body !== undefined ? JSON.stringify(options.body) : ''; + const req = http.request({ + hostname: '127.0.0.1', + port, + path: pathname, + method: options.method || (body ? 'POST' : 'GET'), + headers: { + ...(options.headers || {}), + ...(body ? { + 'Content-Type': 'application/json', + 'Content-Length': Buffer.byteLength(body) + } : {}) + } + }, (res) => { + let responseBody = ''; + res.setEncoding('utf-8'); + res.on('data', chunk => responseBody += chunk); + res.on('end', () => resolve({ + statusCode: res.statusCode || 0, + headers: res.headers, + body: responseBody + })); + }); + req.on('error', reject); + if (body) req.write(body); + req.end(); + }); +} + +function requestUntilText(port, pathname, options = {}, marker, timeoutMs = 500) { + return new Promise((resolve, reject) => { + const body = options.body !== undefined ? JSON.stringify(options.body) : ''; + const started = Date.now(); + let text = ''; + let firstDataElapsedMs = null; + let done = false; + let req; + const finish = (value) => { + if (done) return; + done = true; + clearTimeout(timer); + resolve(value); + }; + const timer = setTimeout(() => { + if (req) req.destroy(); + finish({ statusCode: 0, body: text, firstDataElapsedMs, timedOut: true }); + }, timeoutMs); + req = http.request({ + hostname: '127.0.0.1', + port, + path: pathname, + method: options.method || (body ? 'POST' : 'GET'), + headers: { + ...(options.headers || {}), + ...(body ? { + 'Content-Type': 'application/json', + 'Content-Length': Buffer.byteLength(body) + } : {}) + } + }, (res) => { + res.setEncoding('utf-8'); + res.on('data', chunk => { + if (firstDataElapsedMs === null) firstDataElapsedMs = Date.now() - started; + text += chunk; + if (!marker || text.includes(marker)) { + finish({ statusCode: res.statusCode || 0, headers: res.headers, body: text, firstDataElapsedMs }); + req.destroy(); + } + }); + res.on('error', () => {}); + }); + req.on('error', (err) => { + if (err && err.code === 'ECONNRESET') return; + reject(err); + }); + if (body) req.write(body); + req.end(); + }); +} + +function getAvailableTcpPort() { + return new Promise((resolve, reject) => { + const server = http.createServer(); + server.once('error', reject); + server.listen(0, '127.0.0.1', () => { + const address = server.address(); + const port = address && typeof address.port === 'number' ? address.port : 0; + server.close((err) => { + if (err) return reject(err); + resolve(port); + }); + }); + }); +} + +function sleep(ms) { + return new Promise(resolve => setTimeout(resolve, ms)); +} + +async function waitUntil(predicate, timeoutMs = 1000, intervalMs = 25) { + const deadline = Date.now() + timeoutMs; + let lastValue; + while (Date.now() < deadline) { + lastValue = predicate(); + if (lastValue) return lastValue; + await sleep(intervalMs); + } + return predicate(); +} + +function startSub2ApiStyleUpstream() { + const requests = []; + const streamMetrics = { + opened: 0, + closed: 0, + active: 0, + maxActive: 0 + }; + function trackStream(res) { + streamMetrics.opened += 1; + streamMetrics.active += 1; + streamMetrics.maxActive = Math.max(streamMetrics.maxActive, streamMetrics.active); + let closed = false; + res.on('close', () => { + if (closed) return; + closed = true; + streamMetrics.closed += 1; + streamMetrics.active -= 1; + }); + } + return new Promise((resolve, reject) => { + const server = http.createServer((req, res) => { + const requestPath = String(req.url || '').split('?')[0]; + let body = ''; + req.setEncoding('utf-8'); + req.on('data', chunk => body += chunk); + req.on('end', () => { + let parsedBody = null; + if (body.trim()) { + try { parsedBody = JSON.parse(body); } catch (_) { parsedBody = null; } + } + requests.push({ + method: req.method, + path: requestPath, + headers: req.headers, + body: parsedBody + }); + + if (req.method === 'GET' && requestPath === '/nested/api/models') { + const payload = JSON.stringify({ data: [{ id: 'gpt-sub2api-e2e' }] }); + res.writeHead(200, { + 'Content-Type': 'application/json; charset=utf-8', + 'Content-Length': Buffer.byteLength(payload, 'utf-8') + }); + res.end(payload, 'utf-8'); + return; + } + + if (req.method === 'POST' && requestPath === '/nested/api/responses') { + const payload = JSON.stringify({ error: { message: 'responses endpoint must not be used' } }); + res.writeHead(500, { + 'Content-Type': 'application/json; charset=utf-8', + 'Content-Length': Buffer.byteLength(payload, 'utf-8') + }); + res.end(payload, 'utf-8'); + return; + } + + if (req.method === 'POST' && requestPath === '/nested/api/chat/completions') { + if (parsedBody && parsedBody.model === 'error-model') { + const payload = JSON.stringify({ error: { message: 'upstream rate limited' } }); + res.writeHead(429, { + 'Content-Type': 'application/json; charset=utf-8', + 'Content-Length': Buffer.byteLength(payload, 'utf-8') + }); + res.end(payload, 'utf-8'); + return; + } + + if (parsedBody && parsedBody.stream === true) { + const writeStream = () => { + try { + trackStream(res); + res.writeHead(200, { 'Content-Type': 'text/event-stream; charset=utf-8' }); + res.write('data: {"id":"chatcmpl_sub2api_stream","model":"gpt-sub2api-e2e","choices":[{"delta":{"role":"assistant"}}]}\n\n'); + res.write('data: {"id":"chatcmpl_sub2api_stream","model":"gpt-sub2api-e2e","choices":[{"delta":{"content":"stream-"}}]}\n\n'); + res.write('data: {"id":"chatcmpl_sub2api_stream","model":"gpt-sub2api-e2e","choices":[{"delta":{"content":"ok"}}]}\n\n'); + res.end('data: [DONE]\n\n'); + } catch (e) {} + }; + if (JSON.stringify(parsedBody).includes('idle before first chunk')) { + try { + trackStream(res); + res.writeHead(200, { 'Content-Type': 'text/event-stream; charset=utf-8' }); + if (typeof res.flushHeaders === 'function') res.flushHeaders(); + setTimeout(() => { + res.write('data: {"id":"chatcmpl_sub2api_stream_idle","model":"gpt-sub2api-e2e","choices":[{"delta":{"role":"assistant"}}]}\n\n'); + res.write('data: {"id":"chatcmpl_sub2api_stream_idle","model":"gpt-sub2api-e2e","choices":[{"delta":{"content":"idle-ok"}}]}\n\n'); + res.end('data: [DONE]\n\n'); + }, 800); + } catch (e) {} + } else if (JSON.stringify(parsedBody).includes('slow content')) { + try { + trackStream(res); + res.writeHead(200, { 'Content-Type': 'text/event-stream; charset=utf-8' }); + res.write('data: {"id":"chatcmpl_sub2api_stream_slow","model":"gpt-sub2api-e2e","choices":[{"delta":{"role":"assistant"}}]}\n\n'); + setTimeout(() => { + res.write('data: {"id":"chatcmpl_sub2api_stream_slow","model":"gpt-sub2api-e2e","choices":[{"delta":{"content":"slow-ok"}}]}\n\n'); + res.end('data: [DONE]\n\n'); + }, 800); + } catch (e) {} + } else { + writeStream(); + } + return; + } + + const hasTools = parsedBody && Array.isArray(parsedBody.tools) && parsedBody.tools.length > 0; + const payload = JSON.stringify({ + id: hasTools ? 'chatcmpl_sub2api_tool' : 'chatcmpl_sub2api_text', + model: parsedBody && parsedBody.model ? parsedBody.model : 'unknown-model', + choices: [{ + finish_reason: hasTools ? 'tool_calls' : 'stop', + message: hasTools ? { + role: 'assistant', + content: null, + tool_calls: [{ + id: 'call_lookup_e2e', + type: 'function', + function: { name: 'lookup', arguments: '{"q":"codexmate"}' } + }] + } : { + role: 'assistant', + content: 'direct-chat-ok' + } + }], + usage: { prompt_tokens: 5, completion_tokens: 7, total_tokens: 12 } + }); + res.writeHead(200, { + 'Content-Type': 'application/json; charset=utf-8', + 'Content-Length': Buffer.byteLength(payload, 'utf-8') + }); + res.end(payload, 'utf-8'); + return; + } + + const notFound = JSON.stringify({ error: { message: 'not found' } }); + res.writeHead(404, { + 'Content-Type': 'application/json; charset=utf-8', + 'Content-Length': Buffer.byteLength(notFound, 'utf-8') + }); + res.end(notFound, 'utf-8'); + }); + }); + server.on('error', reject); + server.listen(0, '127.0.0.1', () => { + const address = server.address(); + resolve({ server, port: address.port, requests, streamMetrics }); + }); + }); +} + +function assertNoResponsesProbe(requests, label) { + const responsesHits = requests.filter((req) => req.path === '/nested/api/responses'); + assert(responsesHits.length === 0, `${label}: builtin conversion must not call upstream /responses`); +} + +module.exports = async function testBuiltinProxyCodexConversion(ctx) { + const { api } = ctx; + const upstream = await startSub2ApiStyleUpstream(); + const proxyPort = await getAvailableTcpPort(); + try { + const addProvider = await api('add-provider', { + name: 'codex-sub2api-builtin-e2e', + url: `http://127.0.0.1:${upstream.port}/nested/api`, + key: 'sk-sub2api-e2e', + model: 'gpt-sub2api-e2e' + }); + assert(addProvider.success === true, 'add-provider(codex-sub2api-builtin-e2e) failed'); + + const startResult = await api('proxy-start', { + host: '127.0.0.1', + port: proxyPort, + provider: 'codex-sub2api-builtin-e2e', + authSource: 'provider', + timeoutMs: 3000 + }); + assert(startResult.success === true, 'proxy-start failed'); + assert(startResult.listenUrl === `http://127.0.0.1:${proxyPort}`, 'proxy-start listenUrl mismatch'); + + const textResponse = await requestRaw(proxyPort, '/v1/responses', { + headers: { 'Authorization': 'Bearer client-key', 'Originator': 'codex-tui' }, + body: { model: 'gpt-sub2api-e2e', input: 'hello', stream: false } + }); + assert(textResponse.statusCode === 200, 'non-stream /v1/responses should succeed'); + const textPayload = JSON.parse(textResponse.body); + assert(textPayload.object === 'response', 'non-stream response should be Responses-shaped'); + assert(textPayload.output[0].content[0].text === 'direct-chat-ok', 'non-stream text mismatch'); + assert(textPayload.usage.input_tokens === 5, 'non-stream usage input mismatch'); + assertNoResponsesProbe(upstream.requests, 'non-stream text'); + const textChatReq = upstream.requests.find((req) => req.path === '/nested/api/chat/completions' && req.body && req.body.stream === false); + assert(textChatReq, 'non-stream should call chat/completions'); + assert(textChatReq.headers.authorization === 'Bearer sk-sub2api-e2e', 'provider auth should be used upstream'); + assert(/^codex_cli_rs\//.test(textChatReq.headers['user-agent'] || ''), 'Codex user-agent should be sent upstream'); + assert(textChatReq.body.messages[0].role === 'user', 'Responses input should become chat user message'); + + const toolResponse = await requestRaw(proxyPort, '/v1/responses', { + body: { + model: 'gpt-sub2api-e2e', + input: 'use lookup', + tools: [{ type: 'function', name: 'lookup', parameters: { type: 'object', properties: { q: { type: 'string' } } } }], + stream: false + } + }); + assert(toolResponse.statusCode === 200, 'tool /v1/responses should succeed'); + const toolPayload = JSON.parse(toolResponse.body); + assert(toolPayload.output.some((item) => item.type === 'function_call' && item.name === 'lookup'), 'chat tool call should convert back to Responses function_call'); + assertNoResponsesProbe(upstream.requests, 'tool call'); + + const streamResponse = await requestRaw(proxyPort, '/v1/responses', { + headers: { 'Accept': 'text/event-stream' }, + body: { model: 'gpt-sub2api-e2e', input: 'stream please', stream: true } + }); + assert(streamResponse.statusCode === 200, 'stream /v1/responses should succeed'); + assert(/text\/event-stream/i.test(streamResponse.headers['content-type'] || ''), 'stream should return SSE'); + assert(/event: response\.created/.test(streamResponse.body), 'stream should include response.created'); + assert(/"status":"in_progress"/.test(streamResponse.body), 'stream response.created should expose in_progress status'); + assert(/"delta":"stream-"/.test(streamResponse.body), 'stream should include first text delta'); + assert(/"delta":"ok"/.test(streamResponse.body), 'stream should include second text delta'); + assert(/data: \[DONE\]/.test(streamResponse.body), 'stream should include done sentinel'); + assertNoResponsesProbe(upstream.requests, 'stream'); + const streamChatReq = upstream.requests.find((req) => req.path === '/nested/api/chat/completions' && req.body && req.body.stream === true); + assert(streamChatReq, 'stream should call chat/completions with stream=true'); + + const earlyStreamResponse = await requestUntilText(proxyPort, '/v1/responses', { + headers: { 'Accept': 'text/event-stream' }, + body: { model: 'gpt-sub2api-e2e', input: 'slow content', stream: true } + }, 'event: response.created', 500); + assert(earlyStreamResponse.statusCode === 200, 'slow stream should open downstream SSE on first upstream chat chunk'); + assert(!earlyStreamResponse.timedOut, `slow stream should send response.created before delayed content; got ${earlyStreamResponse.body}`); + assert(earlyStreamResponse.firstDataElapsedMs < 500, `slow stream first SSE took ${earlyStreamResponse.firstDataElapsedMs}ms`); + assert(/event: response\.created/.test(earlyStreamResponse.body), 'slow stream should include early response.created'); + assert(!/event: response\.in_progress/.test(earlyStreamResponse.body), 'chat fallback stream should match sub2api and not synthesize response.in_progress'); + assertNoResponsesProbe(upstream.requests, 'slow stream'); + + const idleTransportResponse = await requestUntilText(proxyPort, '/v1/responses', { + headers: { 'Accept': 'text/event-stream' }, + body: { model: 'gpt-sub2api-e2e', input: 'idle before first chunk', stream: true } + }, ': keepalive', 500); + assert(idleTransportResponse.statusCode === 200, 'idle stream should open downstream SSE transport after upstream accepts stream'); + assert(!idleTransportResponse.timedOut, `idle stream should send SSE keepalive before first upstream chunk; got ${idleTransportResponse.body}`); + assert(idleTransportResponse.firstDataElapsedMs < 500, `idle stream first SSE took ${idleTransportResponse.firstDataElapsedMs}ms`); + assert(/: keepalive/.test(idleTransportResponse.body), 'idle stream should include SSE keepalive comment before first chat chunk'); + assert(!/event: response\.created/.test(idleTransportResponse.body), 'idle transport should not fake response.created before the first upstream chat chunk'); + assert(!/event: response\.in_progress/.test(idleTransportResponse.body), 'idle transport should not synthesize response.in_progress'); + assertNoResponsesProbe(upstream.requests, 'idle stream transport'); + + const idleFullResponse = await requestRaw(proxyPort, '/v1/responses', { + headers: { 'Accept': 'text/event-stream' }, + body: { model: 'gpt-sub2api-e2e', input: 'idle before first chunk full', stream: true } + }); + assert(idleFullResponse.statusCode === 200, 'idle stream full request should complete'); + assert(/event: response\.created/.test(idleFullResponse.body), 'idle stream should emit response.created once the first chat chunk arrives'); + assert(/"delta":"idle-ok"/.test(idleFullResponse.body), 'idle stream should pass delayed content'); + assert(/data: \[DONE\]/.test(idleFullResponse.body), 'idle stream should include done sentinel after delayed content'); + + const stabilityStartCpu = process.cpuUsage(); + const stabilityStart = Date.now(); + const abortedIdlePromises = Array.from({ length: 6 }, (_, index) => requestUntilText(proxyPort, '/v1/responses', { + headers: { 'Accept': 'text/event-stream' }, + body: { model: 'gpt-sub2api-e2e', input: `idle before first chunk abort ${index}`, stream: true } + }, ': keepalive', 600)); + const completedIdlePromises = Array.from({ length: 4 }, (_, index) => requestRaw(proxyPort, '/v1/responses', { + headers: { 'Accept': 'text/event-stream' }, + body: { model: 'gpt-sub2api-e2e', input: `idle before first chunk complete ${index}`, stream: true } + })); + const [abortedIdleRequests, completedIdleRequests] = await Promise.all([ + Promise.all(abortedIdlePromises), + Promise.all(completedIdlePromises) + ]); + const stabilityCpu = process.cpuUsage(stabilityStartCpu); + const stabilityElapsedMs = Date.now() - stabilityStart; + const activeDrained = await waitUntil(() => upstream.streamMetrics.active === 0, 1500); + + for (const response of abortedIdleRequests) { + assert(response.statusCode === 200, 'aborted idle stream should still receive SSE transport'); + assert(!response.timedOut, `aborted idle stream should receive keepalive promptly; got ${response.body}`); + assert(response.firstDataElapsedMs < 600, `aborted idle stream first SSE took ${response.firstDataElapsedMs}ms`); + assert(/: keepalive/.test(response.body), 'aborted idle stream should receive keepalive'); + assert(!/event: response\.in_progress/.test(response.body), 'aborted idle stream should not synthesize response.in_progress'); + } + for (const response of completedIdleRequests) { + assert(response.statusCode === 200, 'completed idle stream should succeed'); + assert(/: keepalive/.test(response.body), 'completed idle stream should start with keepalive while waiting for first chunk'); + assert(/event: response\.created/.test(response.body), 'completed idle stream should emit response.created after first chat chunk'); + assert(/"delta":"idle-ok"/.test(response.body), 'completed idle stream should pass delayed content'); + assert(/data: \[DONE\]/.test(response.body), 'completed idle stream should include done sentinel'); + } + assert(activeDrained, `idle stream stability run leaked upstream streams: ${JSON.stringify(upstream.streamMetrics)}`); + assert(upstream.streamMetrics.opened === upstream.streamMetrics.closed, `all upstream SSE streams should close after completion/abort: ${JSON.stringify(upstream.streamMetrics)}`); + assert(upstream.streamMetrics.maxActive <= 10, `stability run should not exceed requested concurrency: ${JSON.stringify(upstream.streamMetrics)}`); + console.log(`[builtin-proxy-codex-conversion] stability idle streams: elapsed_ms=${stabilityElapsedMs} cpu_user_us=${stabilityCpu.user} cpu_system_us=${stabilityCpu.system} metrics=${JSON.stringify(upstream.streamMetrics)}`); + + const errorResponse = await requestRaw(proxyPort, '/v1/responses', { + body: { model: 'error-model', input: 'fail', stream: false } + }); + assert(errorResponse.statusCode === 429, 'upstream chat errors should pass through status'); + assert(/upstream rate limited/.test(errorResponse.body), 'upstream chat error body should pass through'); + assertNoResponsesProbe(upstream.requests, 'error'); + } finally { + try { await api('proxy-stop'); } catch (_) {} + try { await api('delete-provider', { name: 'codex-sub2api-builtin-e2e' }); } catch (_) {} + await closeServer(upstream && upstream.server); + } +}; diff --git a/tests/unit/builtin-proxy-responses-shim.test.mjs b/tests/unit/builtin-proxy-responses-shim.test.mjs index e7d15bbc..17770179 100644 --- a/tests/unit/builtin-proxy-responses-shim.test.mjs +++ b/tests/unit/builtin-proxy-responses-shim.test.mjs @@ -128,13 +128,24 @@ function createNestedNamespace(depth, leafTool) { return current; } -test('builtin-proxy /v1/responses sends Codex client identity upstream', async () => { +test('builtin-proxy /v1/responses sends Codex client identity to direct chat conversion upstream', async () => { let capturedHeaders = null; + let responsesHit = false; const upstream = http.createServer((req, res) => { if (req.url === '/v1/responses' && req.method === 'POST') { + responsesHit = true; + res.writeHead(500, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'responses should not be probed' })); + return; + } + if (req.url === '/v1/chat/completions' && req.method === 'POST') { capturedHeaders = req.headers; res.writeHead(200, { 'Content-Type': 'application/json' }); - res.end(JSON.stringify({ id: 'resp_test', model: 'gpt-5', output: [] })); + res.end(JSON.stringify({ + id: 'chatcmpl_identity', + model: 'gpt-5', + choices: [{ message: { role: 'assistant', content: 'ok' } }] + })); return; } res.writeHead(404, { 'Content-Type': 'application/json' }); @@ -154,7 +165,8 @@ test('builtin-proxy /v1/responses sends Codex client identity upstream', async ( }); assert.equal(resp.status, 200); - assert.ok(capturedHeaders, 'upstream should receive /v1/responses request'); + assert.equal(responsesHit, false, 'direct sub2api-style conversion must not probe /v1/responses'); + assert.ok(capturedHeaders, 'upstream should receive /v1/chat/completions request'); assert.match(capturedHeaders['user-agent'] || '', /^codex_cli_rs\//); assert.equal(capturedHeaders.version, '0.98.0'); assert.equal(capturedHeaders['openai-beta'], 'responses=experimental'); @@ -481,21 +493,18 @@ test('builtin-proxy /v1/responses stream=true streams chat fallback as Responses } }); -test('builtin-proxy /v1/responses stream=true does not fallback to chat when upstream Responses hangs', async () => { +test('builtin-proxy /v1/responses stream=true bypasses hanging upstream Responses and uses chat conversion directly', async () => { let responsesHit = false; let chatHit = false; - let capturedResponsesHeaders = null; + let capturedChatHeaders = null; const upstream = http.createServer((req, res) => { if (req.url === '/v1/responses' && req.method === 'POST') { responsesHit = true; - capturedResponsesHeaders = req.headers; - // A hanging Responses endpoint is not proof that Responses is unsupported. - // Falling back to chat/completions can route through non-Codex client paths - // and break Codex-only upstream groups. return; } if (req.url === '/v1/chat/completions' && req.method === 'POST') { chatHit = true; + capturedChatHeaders = req.headers; res.writeHead(200, { 'Content-Type': 'text/event-stream; charset=utf-8' }); res.end('data: [DONE]\n\n'); return; @@ -530,16 +539,16 @@ test('builtin-proxy /v1/responses stream=true does not fallback to chat when ups }); const elapsedMs = Date.now() - started; - assert.equal(sse.status, 502); - assert.equal(responsesHit, true, 'streaming Codex tasks should probe upstream /responses first'); - assert.match(capturedResponsesHeaders['user-agent'] || '', /^codex_cli_rs\//); - assert.equal(capturedResponsesHeaders.version, '0.98.0'); - assert.equal(capturedResponsesHeaders['openai-beta'], 'responses=experimental'); - assert.equal(capturedResponsesHeaders.originator, 'codex_cli_rs'); - assert.equal(chatHit, false, 'hanging Responses must not fallback to chat/completions'); - assert.ok(elapsedMs >= 900, `proxy should wait for the upstream Responses timeout; took ${elapsedMs}ms`); - assert.match(sse.headers['content-type'], /application\/json/i); - assert.match(sse.text, /timeout/); + assert.equal(sse.status, 200); + assert.equal(responsesHit, false, 'streaming conversion must not touch upstream /responses'); + assert.equal(chatHit, true, 'streaming conversion should use /v1/chat/completions directly'); + assert.match(capturedChatHeaders['user-agent'] || '', /^codex_cli_rs\//); + assert.equal(capturedChatHeaders.version, '0.98.0'); + assert.equal(capturedChatHeaders['openai-beta'], 'responses=experimental'); + assert.equal(capturedChatHeaders.originator, 'codex_cli_rs'); + assert.ok(elapsedMs < 900, `proxy should not wait for a hanging Responses endpoint; took ${elapsedMs}ms`); + assert.match(sse.headers['content-type'], /text\/event-stream/i); + assert.match(sse.text, /data: \[DONE\]/); } finally { if (proxyRuntime) { await closeServer(proxyRuntime.server, proxyRuntime.connections); @@ -1169,6 +1178,92 @@ test('builtin-proxy /v1/responses stream=true retries JSON fallback when body ab } }); +test('builtin-proxy /v1/responses stream=true follows sub2api and opens on first upstream chat chunk', async () => { + let responsesHit = false; + const upstream = http.createServer((req, res) => { + if (req.url === '/v1/responses' && req.method === 'POST') { + responsesHit = true; + res.writeHead(500, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'responses should not be probed' })); + return; + } + if (req.url === '/v1/chat/completions' && req.method === 'POST') { + req.on('data', () => {}); + req.on('end', () => { + res.writeHead(200, { 'Content-Type': 'text/event-stream; charset=utf-8' }); + res.write('data: {"id":"chatcmpl_slow_content","model":"gpt-test","choices":[{"delta":{"role":"assistant"}}]}\n\n'); + setTimeout(() => { + res.write('data: {"id":"chatcmpl_slow_content","model":"gpt-test","choices":[{"delta":{"content":"late"}}]}\n\n'); + res.end('data: [DONE]\n\n'); + }, 1200); + }); + return; + } + res.writeHead(404, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'not found' })); + }); + const { port: upstreamPort } = await listen(upstream); + let proxyRuntime = null; + + try { + proxyRuntime = await startTestProxy(upstreamPort, { timeoutMs: 3000 }); + const proxyPort = proxyRuntime.server.address().port; + const earlySse = await new Promise((resolve, reject) => { + const started = Date.now(); + let text = ''; + let firstDataElapsedMs = null; + let done = false; + const finish = (value) => { + if (done) return; + done = true; + clearTimeout(timer); + resolve(value); + }; + let req; + const timer = setTimeout(() => { + if (req) req.destroy(); + finish({ elapsedMs: firstDataElapsedMs, text, status: 0, timedOut: true }); + }, 1000); + req = http.request({ + hostname: '127.0.0.1', + port: proxyPort, + path: '/v1/responses', + method: 'POST', + headers: { 'Content-Type': 'application/json' } + }, (res) => { + res.on('data', (chunk) => { + if (firstDataElapsedMs === null) firstDataElapsedMs = Date.now() - started; + text += chunk.toString('utf-8'); + if (/"id":"chatcmpl_slow_content"/.test(text)) { + finish({ elapsedMs: firstDataElapsedMs, text, status: res.statusCode }); + req.destroy(); + } + }); + res.on('error', () => {}); + }); + req.on('error', (err) => { + if (err && err.code === 'ECONNRESET') return; + reject(err); + }); + req.write(JSON.stringify({ model: 'gpt-test', input: 'ping', stream: true })); + req.end(); + }); + + assert.equal(earlySse.status, 200); + assert.equal(responsesHit, false, 'direct stream conversion must not probe upstream /responses'); + assert.equal(earlySse.timedOut, undefined, `proxy should send response.created on the first upstream chat chunk; got ${earlySse.text}`); + assert.ok(earlySse.elapsedMs < 1200, `proxy should send SSE before delayed content; took ${earlySse.elapsedMs}ms`); + assert.match(earlySse.text, /event: response\.created/); + assert.doesNotMatch(earlySse.text, /event: response\.in_progress/); + assert.match(earlySse.text, /"id":"chatcmpl_slow_content"/); + } finally { + if (proxyRuntime) { + await closeServer(proxyRuntime.server, proxyRuntime.connections); + } + await closeServer(upstream); + } +}); + test('builtin-proxy /v1/responses stream=true keeps downstream SSE open before first upstream event', async () => { const upstream = http.createServer((req, res) => { if (req.url === '/v1/responses' && req.method === 'POST') { @@ -1216,6 +1311,93 @@ test('builtin-proxy /v1/responses stream=true keeps downstream SSE open before f } }); +test('builtin-proxy /v1/responses stream=true opens SSE transport while upstream is idle before first chat chunk', async () => { + let responsesHit = false; + const upstream = http.createServer((req, res) => { + if (req.url === '/v1/responses' && req.method === 'POST') { + responsesHit = true; + res.writeHead(404, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'responses endpoint unavailable' })); + return; + } + if (req.url === '/v1/chat/completions' && req.method === 'POST') { + req.on('data', () => {}); + req.on('end', () => { + res.writeHead(200, { 'Content-Type': 'text/event-stream; charset=utf-8' }); + if (typeof res.flushHeaders === 'function') res.flushHeaders(); + setTimeout(() => { + res.write('data: {"id":"chatcmpl_idle_headers","model":"gpt-test","choices":[{"delta":{"role":"assistant"}}]}\n\n'); + res.write('data: {"id":"chatcmpl_idle_headers","model":"gpt-test","choices":[{"delta":{"content":"late"}}]}\n\n'); + res.end('data: [DONE]\n\n'); + }, 1200); + }); + return; + } + res.writeHead(404, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'not found' })); + }); + const { port: upstreamPort } = await listen(upstream); + let proxyRuntime = null; + + try { + proxyRuntime = await startTestProxy(upstreamPort, { timeoutMs: 3000 }); + const proxyPort = proxyRuntime.server.address().port; + const early = await new Promise((resolve, reject) => { + const started = Date.now(); + let text = ''; + let firstDataElapsedMs = null; + let done = false; + const finish = (value) => { + if (done) return; + done = true; + clearTimeout(timer); + resolve(value); + }; + let req; + const timer = setTimeout(() => { + if (req) req.destroy(); + finish({ elapsedMs: firstDataElapsedMs, text, status: 0, timedOut: true }); + }, 500); + req = http.request({ + hostname: '127.0.0.1', + port: proxyPort, + path: '/v1/responses', + method: 'POST', + headers: { 'Content-Type': 'application/json', 'Accept': 'text/event-stream' } + }, (res) => { + res.on('data', (chunk) => { + if (firstDataElapsedMs === null) firstDataElapsedMs = Date.now() - started; + text += chunk.toString('utf-8'); + if (text.includes(': keepalive')) { + finish({ elapsedMs: firstDataElapsedMs, text, status: res.statusCode }); + req.destroy(); + } + }); + res.on('error', () => {}); + }); + req.on('error', (err) => { + if (err && err.code === 'ECONNRESET') return; + reject(err); + }); + req.write(JSON.stringify({ model: 'gpt-test', input: 'idle before first chunk', stream: true })); + req.end(); + }); + + assert.equal(early.status, 200); + assert.equal(responsesHit, false, 'proxy should not probe upstream /v1/responses before chat streaming'); + assert.equal(early.timedOut, undefined, `proxy should open SSE transport before the first upstream chunk; got ${early.text}`); + assert.ok(early.elapsedMs < 500, `proxy should send SSE keepalive before idle upstream content; took ${early.elapsedMs}ms`); + assert.match(early.text, /: keepalive/); + assert.doesNotMatch(early.text, /event: response\.created/); + assert.doesNotMatch(early.text, /event: response\.in_progress/); + } finally { + if (proxyRuntime) { + await closeServer(proxyRuntime.server, proxyRuntime.connections); + } + await closeServer(upstream); + } +}); + test('builtin-proxy /v1/responses stream=true aborts upstream when client disconnects', async () => { let upstreamClosed = false; const upstream = http.createServer((req, res) => { @@ -1378,16 +1560,79 @@ test('builtin-proxy /v1/responses retries upstream after a transient connection } }); -test('builtin-proxy /v1/responses adds encrypted reasoning include for upstream Responses', async () => { +test('builtin-proxy /v1/responses retries upstream after a transient chat timeout', async () => { + const sockets = new Set(); + let chatAttempts = 0; + const upstream = http.createServer((req, res) => { + if (req.url === '/v1/responses' && req.method === 'POST') { + res.writeHead(500, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'responses should not be probed' })); + return; + } + if (req.url === '/v1/chat/completions' && req.method === 'POST') { + chatAttempts += 1; + if (chatAttempts === 1) { + return; + } + res.writeHead(200, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ + id: 'chatcmpl_after_timeout', + model: 'gpt-test', + choices: [{ message: { role: 'assistant', content: 'timeout recovered' } }] + })); + return; + } + res.writeHead(404, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'not found' })); + }); + upstream.on('connection', (socket) => { + sockets.add(socket); + socket.on('close', () => sockets.delete(socket)); + }); + const { port: upstreamPort } = await listen(upstream); + let proxyRuntime = null; + + try { + proxyRuntime = await startTestProxy(upstreamPort, { timeoutMs: 1000 }); + const proxyPort = proxyRuntime.server.address().port; + const resp = await requestText(`http://127.0.0.1:${proxyPort}/v1/responses`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: { model: 'gpt-test', input: 'ping', stream: false } + }); + assert.equal(resp.status, 200); + assert.ok(chatAttempts >= 2, 'plain timeout should be retried as a transient upstream failure'); + const parsed = JSON.parse(resp.text); + assert.equal(parsed.output[0].content[0].text, 'timeout recovered'); + } finally { + if (proxyRuntime) { + await closeServer(proxyRuntime.server, proxyRuntime.connections); + } + await closeServer(upstream, sockets); + } +}); + +test('builtin-proxy /v1/responses maps reasoning effort through direct chat conversion without Responses include injection', async () => { let capturedRequest = null; + let responsesHit = false; const upstream = http.createServer((req, res) => { if (req.url === '/v1/responses' && req.method === 'POST') { + responsesHit = true; + res.writeHead(500, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'responses should not be probed' })); + return; + } + if (req.url === '/v1/chat/completions' && req.method === 'POST') { const chunks = []; req.on('data', (chunk) => chunks.push(chunk)); req.on('end', () => { capturedRequest = JSON.parse(Buffer.concat(chunks).toString('utf-8')); res.writeHead(200, { 'Content-Type': 'application/json' }); - res.end(JSON.stringify({ id: 'resp_reasoning', model: 'gpt-test', output: [] })); + res.end(JSON.stringify({ + id: 'chatcmpl_reasoning', + model: 'gpt-test', + choices: [{ message: { role: 'assistant', content: 'ok' } }] + })); }); return; } @@ -1413,10 +1658,11 @@ test('builtin-proxy /v1/responses adds encrypted reasoning include for upstream }); assert.equal(resp.status, 200); - assert.ok(capturedRequest, 'upstream Responses request should be captured'); + assert.equal(responsesHit, false, 'direct chat conversion must not inject Responses-only include fields upstream'); + assert.ok(capturedRequest, 'upstream chat request should be captured'); assert.equal(capturedRequest.stream, false); - assert.deepStrictEqual(capturedRequest.reasoning, { effort: 'high' }); - assert.deepStrictEqual(capturedRequest.include, ['file_search_call.results', 'reasoning.encrypted_content']); + assert.equal(capturedRequest.reasoning_effort, 'high'); + assert.equal(Object.prototype.hasOwnProperty.call(capturedRequest, 'include'), false); } finally { if (proxyRuntime) { await closeServer(proxyRuntime.server, proxyRuntime.connections); @@ -1530,13 +1776,24 @@ test('builtin-proxy /v1/responses stream=true emits reasoning and tool events fr assert.equal(sse.status, 200); assert.match(sse.headers['content-type'], /text\/event-stream/i); + assert.match(sse.text, /event: response\.created/); + assert.match(sse.text, /event: response\.output_item\.added/); + assert.match(sse.text, /event: response\.reasoning_summary_part\.added/); assert.match(sse.text, /event: response\.reasoning_summary_text\.delta/); assert.match(sse.text, /"delta":"private "/); assert.match(sse.text, /"delta":"thought"/); assert.match(sse.text, /event: response\.reasoning_summary_text\.done/); + assert.match(sse.text, /event: response\.reasoning_summary_part\.done/); assert.match(sse.text, /"text":"private thought"/); assert.match(sse.text, /event: response\.function_call_arguments\.delta/); - assert.match(sse.text, /"delta":"\{\\"q\\":\\"codexmate\\"\}"/); + const argumentDeltas = sse.text.split('\n\n') + .filter((block) => block.includes('event: response.function_call_arguments.delta')) + .map((block) => { + const dataLine = block.split('\n').find((line) => line.startsWith('data: ')); + return dataLine ? JSON.parse(dataLine.slice('data: '.length)).delta || '' : ''; + }) + .join(''); + assert.equal(argumentDeltas, '{"q":"codexmate"}'); assert.match(sse.text, /event: response\.function_call_arguments\.done/); assert.match(sse.text, /"arguments":"\{\\"q\\":\\"codexmate\\"\}"/); assert.match(sse.text, /"type":"reasoning"/); @@ -1550,3 +1807,66 @@ test('builtin-proxy /v1/responses stream=true emits reasoning and tool events fr await closeServer(upstream); } }); + +test('builtin-proxy /v1/responses stream=true converts DSML tool-call text from chat fallback', async () => { + const upstream = http.createServer((req, res) => { + if (req.url === '/v1/responses' && req.method === 'POST') { + res.writeHead(404, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'responses endpoint unavailable' })); + return; + } + if (req.url === '/v1/chat/completions' && req.method === 'POST') { + res.writeHead(200, { 'Content-Type': 'text/event-stream; charset=utf-8' }); + const writeContent = (content) => { + res.write(`data: ${JSON.stringify({ + id: 'chatcmpl_dsml_tool', + model: 'gpt-test', + choices: [{ delta: { content } }] + })}\n\n`); + }; + writeContent('好的,我先读取侧边栏相关文件。\n\n<|DSM'); + writeContent('L|tool_calls>\n<|DSML|invoke name="read">\n<|DSML|parameter name="filePath"\n string="true">/data/data/com.termux/files/home/\n codexmate/AGENTS.md\n<|DSML|parameter name="offset"\n string="false">0\n<|DSML|parameter name="limit"\n string="false">50\n\n<|DSML|invoke name="read">\n<|DSML|parameter name="filePath"\n string="true">/data/data/com.termux/files/home/\n codexmate/USER.md\n<|DSML|parameter name="offset"\n string="false">50\n<|DSML|parameter name="limit"\n string="false">50\n\n'); + res.end('data: [DONE]\n\n'); + return; + } + res.writeHead(404, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'not found' })); + }); + const { port: upstreamPort } = await listen(upstream); + let proxyRuntime = null; + + try { + proxyRuntime = await startTestProxy(upstreamPort); + const proxyPort = proxyRuntime.server.address().port; + const sse = await requestText(`http://127.0.0.1:${proxyPort}/v1/responses`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: { + model: 'gpt-test', + input: 'read sidebar docs', + tools: [{ type: 'function', name: 'read', parameters: { type: 'object' } }], + stream: true + } + }); + + assert.equal(sse.status, 200); + assert.match(sse.text, /event: response\.output_text\.delta/); + assert.match(sse.text, /好的,我先读取侧边栏相关文件。/); + assert.doesNotMatch(sse.text, /DSML/); + assert.doesNotMatch(sse.text, /invoke name/); + assert.match(sse.text, /event: response\.function_call_arguments\.delta/); + assert.match(sse.text, /event: response\.function_call_arguments\.done/); + assert.match(sse.text, /"type":"function_call"/); + assert.match(sse.text, /"name":"read"/); + assert.match(sse.text, /\\"filePath\\":\\"\/data\/data\/com\.termux\/files\/home\/codexmate\/AGENTS\.md\\"/); + assert.match(sse.text, /\\"offset\\":0/); + assert.match(sse.text, /\\"limit\\":50/); + assert.match(sse.text, /event: response\.completed/); + assert.match(sse.text, /data: \[DONE\]/); + } finally { + if (proxyRuntime) { + await closeServer(proxyRuntime.server, proxyRuntime.connections); + } + await closeServer(upstream); + } +}); diff --git a/tests/unit/openai-bridge-upstream-responses.test.mjs b/tests/unit/openai-bridge-upstream-responses.test.mjs index 1cb93f59..8738ff22 100644 --- a/tests/unit/openai-bridge-upstream-responses.test.mjs +++ b/tests/unit/openai-bridge-upstream-responses.test.mjs @@ -22,6 +22,12 @@ function listen(server) { }); } +function closeServer(server) { + return new Promise((resolve, reject) => { + server.close((err) => (err ? reject(err) : resolve())); + }); +} + async function requestText(url, { method = 'GET', headers = {}, body } = {}) { return new Promise((resolve, reject) => { const u = new URL(url); @@ -87,27 +93,34 @@ test('openai-bridge GET /v1 returns local bridge status without probing upstream }); assert.equal(upstreamHit, false); - await bridge.close(); - await upstream.close(); + await closeServer(bridge); + await closeServer(upstream); await rm(tmpDir, { recursive: true, force: true }); }); -test('openai-bridge keeps streaming Codex requests on upstream Responses before chat fallback', async () => { +test('openai-bridge routes streaming Codex requests through chat completions conversion', async () => { let responsesHit = false; let chatHit = false; - let capturedResponsesHeaders = null; + let capturedChatHeaders = null; + let capturedChatBody = null; const upstream = http.createServer((req, res) => { if (req.url === '/v1/responses' && req.method === 'POST') { responsesHit = true; - capturedResponsesHeaders = req.headers; - // A hanging Responses endpoint is not proof that Responses is unsupported. - // Do not silently route Codex-only requests into chat/completions. + res.writeHead(500, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'responses should not be used for streaming Codex requests' })); return; } if (req.url === '/v1/chat/completions' && req.method === 'POST') { chatHit = true; - res.writeHead(200, { 'Content-Type': 'text/event-stream; charset=utf-8' }); - res.end('data: [DONE]\n\n'); + capturedChatHeaders = req.headers; + let body = ''; + req.on('data', (c) => (body += c)); + req.on('end', () => { + capturedChatBody = JSON.parse(body); + res.writeHead(200, { 'Content-Type': 'text/event-stream; charset=utf-8' }); + res.write('data: {"id":"chatcmpl_1","model":"gpt-test","choices":[{"delta":{"content":"OK"},"finish_reason":"stop"}]}\n\n'); + res.end('data: [DONE]\n\n'); + }); return; } if (req.url === '/v1/models' && req.method === 'GET') { @@ -153,39 +166,234 @@ test('openai-bridge keeps streaming Codex requests on upstream Responses before stream: true } }); - assert.equal(sse.status, 502); - assert.equal(responsesHit, true, 'streaming bridge should call upstream /responses first'); - assert.equal(chatHit, false, 'hanging Responses should not fall back to chat/completions'); - assert.match(capturedResponsesHeaders['user-agent'] || '', /^codex_cli_rs\//); - assert.equal(capturedResponsesHeaders.version, '0.98.0'); - assert.equal(capturedResponsesHeaders['openai-beta'], 'responses=experimental'); - assert.equal(capturedResponsesHeaders.originator, 'codex_cli_rs'); - assert.match(sse.headers['content-type'], /application\/json/i); - assert.match(sse.text, /timeout/); + assert.equal(sse.status, 200); + assert.equal(responsesHit, false, 'streaming Codex requests should not passthrough upstream /responses'); + assert.equal(chatHit, true, 'streaming Codex requests should use chat/completions conversion'); + assert.equal(capturedChatBody.stream, true); + assert.match(capturedChatHeaders['user-agent'] || '', /^codex_cli_rs\//); + assert.equal(capturedChatHeaders.originator, 'codex_cli_rs'); + assert.match(sse.headers['content-type'], /text\/event-stream/i); + assert.match(sse.text, /event: response\.created/); + assert.match(sse.text, /event: response\.output_text\.delta/); + assert.match(sse.text, /OK/); + assert.match(sse.text, /event: response\.completed/); + assert.doesNotMatch(sse.text, /event: response\.failed/); + assert.match(sse.text, /data: \[DONE\]/); - await bridge.close(); - await upstream.close(); + await closeServer(bridge); + await closeServer(upstream); await rm(tmpDir, { recursive: true, force: true }); }); -test('openai-bridge streams upstream Responses SSE with Codex identity headers', async () => { - let capturedResponsesHeaders = null; - let chatHit = false; +test('openai-bridge retries upstream chat 403 before completed error SSE for streaming Codex clients', async () => { + let responsesHit = false; + let chatHit = 0; + const upstream = http.createServer((req, res) => { + if (req.url === '/v1/responses' && req.method === 'POST') { + responsesHit = true; + res.writeHead(500, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'responses should not be used for streaming Codex requests' })); + return; + } + if (req.url === '/v1/chat/completions' && req.method === 'POST') { + chatHit += 1; + res.writeHead(403, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: { message: 'Insufficient balance' } })); + return; + } + res.writeHead(404, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'not found' })); + }); + const { port: upstreamPort } = await listen(upstream); + + const tmpDir = await mkdtemp(path.join(os.tmpdir(), 'codexmate-bridge-test-')); + const settingsFile = path.join(tmpDir, 'bridge.json'); + await writeFile(settingsFile, JSON.stringify({ + version: 1, + providers: { + test: { baseUrl: `http://127.0.0.1:${upstreamPort}/v1`, apiKey: 'sk-upstream' } + } + }), 'utf-8'); + + const handler = createOpenaiBridgeHttpHandler({ settingsFile, expectedToken: 'codexmate' }); + const bridge = http.createServer((req, res) => { + if (!handler(req, res)) { + res.statusCode = 404; + res.end('not handled'); + } + }); + const { port: bridgePort } = await listen(bridge); + + const resp = await requestText(`http://127.0.0.1:${bridgePort}/bridge/openai/test/v1/responses`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'Accept': 'text/event-stream', + 'Authorization': 'Bearer codexmate' + }, + body: { model: 'deepseek-v4-flash', input: 'hello', stream: true } + }); + assert.equal(resp.status, 200); + assert.match(resp.headers['content-type'] || '', /text\/event-stream/); + assert.equal(responsesHit, false); + assert.equal(chatHit, 3); + assert.match(resp.text, /event: response\.completed/); + assert.doesNotMatch(resp.text, /event: response\.failed/); + assert.match(resp.text, /Upstream provider failed after retries:/); + assert.match(resp.text, /Insufficient balance/); + assert.match(resp.text, /data: \[DONE\]/); + + await closeServer(bridge); + await closeServer(upstream); + await rm(tmpDir, { recursive: true, force: true }); +}); + +test('openai-bridge recovers when upstream chat succeeds after a transient 403', async () => { + let responsesHit = false; + let chatHit = 0; const upstream = http.createServer((req, res) => { if (req.url === '/v1/responses' && req.method === 'POST') { - capturedResponsesHeaders = req.headers; + responsesHit = true; + res.writeHead(500, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'responses should not be used for streaming Codex requests' })); + return; + } + if (req.url === '/v1/chat/completions' && req.method === 'POST') { + chatHit += 1; + if (chatHit === 1) { + res.writeHead(403, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: { message: 'Insufficient balance' } })); + return; + } res.writeHead(200, { 'Content-Type': 'text/event-stream; charset=utf-8' }); - res.write('event: response.created\n'); - res.write('data: {"type":"response.created","response":{"id":"resp_test","model":"gpt-test"}}\n\n'); - res.write('event: response.completed\n'); - res.write('data: {"type":"response.completed","response":{"id":"resp_test","model":"gpt-test","output":[]}}\n\n'); + res.write('data: {"id":"chatcmpl_retry_ok","model":"deepseek-v4-flash","choices":[{"delta":{"content":"OK"},"finish_reason":"stop"}]}\n\n'); res.end('data: [DONE]\n\n'); return; } + res.writeHead(404, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'not found' })); + }); + const { port: upstreamPort } = await listen(upstream); + + const tmpDir = await mkdtemp(path.join(os.tmpdir(), 'codexmate-bridge-test-')); + const settingsFile = path.join(tmpDir, 'bridge.json'); + await writeFile(settingsFile, JSON.stringify({ + version: 1, + providers: { + test: { baseUrl: `http://127.0.0.1:${upstreamPort}/v1`, apiKey: 'sk-upstream' } + } + }), 'utf-8'); + + const handler = createOpenaiBridgeHttpHandler({ settingsFile, expectedToken: 'codexmate' }); + const bridge = http.createServer((req, res) => { + if (!handler(req, res)) { + res.statusCode = 404; + res.end('not handled'); + } + }); + const { port: bridgePort } = await listen(bridge); + + const resp = await requestText(`http://127.0.0.1:${bridgePort}/bridge/openai/test/v1/responses`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'Accept': 'text/event-stream', + 'Authorization': 'Bearer codexmate' + }, + body: { model: 'deepseek-v4-flash', input: 'hello', stream: true } + }); + assert.equal(resp.status, 200); + assert.equal(responsesHit, false); + assert.equal(chatHit, 2); + assert.match(resp.headers['content-type'] || '', /text\/event-stream/); + assert.match(resp.text, /event: response\.completed/); + assert.doesNotMatch(resp.text, /event: response\.failed/); + assert.match(resp.text, /OK/); + assert.match(resp.text, /data: \[DONE\]/); + + await closeServer(bridge); + await closeServer(upstream); + await rm(tmpDir, { recursive: true, force: true }); +}); + +test('openai-bridge retries direct chat 403 before completed error SSE for streaming Codex clients', async () => { + let responsesHit = false; + let chatHit = 0; + const upstream = http.createServer((req, res) => { + if (req.url === '/v1/responses' && req.method === 'POST') { + responsesHit = true; + res.writeHead(500, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'responses should not be used for streaming Codex requests' })); + return; + } if (req.url === '/v1/chat/completions' && req.method === 'POST') { - chatHit = true; - res.writeHead(200, { 'Content-Type': 'application/json' }); - res.end(JSON.stringify({ ok: true })); + chatHit += 1; + res.writeHead(403, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: { message: 'Insufficient balance' } })); + return; + } + res.writeHead(404, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'not found' })); + }); + const { port: upstreamPort } = await listen(upstream); + + const tmpDir = await mkdtemp(path.join(os.tmpdir(), 'codexmate-bridge-test-')); + const settingsFile = path.join(tmpDir, 'bridge.json'); + await writeFile(settingsFile, JSON.stringify({ + version: 1, + providers: { + test: { baseUrl: `http://127.0.0.1:${upstreamPort}/v1`, apiKey: 'sk-upstream' } + } + }), 'utf-8'); + + const handler = createOpenaiBridgeHttpHandler({ settingsFile, expectedToken: 'codexmate' }); + const bridge = http.createServer((req, res) => { + if (!handler(req, res)) { + res.statusCode = 404; + res.end('not handled'); + } + }); + const { port: bridgePort } = await listen(bridge); + + const resp = await requestText(`http://127.0.0.1:${bridgePort}/bridge/openai/test/v1/responses`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'Accept': 'text/event-stream', + 'Authorization': 'Bearer codexmate' + }, + body: { model: 'deepseek-v4-flash', input: 'hello', stream: true } + }); + assert.equal(resp.status, 200); + assert.match(resp.headers['content-type'] || '', /text\/event-stream/); + assert.equal(responsesHit, false); + assert.equal(chatHit, 3); + assert.match(resp.text, /event: response\.completed/); + assert.doesNotMatch(resp.text, /event: response\.failed/); + assert.match(resp.text, /Upstream provider failed after retries:/); + assert.match(resp.text, /Insufficient balance/); + assert.match(resp.text, /data: \[DONE\]/); + + await closeServer(bridge); + await closeServer(upstream); + await rm(tmpDir, { recursive: true, force: true }); +}); + +test('openai-bridge streams chat conversion SSE with Codex identity headers', async () => { + let capturedChatHeaders = null; + let responsesHit = false; + const upstream = http.createServer((req, res) => { + if (req.url === '/v1/responses' && req.method === 'POST') { + responsesHit = true; + res.writeHead(500, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'responses should not be used for streaming Codex requests' })); + return; + } + if (req.url === '/v1/chat/completions' && req.method === 'POST') { + capturedChatHeaders = req.headers; + res.writeHead(200, { 'Content-Type': 'text/event-stream; charset=utf-8' }); + res.write('data: {"id":"chatcmpl_test","model":"gpt-test","choices":[{"delta":{"content":"hello"},"finish_reason":"stop"}]}\n\n'); + res.end('data: [DONE]\n\n'); return; } res.writeHead(404, { 'Content-Type': 'application/json' }); @@ -223,36 +431,37 @@ test('openai-bridge streams upstream Responses SSE with Codex identity headers', }); assert.equal(sse.status, 200); - assert.equal(chatHit, false, 'successful upstream Responses stream should not call chat/completions'); - assert.ok(capturedResponsesHeaders, 'upstream Responses request should be captured'); - assert.match(capturedResponsesHeaders['user-agent'] || '', /^codex_cli_rs\//); - assert.equal(capturedResponsesHeaders.version, '0.98.0'); - assert.equal(capturedResponsesHeaders['openai-beta'], 'responses=experimental'); - assert.equal(capturedResponsesHeaders.originator, 'codex_cli_rs'); + assert.equal(responsesHit, false); + assert.ok(capturedChatHeaders, 'upstream chat request should be captured'); + assert.match(capturedChatHeaders['user-agent'] || '', /^codex_cli_rs\//); + assert.equal(capturedChatHeaders.version, '0.98.0'); + assert.equal(capturedChatHeaders['openai-beta'], 'responses=experimental'); + assert.equal(capturedChatHeaders.originator, 'codex_cli_rs'); assert.match(sse.headers['content-type'], /text\/event-stream/i); assert.match(sse.text, /response\.created/); assert.match(sse.text, /response\.completed/); + assert.doesNotMatch(sse.text, /response\.failed/); - await bridge.close(); - await upstream.close(); + await closeServer(bridge); + await closeServer(upstream); await rm(tmpDir, { recursive: true, force: true }); }); -test('openai-bridge fails accepted upstream Responses SSE when stream goes idle', async () => { +test('openai-bridge completes streaming Codex SSE with error text when upstream chat stream goes idle', async () => { let responsesHit = false; - let chatHit = false; + let chatHit = 0; const upstream = http.createServer((req, res) => { if (req.url === '/v1/responses' && req.method === 'POST') { responsesHit = true; - res.writeHead(200, { 'Content-Type': 'text/event-stream; charset=utf-8' }); - if (typeof res.flushHeaders === 'function') res.flushHeaders(); - // Keep the connection open without data; the bridge must not hang forever. + res.writeHead(500, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'responses should not be used for streaming Codex requests' })); return; } if (req.url === '/v1/chat/completions' && req.method === 'POST') { - chatHit = true; + chatHit += 1; res.writeHead(200, { 'Content-Type': 'text/event-stream; charset=utf-8' }); - res.end('data: [DONE]\n\n'); + if (typeof res.flushHeaders === 'function') res.flushHeaders(); + // Keep the connection open without data; the bridge must not hang forever. return; } res.writeHead(404, { 'Content-Type': 'application/json' }); @@ -289,15 +498,16 @@ test('openai-bridge fails accepted upstream Responses SSE when stream goes idle' }); assert.equal(sse.status, 200); - assert.equal(responsesHit, true, 'upstream Responses SSE should be attempted'); - assert.equal(chatHit, false, 'accepted but idle Responses SSE should not fall back to chat/completions'); + assert.equal(responsesHit, false); + assert.equal(chatHit, 1, 'accepted chat SSE timeout must be finalized without retrying on the same response'); assert.match(sse.headers['content-type'], /text\/event-stream/i); - assert.match(sse.text, /response\.failed/); - assert.match(sse.text, /upstream stream idle timeout/); + assert.match(sse.text, /event: response\.completed/); + assert.doesNotMatch(sse.text, /event: response\.failed/); + assert.match(sse.text, /timeout|upstream stream failed|aborted/i); assert.match(sse.text, /data: \[DONE\]/); - await bridge.close(); - await upstream.close(); + await closeServer(bridge); + await closeServer(upstream); await rm(tmpDir, { recursive: true, force: true }); }); @@ -358,8 +568,8 @@ test('openai-bridge omits upstream reasoning_content from output_text deltas', a assert.match(sse.text, /"text":"answer"/); assert.match(sse.text, /data: \[DONE\]/); - await bridge.close(); - await upstream.close(); + await closeServer(bridge); + await closeServer(upstream); await rm(tmpDir, { recursive: true, force: true }); }); @@ -415,12 +625,89 @@ test('openai-bridge completes Responses SSE when upstream chat stream closes aft assert.doesNotMatch(sse.text, /event: response\.failed/); assert.match(sse.text, /data: \[DONE\]/); - await bridge.close(); - await upstream.close(); + await closeServer(bridge); + await closeServer(upstream); + await rm(tmpDir, { recursive: true, force: true }); +}); + +test('openai-bridge converts DSML tool-call text from chat fallback stream', async () => { + const upstream = http.createServer((req, res) => { + if (req.url === '/v1/responses' && req.method === 'POST') { + res.writeHead(404, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'responses endpoint unavailable' })); + return; + } + if (req.url === '/v1/chat/completions' && req.method === 'POST') { + res.writeHead(200, { 'Content-Type': 'text/event-stream; charset=utf-8' }); + const writeContent = (content) => { + res.write(`data: ${JSON.stringify({ + id: 'chatcmpl_dsml_tool', + model: 'deepseek-v4-flash', + choices: [{ delta: { content } }] + })}\n\n`); + }; + writeContent('好的,我先读取侧边栏相关文件。\n\n<|DSM'); + writeContent('L|tool_calls>\n<|DSML|invoke name="read">\n<|DSML|parameter name="filePath"\n string="true">/data/data/com.termux/files/home/\n codexmate/AGENTS.md\n<|DSML|parameter name="offset"\n string="false">0\n<|DSML|parameter name="limit"\n string="false">50\n\n<|DSML|invoke name="read">\n<|DSML|parameter name="filePath"\n string="true">/data/data/com.termux/files/home/\n codexmate/USER.md\n<|DSML|parameter name="offset"\n string="false">50\n<|DSML|parameter name="limit"\n string="false">50\n\n'); + res.write('data: {"id":"chatcmpl_dsml_tool","model":"deepseek-v4-flash","choices":[{"delta":{},"finish_reason":"tool_calls"}]}\n\n'); + res.end(); + return; + } + res.writeHead(404, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'not found' })); + }); + const { port: upstreamPort } = await listen(upstream); + + const tmpDir = await mkdtemp(path.join(os.tmpdir(), 'codexmate-bridge-test-')); + const settingsFile = path.join(tmpDir, 'bridge.json'); + await writeFile(settingsFile, JSON.stringify({ + version: 1, + providers: { + test: { baseUrl: `http://127.0.0.1:${upstreamPort}/v1`, apiKey: 'sk-upstream' } + } + }), 'utf-8'); + + const handler = createOpenaiBridgeHttpHandler({ settingsFile, expectedToken: 'codexmate' }); + const bridge = http.createServer((req, res) => { + if (!handler(req, res)) { + res.statusCode = 404; + res.end('not handled'); + } + }); + const { port: bridgePort } = await listen(bridge); + + const sse = await requestText(`http://127.0.0.1:${bridgePort}/bridge/openai/test/v1/responses`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'Accept': 'text/event-stream', + 'Authorization': 'Bearer codexmate' + }, + body: { + model: 'deepseek-v4-flash', + input: 'read sidebar docs', + tools: [{ type: 'function', name: 'read', parameters: { type: 'object' } }], + stream: true + } + }); + assert.equal(sse.status, 200); + assert.match(sse.text, /event: response\.output_text\.delta/); + assert.match(sse.text, /好的,我先读取侧边栏相关文件。/); + assert.doesNotMatch(sse.text, /DSML/); + assert.doesNotMatch(sse.text, /invoke name/); + assert.match(sse.text, /event: response\.function_call_arguments\.delta/); + assert.match(sse.text, /event: response\.function_call_arguments\.done/); + assert.match(sse.text, /"name":"read"/); + assert.match(sse.text, /\\"filePath\\":\\"\/data\/data\/com\.termux\/files\/home\/codexmate\/AGENTS\.md\\"/); + assert.match(sse.text, /\\"filePath\\":\\"\/data\/data\/com\.termux\/files\/home\/codexmate\/USER\.md\\"/); + assert.match(sse.text, /event: response\.completed/); + assert.match(sse.text, /data: \[DONE\]/); + + await closeServer(bridge); + await closeServer(upstream); await rm(tmpDir, { recursive: true, force: true }); }); -test('openai-bridge reports failed Responses SSE when upstream chat stream ends before DONE', async () => { +test('openai-bridge completes Responses SSE when upstream chat stream ends before DONE', async () => { const upstream = http.createServer((req, res) => { if (req.url === '/v1/chat/completions' && req.method === 'POST') { res.writeHead(200, { 'Content-Type': 'text/event-stream; charset=utf-8' }); @@ -466,12 +753,13 @@ test('openai-bridge reports failed Responses SSE when upstream chat stream ends }); assert.equal(sse.status, 200); assert.match(sse.text, /response\.output_text\.delta/); - assert.match(sse.text, /event: response\.failed/); - assert.match(sse.text, /upstream stream ended before \[DONE\]/); - assert.doesNotMatch(sse.text, /event: response\.completed/); + assert.match(sse.text, /partial/); + assert.match(sse.text, /event: response\.completed/); + assert.doesNotMatch(sse.text, /event: response\.failed/); + assert.match(sse.text, /data: \[DONE\]/); - await bridge.close(); - await upstream.close(); + await closeServer(bridge); + await closeServer(upstream); await rm(tmpDir, { recursive: true, force: true }); }); @@ -538,8 +826,8 @@ test('openai-bridge returns JSON when stream requested but client does not accep assert.equal(parsed.object, 'response'); assert.equal(parsed.model, 'gpt-test'); - await bridge.close(); - await upstream.close(); + await closeServer(bridge); + await closeServer(upstream); await rm(tmpDir, { recursive: true, force: true }); }); @@ -601,8 +889,8 @@ test('openai-bridge allows loopback clients to send arbitrary Authorization head assert.equal(parsed.object, 'response'); assert.equal(parsed.model, 'gpt-test'); - await bridge.close(); - await upstream.close(); + await closeServer(bridge); + await closeServer(upstream); await rm(tmpDir, { recursive: true, force: true }); }); @@ -672,8 +960,8 @@ test('openai-bridge normalizes mixed tool definitions before upstream /responses parameters: { type: 'object', properties: { query: { type: 'string' } } } }]); - await bridge.close(); - await upstream.close(); + await closeServer(bridge); + await closeServer(upstream); await rm(tmpDir, { recursive: true, force: true }); }); @@ -761,8 +1049,8 @@ test('openai-bridge falls back to chat when upstream /responses rejects tool fun } }]); - await bridge.close(); - await upstream.close(); + await closeServer(bridge); + await closeServer(upstream); await rm(tmpDir, { recursive: true, force: true }); }); @@ -856,8 +1144,8 @@ test('openai-bridge falls back to upstream /chat/completions when /responses is }]); assert.deepStrictEqual(capturedChatRequest.tool_choice, { type: 'function', function: { name: 'lookup' } }); - await bridge.close(); - await upstream.close(); + await closeServer(bridge); + await closeServer(upstream); await rm(tmpDir, { recursive: true, force: true }); }); @@ -924,8 +1212,8 @@ test('openai-bridge falls back to /chat/completions when upstream /responses ret const parsed = JSON.parse(resp.text); assert.equal(parsed.object, 'response'); - await bridge.close(); - await upstream.close(); + await closeServer(bridge); + await closeServer(upstream); await rm(tmpDir, { recursive: true, force: true }); }); @@ -998,8 +1286,8 @@ test('openai-bridge falls back to /chat/completions when upstream /responses ret assert.equal(parsed.model, 'gpt-test'); assert.ok(Array.isArray(parsed.output)); - await bridge.close(); - await upstream.close(); + await closeServer(bridge); + await closeServer(upstream); await rm(tmpDir, { recursive: true, force: true }); }); @@ -1078,8 +1366,8 @@ test('openai-bridge merges codex developer-role AGENTS.md into a single leading const userMsg = msgs.find((m) => m && m.role === 'user'); assert.ok(userMsg, 'user message preserved'); - await bridge.close(); - await upstream.close(); + await closeServer(bridge); + await closeServer(upstream); await rm(tmpDir, { recursive: true, force: true }); }); @@ -1147,8 +1435,8 @@ test('openai-bridge SSE fast path also merges developer-role AGENTS.md into lead const leak = msgs.find((m) => m && m.role !== 'system' && typeof m.content === 'string' && /AGENTS_MARK_STREAM/.test(m.content)); assert.equal(leak, undefined, 'developer content must not leak into non-system role'); - await bridge.close(); - await upstream.close(); + await closeServer(bridge); + await closeServer(upstream); await rm(tmpDir, { recursive: true, force: true }); }); @@ -1213,8 +1501,8 @@ test('openai-bridge skips /v1/responses probe after upstream marks it unsupporte assert.equal(responsesHits, 1, 'second call should skip /v1/responses probe (cache hit)'); assert.equal(chatHits, 2, 'second call should go directly to chat/completions'); - await bridge.close(); - await upstream.close(); + await closeServer(bridge); + await closeServer(upstream); await rm(tmpDir, { recursive: true, force: true }); }); @@ -1271,8 +1559,8 @@ test('openai-bridge preserves multibyte UTF-8 deltas split across chunk boundari assert.match(sse.text, /"delta":"御坂"/); assert.doesNotMatch(sse.text, /�/); - await bridge.close(); - await upstream.close(); + await closeServer(bridge); + await closeServer(upstream); await rm(tmpDir, { recursive: true, force: true }); }); @@ -1544,7 +1832,7 @@ test('openai-bridge adds encrypted reasoning include when proxying upstream Resp assert.deepStrictEqual(capturedRequest.reasoning, { effort: 'high' }); assert.deepStrictEqual(capturedRequest.include, ['file_search_call.results', 'reasoning.encrypted_content']); - await bridge.close(); - await upstream.close(); + await closeServer(bridge); + await closeServer(upstream); await rm(tmpDir, { recursive: true, force: true }); });