From e562eee9748f951b85033eec97335fdb4645bb3d Mon Sep 17 00:00:00 2001
From: Siddharth Ganesan
Date: Wed, 4 Feb 2026 13:14:13 -0800
Subject: [PATCH 01/13] Migrate openai to use responses api

---
 apps/sim/app/api/wand/route.ts           | 192 ++++++-----
 apps/sim/lib/copilot/chat-title.ts       |  78 +++--
 apps/sim/providers/azure-openai/index.ts | 337 ++++++++++++--------
 apps/sim/providers/azure-openai/utils.ts |  37 ---
 apps/sim/providers/openai/index.ts       | 297 ++++++++++-------
 apps/sim/providers/openai/utils.ts       |  15 -
 apps/sim/providers/responses-utils.ts    | 386 +++++++++++++++++++++++
 7 files changed, 944 insertions(+), 398 deletions(-)
 delete mode 100644 apps/sim/providers/azure-openai/utils.ts
 delete mode 100644 apps/sim/providers/openai/utils.ts
 create mode 100644 apps/sim/providers/responses-utils.ts

diff --git a/apps/sim/app/api/wand/route.ts b/apps/sim/app/api/wand/route.ts
index 54f914a2b7..9673387862 100644
--- a/apps/sim/app/api/wand/route.ts
+++ b/apps/sim/app/api/wand/route.ts
@@ -3,7 +3,6 @@ import { userStats, workflow } from '@sim/db/schema'
 import { createLogger } from '@sim/logger'
 import { eq, sql } from 'drizzle-orm'
 import { type NextRequest, NextResponse } from 'next/server'
-import OpenAI, { AzureOpenAI } from 'openai'
 import { getBYOKKey } from '@/lib/api-key/byok'
 import { getSession } from '@/lib/auth'
 import { logModelUsage } from '@/lib/billing/core/usage-log'
@@ -12,6 +11,7 @@ import { env } from '@/lib/core/config/env'
 import { getCostMultiplier, isBillingEnabled } from '@/lib/core/config/feature-flags'
 import { generateRequestId } from '@/lib/core/utils/request'
 import { verifyWorkspaceMembership } from '@/app/api/workflows/utils'
+import { extractResponseText, parseResponsesUsage } from '@/providers/responses-utils'
 import { getModelPricing } from '@/providers/utils'
 
 export const dynamic = 'force-dynamic'
@@ -28,18 +28,6 @@ const openaiApiKey = env.OPENAI_API_KEY
 
 const useWandAzure = azureApiKey && azureEndpoint && azureApiVersion
 
-const client = useWandAzure
-  ? new AzureOpenAI({
-      apiKey: azureApiKey,
-      apiVersion: azureApiVersion,
-      endpoint: azureEndpoint,
-    })
-  : openaiApiKey
-    ? new OpenAI({
-        apiKey: openaiApiKey,
-      })
-    : null
-
 if (!useWandAzure && !openaiApiKey) {
   logger.warn(
     'Neither Azure OpenAI nor OpenAI API key found. Wand generation API will not function.'
   )
@@ -202,20 +190,18 @@ export async function POST(req: NextRequest) {
     }
 
     let isBYOK = false
-    let activeClient = client
-    let byokApiKey: string | null = null
+    let activeOpenAIKey = openaiApiKey
 
     if (workspaceId && !useWandAzure) {
       const byokResult = await getBYOKKey(workspaceId, 'openai')
       if (byokResult) {
         isBYOK = true
-        byokApiKey = byokResult.apiKey
-        activeClient = new OpenAI({ apiKey: byokResult.apiKey })
+        activeOpenAIKey = byokResult.apiKey
         logger.info(`[${requestId}] Using BYOK OpenAI key for wand generation`)
       }
     }
 
-    if (!activeClient) {
+    if (!useWandAzure && !activeOpenAIKey) {
       logger.error(`[${requestId}] AI client not initialized. Missing API key.`)
       return NextResponse.json(
         { success: false, error: 'Wand generation service is not configured.' },
@@ -276,17 +262,18 @@ Use this context to calculate relative dates like "yesterday", "last week", "beg
     )
 
     const apiUrl = useWandAzure
-      ? `${azureEndpoint}/openai/deployments/${wandModelName}/chat/completions?api-version=${azureApiVersion}`
-      : 'https://api.openai.com/v1/chat/completions'
+      ? `${azureEndpoint}/openai/v1/responses?api-version=${azureApiVersion}`
+      : 'https://api.openai.com/v1/responses'
 
     const headers: Record<string, string> = {
       'Content-Type': 'application/json',
+      'OpenAI-Beta': 'responses=v1',
     }
 
     if (useWandAzure) {
       headers['api-key'] = azureApiKey!
     } else {
-      headers.Authorization = `Bearer ${byokApiKey || openaiApiKey}`
+      headers.Authorization = `Bearer ${activeOpenAIKey}`
     }
 
     logger.debug(`[${requestId}] Making streaming request to: ${apiUrl}`)
 
     const response = await fetch(apiUrl, {
       method: 'POST',
       headers,
       body: JSON.stringify({
         model: useWandAzure ? wandModelName : 'gpt-4o',
-        messages: messages,
+        input: messages,
         temperature: 0.2,
-        max_tokens: 10000,
+        max_output_tokens: 10000,
         stream: true,
-        stream_options: { include_usage: true },
       }),
     })
 
           let buffer = ''
           let chunkCount = 0
           let finalUsage: any = null
+          let activeEventType: string | undefined
 
           while (true) {
             const { done, value } = await reader.read()
 
             buffer = lines.pop() || ''
 
             for (const line of lines) {
-              if (line.startsWith('data: ')) {
-                const data = line.slice(6).trim()
+              const trimmed = line.trim()
+              if (!trimmed) {
+                continue
+              }
 
-                if (data === '[DONE]') {
-                  logger.info(`[${requestId}] Received [DONE] signal`)
+              if (trimmed.startsWith('event:')) {
+                activeEventType = trimmed.slice(6).trim()
+                continue
+              }
 
-                  if (finalUsage) {
-                    await updateUserStatsForWand(session.user.id, finalUsage, requestId, isBYOK)
-                  }
+              if (!trimmed.startsWith('data:')) {
+                continue
+              }
+
+              const data = trimmed.slice(5).trim()
+              if (data === '[DONE]') {
+                logger.info(`[${requestId}] Received [DONE] signal`)
 
-                  controller.enqueue(
-                    encoder.encode(`data: ${JSON.stringify({ done: true })}\n\n`)
-                  )
-                  controller.close()
-                  return
+                if (finalUsage) {
+                  await updateUserStatsForWand(session.user.id, finalUsage, requestId, isBYOK)
                 }
 
-                try {
-                  const parsed = JSON.parse(data)
-                  const content = parsed.choices?.[0]?.delta?.content
+                controller.enqueue(
+                  encoder.encode(`data: ${JSON.stringify({ done: true })}\n\n`)
+                )
+                controller.close()
+                return
+              }
+
+              try {
+                const parsed = JSON.parse(data)
+                const eventType = parsed?.type ?? activeEventType
+
+                if (
+                  eventType === 'response.error' ||
+                  eventType === 'error' ||
+                  activeEventType === 'response.failed'
+                ) {
+                  throw new Error(parsed?.error?.message || 'Responses stream error')
+                }
+                if (eventType === 'response.output_text.delta') {
+                  const content = parsed.delta
                   if (content) {
                     chunkCount++
                     if (chunkCount === 1) {
 
                     controller.enqueue(
                       encoder.encode(`data: ${JSON.stringify({ chunk: content })}\n\n`)
                     )
                   }
+                }
 
-                  if (parsed.usage) {
-                    finalUsage = parsed.usage
+                if (eventType === 'response.completed') {
+                  const usage = parseResponsesUsage(parsed?.response?.usage ?? parsed?.usage)
+                  if (usage) {
+                    finalUsage = {
+                      prompt_tokens: usage.promptTokens,
+                      completion_tokens: usage.completionTokens,
+                      total_tokens: usage.totalTokens,
+                    }
                     logger.info(
-                      `[${requestId}] Received usage data: ${JSON.stringify(parsed.usage)}`
+                      `[${requestId}] Received usage data: ${JSON.stringify(finalUsage)}`
                     )
                   }
-                } catch (parseError) {
-                  logger.debug(
-                    `[${requestId}] Skipped non-JSON line: ${data.substring(0, 100)}`
-                  )
                 }
+              } catch (parseError) {
+                logger.debug(`[${requestId}] Skipped non-JSON line: ${data.substring(0, 100)}`)
               }
             }
           }
 
           message: error?.message || 'Unknown error',
           code: error?.code,
           status: error?.status,
-          responseStatus: error?.response?.status,
-          responseData: error?.response?.data ? safeStringify(error.response.data) : undefined,
           stack: error?.stack,
           useWandAzure,
           model: useWandAzure ? wandModelName : 'gpt-4o',
       }
     }
 
-    const completion = await activeClient.chat.completions.create({
-      model: useWandAzure ? wandModelName : 'gpt-4o',
-      messages: messages,
-      temperature: 0.3,
-      max_tokens: 10000,
+    const apiUrl = useWandAzure
+      ? `${azureEndpoint}/openai/v1/responses?api-version=${azureApiVersion}`
+      : 'https://api.openai.com/v1/responses'
+
+    const headers: Record<string, string> = {
+      'Content-Type': 'application/json',
+      'OpenAI-Beta': 'responses=v1',
+    }
+
+    if (useWandAzure) {
+      headers['api-key'] = azureApiKey!
+    } else {
+      headers.Authorization = `Bearer ${activeOpenAIKey}`
+    }
+
+    const response = await fetch(apiUrl, {
+      method: 'POST',
+      headers,
+      body: JSON.stringify({
+        model: useWandAzure ? wandModelName : 'gpt-4o',
+        input: messages,
+        temperature: 0.3,
+        max_output_tokens: 10000,
+      }),
     })
 
-    const generatedContent = completion.choices[0]?.message?.content?.trim()
+    if (!response.ok) {
+      const errorText = await response.text()
+      const apiError = new Error(
+        `API request failed: ${response.status} ${response.statusText} - ${errorText}`
+      )
+      ;(apiError as any).status = response.status
+      throw apiError
+    }
+
+    const completion = await response.json()
+    const generatedContent = extractResponseText(completion.output)?.trim()
 
     if (!generatedContent) {
       logger.error(
 
     logger.info(`[${requestId}] Wand generation successful`)
 
-    if (completion.usage) {
-      await updateUserStatsForWand(session.user.id, completion.usage, requestId, isBYOK)
+    const usage = parseResponsesUsage(completion.usage)
+    if (usage) {
+      await updateUserStatsForWand(
+        session.user.id,
+        {
+          prompt_tokens: usage.promptTokens,
+          completion_tokens: usage.completionTokens,
+          total_tokens: usage.totalTokens,
+        },
+        requestId,
+        isBYOK
+      )
     }
 
     return NextResponse.json({ success: true, content: generatedContent })
 
       message: error?.message || 'Unknown error',
       code: error?.code,
       status: error?.status,
-      responseStatus: error instanceof OpenAI.APIError ? error.status : error?.response?.status,
-      responseData: (error as any)?.response?.data
-        ? safeStringify((error as any).response.data)
-        : undefined,
       stack: error?.stack,
       useWandAzure,
      model: useWandAzure ? wandModelName : 'gpt-4o',
    })
 
    let clientErrorMessage = 'Wand generation failed. Please try again later.'
-    let status = 500
+    let status = typeof (error as any)?.status === 'number' ? (error as any).status : 500
 
-    if (error instanceof OpenAI.APIError) {
-      status = error.status || 500
-      logger.error(
-        `[${requestId}] ${useWandAzure ? 'Azure OpenAI' : 'OpenAI'} API Error: ${status} - ${error.message}`
-      )
-
-      if (status === 401) {
-        clientErrorMessage = 'Authentication failed. Please check your API key configuration.'
-      } else if (status === 429) {
-        clientErrorMessage = 'Rate limit exceeded. Please try again later.'
-      } else if (status >= 500) {
-        clientErrorMessage =
-          'The wand generation service is currently unavailable. Please try again later.'
-      }
-    } else if (useWandAzure && error.message?.includes('DeploymentNotFound')) {
+    if (useWandAzure && error?.message?.includes('DeploymentNotFound')) {
      clientErrorMessage =
        'Azure OpenAI deployment not found. Please check your model deployment configuration.'
      status = 404
+    } else if (status === 401) {
+      clientErrorMessage = 'Authentication failed. Please check your API key configuration.'
+    } else if (status === 429) {
+      clientErrorMessage = 'Rate limit exceeded. Please try again later.'
+    } else if (status >= 500) {
+      clientErrorMessage =
+        'The wand generation service is currently unavailable. Please try again later.'
    }
 
    return NextResponse.json(
diff --git a/apps/sim/lib/copilot/chat-title.ts b/apps/sim/lib/copilot/chat-title.ts
index 7e383bdbe3..0edbcc30a6 100644
--- a/apps/sim/lib/copilot/chat-title.ts
+++ b/apps/sim/lib/copilot/chat-title.ts
@@ -1,6 +1,6 @@ import { createLogger } from '@sim/logger'
-import OpenAI, { AzureOpenAI } from 'openai'
 import { env } from '@/lib/core/config/env'
+import { extractResponseText } from '@/providers/responses-utils'
 
 const logger = createLogger('SimAgentUtils')
 
@@ -12,47 +12,65 @@ const openaiApiKey = env.OPENAI_API_KEY
 
 const useChatTitleAzure = azureApiKey && azureEndpoint && azureApiVersion
 
-const client = useChatTitleAzure
-  ? new AzureOpenAI({
-      apiKey: azureApiKey,
-      apiVersion: azureApiVersion,
-      endpoint: azureEndpoint,
-    })
-  : openaiApiKey
-    ? new OpenAI({
-        apiKey: openaiApiKey,
-      })
-    : null
-
 /**
  * Generates a short title for a chat based on the first message
  * @param message First user message in the chat
  * @returns A short title or null if API key is not available
  */
 export async function generateChatTitle(message: string): Promise<string | null> {
-  if (!client) {
+  if (!useChatTitleAzure && !openaiApiKey) {
     return null
   }
 
   try {
-    const response = await client.chat.completions.create({
-      model: useChatTitleAzure ? chatTitleModelName : 'gpt-4o',
-      messages: [
-        {
-          role: 'system',
-          content:
-            'Generate a very short title (3-5 words max) for a chat that starts with this message. The title should be concise and descriptive. Do not wrap the title in quotes.',
-        },
-        {
-          role: 'user',
-          content: message,
-        },
-      ],
-      max_tokens: 20,
-      temperature: 0.2,
+    const apiUrl = useChatTitleAzure
+      ? `${azureEndpoint}/openai/v1/responses?api-version=${azureApiVersion}`
+      : 'https://api.openai.com/v1/responses'
+
+    const headers: Record<string, string> = {
+      'Content-Type': 'application/json',
+      'OpenAI-Beta': 'responses=v1',
+    }
+
+    if (useChatTitleAzure) {
+      headers['api-key'] = azureApiKey!
+    } else {
+      headers.Authorization = `Bearer ${openaiApiKey}`
+    }
+
+    const response = await fetch(apiUrl, {
+      method: 'POST',
+      headers,
+      body: JSON.stringify({
+        model: useChatTitleAzure ? chatTitleModelName : 'gpt-4o',
+        input: [
+          {
+            role: 'system',
+            content:
+              'Generate a very short title (3-5 words max) for a chat that starts with this message. The title should be concise and descriptive. Do not wrap the title in quotes.',
+          },
+          {
+            role: 'user',
+            content: message,
+          },
+        ],
+        max_output_tokens: 20,
+        temperature: 0.2,
+      }),
     })
 
-    const title = response.choices[0]?.message?.content?.trim() || null
+    if (!response.ok) {
+      const errorText = await response.text()
+      logger.error('Error generating chat title:', {
+        status: response.status,
+        statusText: response.statusText,
+        error: errorText,
+      })
+      return null
+    }
+
+    const data = await response.json()
+    const title = extractResponseText(data.output)?.trim() || null
     return title
   } catch (error) {
     logger.error('Error generating chat title:', error)
diff --git a/apps/sim/providers/azure-openai/index.ts b/apps/sim/providers/azure-openai/index.ts
index 195103ffe0..f535dd27f5 100644
--- a/apps/sim/providers/azure-openai/index.ts
+++ b/apps/sim/providers/azure-openai/index.ts
@@ -1,15 +1,22 @@ import { createLogger } from '@sim/logger'
-import { AzureOpenAI } from 'openai'
-import type { ChatCompletionCreateParamsStreaming } from 'openai/resources/chat/completions'
 import { env } from '@/lib/core/config/env'
 import type { StreamingExecution } from '@/executor/types'
 import { MAX_TOOL_ITERATIONS } from '@/providers'
-import {
-  checkForForcedToolUsage,
-  createReadableStreamFromAzureOpenAIStream,
-} from '@/providers/azure-openai/utils'
 import { getProviderDefaultModel, getProviderModels } from '@/providers/models'
+import {
+  buildResponsesInputFromMessages,
+  convertResponseOutputToInputItems,
+  convertToolsToResponses,
+  createReadableStreamFromResponses,
+  extractResponseText,
+  extractResponseToolCalls,
+  parseResponsesUsage,
+  type ResponsesInputItem,
+  type ResponsesToolCall,
+  toResponsesToolChoice,
+} from '@/providers/responses-utils'
 import type {
+  Message,
   ProviderConfig,
   ProviderRequest,
   ProviderResponse,
@@ -19,6 +26,7 @@
   calculateCost,
   prepareToolExecution,
   prepareToolsWithUsageControl,
+  trackForcedToolUsage,
 } from '@/providers/utils'
 import { executeTool } from '@/tools'
 
@@ -58,13 +66,14 @@ export const azureOpenAIProvider: ProviderConfig = {
       )
     }
 
-    const azureOpenAI = new AzureOpenAI({
-      apiKey: request.apiKey,
-      apiVersion: azureApiVersion,
-      endpoint: azureEndpoint,
-    })
+    if (!request.apiKey) {
+      throw new Error('API key is required for Azure OpenAI')
+    }
+
+    const deploymentName = request.model.replace('azure/', '')
+    const apiUrl = `${azureEndpoint.replace(/\/$/, '')}/openai/v1/responses?api-version=${azureApiVersion}`
 
-    const allMessages = []
+    const allMessages: Message[] = []
 
     if (request.systemPrompt) {
       allMessages.push({
@@ -84,31 +93,31 @@
       allMessages.push(...request.messages)
     }
 
-    const tools = request.tools?.length
-      ? request.tools.map((tool) => ({
-          type: 'function',
-          function: {
-            name: tool.id,
-            description: tool.description,
-            parameters: tool.parameters,
-          },
-        }))
-      : undefined
+    const initialInput = buildResponsesInputFromMessages(allMessages)
 
-    const deploymentName = request.model.replace('azure/', '')
-    const payload: any = {
+    const basePayload: Record<string, any> = {
       model: deploymentName,
-      messages: allMessages,
     }
 
-    if (request.temperature !== undefined) payload.temperature = request.temperature
-    if (request.maxTokens != null) payload.max_completion_tokens = request.maxTokens
+    if (request.temperature !== undefined) basePayload.temperature = request.temperature
+    if (request.maxTokens != null) basePayload.max_output_tokens = request.maxTokens
 
-    if (request.reasoningEffort !== undefined) payload.reasoning_effort = request.reasoningEffort
-    if (request.verbosity !== undefined) payload.verbosity = request.verbosity
+    if (request.reasoningEffort !== undefined) {
+      basePayload.reasoning = {
+        effort: request.reasoningEffort,
+        summary: 'auto',
+      }
+    }
+
+    if (request.verbosity !== undefined) {
+      basePayload.text = {
+        ...(basePayload.text ?? {}),
+        verbosity: request.verbosity,
+      }
+    }
 
     if (request.responseFormat) {
-      payload.response_format = {
+      basePayload.response_format = {
        type: 'json_schema',
        json_schema: {
          name: request.responseFormat.name || 'response_schema',
@@ -120,23 +129,47 @@
      logger.info('Added JSON schema response format to Azure OpenAI request')
    }
 
+    const tools = request.tools?.length
+      ? request.tools.map((tool) => ({
+          type: 'function',
+          function: {
+            name: tool.id,
+            description: tool.description,
+            parameters: tool.parameters,
+          },
+        }))
+      : undefined
+
    let preparedTools: ReturnType<typeof prepareToolsWithUsageControl> | null = null
+    let responsesToolChoice: ReturnType<typeof toResponsesToolChoice> | undefined
 
    if (tools?.length) {
      preparedTools = prepareToolsWithUsageControl(tools, request.tools, logger, 'azure-openai')
      const { tools: filteredTools, toolChoice } = preparedTools
 
-      if (filteredTools?.length && toolChoice) {
-        payload.tools = filteredTools
-        payload.tool_choice = toolChoice
+      if (filteredTools?.length) {
+        const convertedTools = convertToolsToResponses(filteredTools)
+        if (!convertedTools.length) {
+          throw new Error('All tools have empty names')
+        }
+
+        basePayload.tools = convertedTools
+        basePayload.parallel_tool_calls = true
+      }
+
+      if (toolChoice) {
+        responsesToolChoice = toResponsesToolChoice(toolChoice)
+        if (responsesToolChoice) {
+          basePayload.tool_choice = responsesToolChoice
+        }
 
        logger.info('Azure OpenAI request configuration:', {
-          toolCount: filteredTools.length,
+          toolCount: filteredTools?.length || 0,
          toolChoice:
            typeof toolChoice === 'string'
              ? toolChoice
              : toolChoice.type === 'function'
-                ? `force:${toolChoice.function.name}`
+                ? `force:${toolChoice.function?.name}`
                : toolChoice.type === 'tool'
                  ? `force:${toolChoice.name}`
                  : toolChoice.type === 'any'
 
    }
 
+    const headers: Record<string, string> = {
+      'Content-Type': 'application/json',
+      'OpenAI-Beta': 'responses=v1',
+      'api-key': request.apiKey,
+    }
+
+    const createRequestBody = (
+      input: ResponsesInputItem[],
+      overrides: Record<string, any> = {}
+    ) => ({
+      ...basePayload,
+      input,
+      ...overrides,
+    })
+
+    const parseErrorResponse = async (response: Response): Promise<string> => {
+      const text = await response.text()
+      try {
+        const payload = JSON.parse(text)
+        return payload?.error?.message || text
+      } catch {
+        return text
+      }
+    }
+
+    const postResponses = async (body: Record<string, any>) => {
+      const response = await fetch(apiUrl, {
+        method: 'POST',
+        headers,
+        body: JSON.stringify(body),
+      })
+
+      if (!response.ok) {
+        const message = await parseErrorResponse(response)
+        throw new Error(`Azure OpenAI API error (${response.status}): ${message}`)
+      }
+
+      return response.json()
+    }
+
    const providerStartTime = Date.now()
    const providerStartTimeISO = new Date(providerStartTime).toISOString()
 
      if (request.stream && (!tools || tools.length === 0)) {
        logger.info('Using streaming response for Azure OpenAI request')
 
-        const streamingParams: ChatCompletionCreateParamsStreaming = {
-          ...payload,
-          stream: true,
-          stream_options: { include_usage: true },
+        const streamResponse = await fetch(apiUrl, {
+          method: 'POST',
+          headers,
+          body: JSON.stringify(createRequestBody(initialInput, { stream: true })),
+        })
+
+        if (!streamResponse.ok) {
+          const message = await parseErrorResponse(streamResponse)
+          throw new Error(`Azure OpenAI API error (${streamResponse.status}): ${message}`)
        }
 
-        const streamResponse = await azureOpenAI.chat.completions.create(streamingParams)
        const streamingResult = {
-          stream: createReadableStreamFromAzureOpenAIStream(streamResponse, (content, usage) => {
+          stream: createReadableStreamFromResponses(streamResponse, (content, usage) => {
            streamingResult.execution.output.content = content
            streamingResult.execution.output.tokens = {
-              input: usage.prompt_tokens,
-              output: usage.completion_tokens,
-              total: usage.total_tokens,
+              input: usage?.promptTokens || 0,
+              output: usage?.completionTokens || 0,
+              total: usage?.totalTokens || 0,
            }
 
            const costResult = calculateCost(
              request.model,
-              usage.prompt_tokens,
-              usage.completion_tokens
+              usage?.promptTokens || 0,
+              usage?.completionTokens || 0
            )
            streamingResult.execution.output.cost = {
              input: costResult.input,
              output: costResult.output,
              total: costResult.total,
            }
 
        }
 
    const initialCallTime = Date.now()
-    const originalToolChoice = payload.tool_choice
    const forcedTools = preparedTools?.forcedTools || []
    let usedForcedTools: string[] = []
+    let hasUsedForcedTool = false
+    let currentToolChoice = responsesToolChoice
+
+    const checkForForcedToolUsage = (
+      toolCallsInResponse: ResponsesToolCall[],
+      toolChoice: string | { type: string; name?: string } | undefined
+    ) => {
+      if (typeof toolChoice === 'object' && toolCallsInResponse.length > 0) {
+        const result = trackForcedToolUsage(
+          toolCallsInResponse,
+          toolChoice,
+          logger,
+          'azure-openai',
+          forcedTools,
+          usedForcedTools
+        )
+        hasUsedForcedTool = result.hasUsedForcedTool
+        usedForcedTools = result.usedForcedTools
+      }
+    }
 
-    let currentResponse = await azureOpenAI.chat.completions.create(payload)
+    const currentInput: ResponsesInputItem[] = [...initialInput]
+    let currentResponse = await postResponses(
+      createRequestBody(currentInput, { tool_choice: currentToolChoice })
+    )
    const firstResponseTime = Date.now() - initialCallTime
 
-    let content = currentResponse.choices[0]?.message?.content || ''
+    const initialUsage = parseResponsesUsage(currentResponse.usage)
    const tokens = {
-      input: currentResponse.usage?.prompt_tokens || 0,
-      output: currentResponse.usage?.completion_tokens || 0,
-      total: currentResponse.usage?.total_tokens || 0,
+      input: initialUsage?.promptTokens || 0,
+      output: initialUsage?.completionTokens || 0,
+      total: initialUsage?.totalTokens || 0,
    }
+
    const toolCalls = []
    const toolResults = []
-    const currentMessages = [...allMessages]
    let iterationCount = 0
    let modelTime = firstResponseTime
    let toolsTime = 0
-    let hasUsedForcedTool = false
+    let content = extractResponseText(currentResponse.output) || ''
 
    const timeSegments: TimeSegment[] = [
      {
 
    ]
 
-    const firstCheckResult = checkForForcedToolUsage(
-      currentResponse,
-      originalToolChoice,
-      logger,
-      forcedTools,
-      usedForcedTools
-    )
-    hasUsedForcedTool = firstCheckResult.hasUsedForcedTool
-    usedForcedTools = firstCheckResult.usedForcedTools
+    checkForForcedToolUsage(extractResponseToolCalls(currentResponse.output), currentToolChoice)
 
    while (iterationCount < MAX_TOOL_ITERATIONS) {
-      if (currentResponse.choices[0]?.message?.content) {
-        content = currentResponse.choices[0].message.content
+      const responseText = extractResponseText(currentResponse.output)
+      if (responseText) {
+        content = responseText
      }
 
-      const toolCallsInResponse = currentResponse.choices[0]?.message?.tool_calls
-      if (!toolCallsInResponse || toolCallsInResponse.length === 0) {
+      const toolCallsInResponse = extractResponseToolCalls(currentResponse.output)
+      if (!toolCallsInResponse.length) {
        break
      }
 
+      const outputInputItems = convertResponseOutputToInputItems(currentResponse.output)
+      if (outputInputItems.length) {
+        currentInput.push(...outputInputItems)
+      }
+
      logger.info(
-        `Processing ${toolCallsInResponse.length} tool calls (iteration ${iterationCount + 1}/${MAX_TOOL_ITERATIONS})`
+        `Processing ${toolCallsInResponse.length} tool calls in parallel (iteration ${
+          iterationCount + 1
+        }/${MAX_TOOL_ITERATIONS})`
      )
 
      const toolsStartTime = Date.now()
 
      const toolExecutionPromises = toolCallsInResponse.map(async (toolCall) => {
        const toolCallStartTime = Date.now()
 
-        const toolName = toolCall.function.name
+        const toolName = toolCall.name
        try {
-          const toolArgs = JSON.parse(toolCall.function.arguments)
+          const toolArgs = toolCall.arguments ? JSON.parse(toolCall.arguments) : {}
          const tool = request.tools?.find((t) => t.id === toolName)
 
-          if (!tool) return null
+          if (!tool) {
+            return null
+          }
 
          const { toolParams, executionParams } = prepareToolExecution(tool, toolArgs, request)
          const result = await executeTool(toolName, executionParams)
 
      const executionResults = await Promise.allSettled(toolExecutionPromises)
 
-      currentMessages.push({
-        role: 'assistant',
-        content: null,
-        tool_calls: toolCallsInResponse.map((tc) => ({
-          id: tc.id,
-          type: 'function',
-          function: {
-            name: tc.function.name,
-            arguments: tc.function.arguments,
-          },
-        })),
-      })
-
      for (const settledResult of executionResults) {
        if (settledResult.status === 'rejected' || !settledResult.value) continue
 
          success: result.success,
        })
 
-        currentMessages.push({
-          role: 'tool',
-          tool_call_id: toolCall.id,
-          content: JSON.stringify(resultContent),
+        currentInput.push({
+          type: 'function_call_output',
+          call_id: toolCall.id,
+          output: JSON.stringify(resultContent),
        })
      }
 
      const thisToolsTime = Date.now() - toolsStartTime
      toolsTime += thisToolsTime
 
-      const nextPayload = {
-        ...payload,
-        messages: currentMessages,
-      }
-
-      if (typeof originalToolChoice === 'object' && hasUsedForcedTool && forcedTools.length > 0) {
+      if (typeof currentToolChoice === 'object' && hasUsedForcedTool && forcedTools.length > 0) {
        const remainingTools = forcedTools.filter((tool) => !usedForcedTools.includes(tool))
 
        if (remainingTools.length > 0) {
-          nextPayload.tool_choice = {
+          currentToolChoice = {
            type: 'function',
-            function: { name: remainingTools[0] },
+            name: remainingTools[0],
          }
          logger.info(`Forcing next tool: ${remainingTools[0]}`)
        } else {
-          nextPayload.tool_choice = 'auto'
+          currentToolChoice = 'auto'
          logger.info('All forced tools have been used, switching to auto tool_choice')
        }
      }
 
      const nextModelStartTime = Date.now()
 
-      currentResponse = await azureOpenAI.chat.completions.create(nextPayload)
-
-      const nextCheckResult = checkForForcedToolUsage(
-        currentResponse,
-        nextPayload.tool_choice,
-        logger,
-        forcedTools,
-        usedForcedTools
+      currentResponse = await postResponses(
+        createRequestBody(currentInput, { tool_choice: currentToolChoice })
      )
-      hasUsedForcedTool = nextCheckResult.hasUsedForcedTool
-      usedForcedTools = nextCheckResult.usedForcedTools
+
+      checkForForcedToolUsage(extractResponseToolCalls(currentResponse.output), currentToolChoice)
 
      const nextModelEndTime = Date.now()
      const thisModelTime = nextModelEndTime - nextModelStartTime
 
      modelTime += thisModelTime
 
-      if (currentResponse.choices[0]?.message?.content) {
-        content = currentResponse.choices[0].message.content
-      }
-
-      if (currentResponse.usage) {
-        tokens.input += currentResponse.usage.prompt_tokens || 0
-        tokens.output += currentResponse.usage.completion_tokens || 0
-        tokens.total += currentResponse.usage.total_tokens || 0
+      const usage = parseResponsesUsage(currentResponse.usage)
+      if (usage) {
+        tokens.input += usage.promptTokens
+        tokens.output += usage.completionTokens
+        tokens.total += usage.totalTokens
      }
 
      iterationCount++
 
      const accumulatedCost = calculateCost(request.model, tokens.input, tokens.output)
 
-      const streamingParams: ChatCompletionCreateParamsStreaming = {
-        ...payload,
-        messages: currentMessages,
-        tool_choice: 'auto',
-        stream: true,
-        stream_options: { include_usage: true },
+      const streamResponse = await fetch(apiUrl, {
+        method: 'POST',
+        headers,
+        body: JSON.stringify(
+          createRequestBody(currentInput, { stream: true, tool_choice: 'auto' })
+        ),
+      })
+
+      if (!streamResponse.ok) {
+        const message = await parseErrorResponse(streamResponse)
+        throw new Error(`Azure OpenAI API error (${streamResponse.status}): ${message}`)
      }
 
-      const streamResponse = await azureOpenAI.chat.completions.create(streamingParams)
      const streamingResult = {
-        stream: createReadableStreamFromAzureOpenAIStream(streamResponse, (content, usage) => {
+        stream: createReadableStreamFromResponses(streamResponse, (content, usage) => {
          streamingResult.execution.output.content = content
          streamingResult.execution.output.tokens = {
-            input: tokens.input + usage.prompt_tokens,
-            output: tokens.output + usage.completion_tokens,
-            total: tokens.total + usage.total_tokens,
+            input: tokens.input + (usage?.promptTokens || 0),
+            output: tokens.output + (usage?.completionTokens || 0),
+            total: tokens.total + (usage?.totalTokens || 0),
          }
 
          const streamCost = calculateCost(
            request.model,
-            usage.prompt_tokens,
-            usage.completion_tokens
+            usage?.promptTokens || 0,
+            usage?.completionTokens || 0
          )
          streamingResult.execution.output.cost = {
            input: accumulatedCost.input + streamCost.input,
 
      })
 
      const enhancedError = new Error(error instanceof Error ? error.message : String(error))
-      // @ts-ignore
+      // @ts-ignore - Adding timing property to the error
      enhancedError.timing = {
        startTime: providerStartTimeISO,
        endTime: providerEndTimeISO,
diff --git a/apps/sim/providers/azure-openai/utils.ts b/apps/sim/providers/azure-openai/utils.ts
deleted file mode 100644
index a3d12fa966..0000000000
--- a/apps/sim/providers/azure-openai/utils.ts
+++ /dev/null
@@ -1,37 +0,0 @@
-import type { Logger } from '@sim/logger'
-import type { ChatCompletionChunk } from 'openai/resources/chat/completions'
-import type { CompletionUsage } from 'openai/resources/completions'
-import type { Stream } from 'openai/streaming'
-import { checkForForcedToolUsageOpenAI, createOpenAICompatibleStream } from '@/providers/utils'
-
-/**
- * Creates a ReadableStream from an Azure OpenAI streaming response.
- * Uses the shared OpenAI-compatible streaming utility.
- */
-export function createReadableStreamFromAzureOpenAIStream(
-  azureOpenAIStream: Stream<ChatCompletionChunk>,
-  onComplete?: (content: string, usage: CompletionUsage) => void
-): ReadableStream {
-  return createOpenAICompatibleStream(azureOpenAIStream, 'Azure OpenAI', onComplete)
-}
-
-/**
- * Checks if a forced tool was used in an Azure OpenAI response.
- * Uses the shared OpenAI-compatible forced tool usage helper.
- */ -export function checkForForcedToolUsage( - response: any, - toolChoice: string | { type: string; function?: { name: string }; name?: string; any?: any }, - _logger: Logger, - forcedTools: string[], - usedForcedTools: string[] -): { hasUsedForcedTool: boolean; usedForcedTools: string[] } { - return checkForForcedToolUsageOpenAI( - response, - toolChoice, - 'Azure OpenAI', - forcedTools, - usedForcedTools, - _logger - ) -} diff --git a/apps/sim/providers/openai/index.ts b/apps/sim/providers/openai/index.ts index b2cecfceb2..f4cf93306e 100644 --- a/apps/sim/providers/openai/index.ts +++ b/apps/sim/providers/openai/index.ts @@ -1,11 +1,21 @@ import { createLogger } from '@sim/logger' -import OpenAI from 'openai' -import type { ChatCompletionCreateParamsStreaming } from 'openai/resources/chat/completions' import type { StreamingExecution } from '@/executor/types' import { MAX_TOOL_ITERATIONS } from '@/providers' import { getProviderDefaultModel, getProviderModels } from '@/providers/models' -import { createReadableStreamFromOpenAIStream } from '@/providers/openai/utils' +import { + buildResponsesInputFromMessages, + convertResponseOutputToInputItems, + convertToolsToResponses, + createReadableStreamFromResponses, + extractResponseText, + extractResponseToolCalls, + parseResponsesUsage, + type ResponsesInputItem, + type ResponsesToolCall, + toResponsesToolChoice, +} from '@/providers/responses-utils' import type { + Message, ProviderConfig, ProviderRequest, ProviderResponse, @@ -20,6 +30,7 @@ import { import { executeTool } from '@/tools' const logger = createLogger('OpenAIProvider') +const responsesEndpoint = 'https://api.openai.com/v1/responses' export const openaiProvider: ProviderConfig = { id: 'openai', @@ -42,9 +53,11 @@ export const openaiProvider: ProviderConfig = { stream: !!request.stream, }) - const openai = new OpenAI({ apiKey: request.apiKey }) + if (!request.apiKey) { + throw new Error('API key is required for OpenAI') + } - const allMessages = [] + const allMessages: Message[] = [] if (request.systemPrompt) { allMessages.push({ @@ -64,30 +77,31 @@ export const openaiProvider: ProviderConfig = { allMessages.push(...request.messages) } - const tools = request.tools?.length - ? request.tools.map((tool) => ({ - type: 'function', - function: { - name: tool.id, - description: tool.description, - parameters: tool.parameters, - }, - })) - : undefined + const initialInput = buildResponsesInputFromMessages(allMessages) - const payload: any = { + const basePayload: Record = { model: request.model, - messages: allMessages, } - if (request.temperature !== undefined) payload.temperature = request.temperature - if (request.maxTokens != null) payload.max_completion_tokens = request.maxTokens + if (request.temperature !== undefined) basePayload.temperature = request.temperature + if (request.maxTokens != null) basePayload.max_output_tokens = request.maxTokens + + if (request.reasoningEffort !== undefined) { + basePayload.reasoning = { + effort: request.reasoningEffort, + summary: 'auto', + } + } - if (request.reasoningEffort !== undefined) payload.reasoning_effort = request.reasoningEffort - if (request.verbosity !== undefined) payload.verbosity = request.verbosity + if (request.verbosity !== undefined) { + basePayload.text = { + ...(basePayload.text ?? 
+        ...(basePayload.text ?? {}),
+        verbosity: request.verbosity,
+      }
+    }
 
     if (request.responseFormat) {
-      payload.response_format = {
+      basePayload.response_format = {
        type: 'json_schema',
        json_schema: {
          name: request.responseFormat.name || 'response_schema',
@@ -99,23 +113,47 @@
      logger.info('Added JSON schema response format to request')
    }
 
+    const tools = request.tools?.length
+      ? request.tools.map((tool) => ({
+          type: 'function',
+          function: {
+            name: tool.id,
+            description: tool.description,
+            parameters: tool.parameters,
+          },
+        }))
+      : undefined
+
    let preparedTools: ReturnType<typeof prepareToolsWithUsageControl> | null = null
+    let responsesToolChoice: ReturnType<typeof toResponsesToolChoice> | undefined
 
    if (tools?.length) {
      preparedTools = prepareToolsWithUsageControl(tools, request.tools, logger, 'openai')
      const { tools: filteredTools, toolChoice } = preparedTools
 
-      if (filteredTools?.length && toolChoice) {
-        payload.tools = filteredTools
-        payload.tool_choice = toolChoice
+      if (filteredTools?.length) {
+        const convertedTools = convertToolsToResponses(filteredTools)
+        if (!convertedTools.length) {
+          throw new Error('All tools have empty names')
+        }
+
+        basePayload.tools = convertedTools
+        basePayload.parallel_tool_calls = true
+      }
+
+      if (toolChoice) {
+        responsesToolChoice = toResponsesToolChoice(toolChoice)
+        if (responsesToolChoice) {
+          basePayload.tool_choice = responsesToolChoice
+        }
 
        logger.info('OpenAI request configuration:', {
-          toolCount: filteredTools.length,
+          toolCount: filteredTools?.length || 0,
          toolChoice:
            typeof toolChoice === 'string'
              ? toolChoice
              : toolChoice.type === 'function'
-                ? `force:${toolChoice.function.name}`
+                ? `force:${toolChoice.function?.name}`
                : toolChoice.type === 'tool'
                  ? `force:${toolChoice.name}`
                  : toolChoice.type === 'any'
 
    }
 
+    const headers: Record<string, string> = {
+      Authorization: `Bearer ${request.apiKey}`,
+      'Content-Type': 'application/json',
+      'OpenAI-Beta': 'responses=v1',
+    }
+
+    const createRequestBody = (
+      input: ResponsesInputItem[],
+      overrides: Record<string, any> = {}
+    ) => ({
+      ...basePayload,
+      input,
+      ...overrides,
+    })
+
+    const parseErrorResponse = async (response: Response): Promise<string> => {
+      const text = await response.text()
+      try {
+        const payload = JSON.parse(text)
+        return payload?.error?.message || text
+      } catch {
+        return text
+      }
+    }
+
+    const postResponses = async (body: Record<string, any>) => {
+      const response = await fetch(responsesEndpoint, {
+        method: 'POST',
+        headers,
+        body: JSON.stringify(body),
+      })
+
+      if (!response.ok) {
+        const message = await parseErrorResponse(response)
+        throw new Error(`OpenAI API error (${response.status}): ${message}`)
+      }
+
+      return response.json()
+    }
+
    const providerStartTime = Date.now()
    const providerStartTimeISO = new Date(providerStartTime).toISOString()
 
      if (request.stream && (!tools || tools.length === 0)) {
        logger.info('Using streaming response for OpenAI request')
 
-        const streamingParams: ChatCompletionCreateParamsStreaming = {
-          ...payload,
-          stream: true,
-          stream_options: { include_usage: true },
+        const streamResponse = await fetch(responsesEndpoint, {
+          method: 'POST',
+          headers,
+          body: JSON.stringify(createRequestBody(initialInput, { stream: true })),
+        })
+
+        if (!streamResponse.ok) {
+          const message = await parseErrorResponse(streamResponse)
+          throw new Error(`OpenAI API error (${streamResponse.status}): ${message}`)
        }
 
-        const streamResponse = await openai.chat.completions.create(streamingParams)
        const streamingResult = {
-          stream: createReadableStreamFromOpenAIStream(streamResponse, (content, usage) => {
+          stream: createReadableStreamFromResponses(streamResponse, (content, usage) => {
            streamingResult.execution.output.content = content
            streamingResult.execution.output.tokens = {
-              input: usage.prompt_tokens,
-              output: usage.completion_tokens,
-              total: usage.total_tokens,
+              input: usage?.promptTokens || 0,
+              output: usage?.completionTokens || 0,
+              total: usage?.totalTokens || 0,
            }
 
            const costResult = calculateCost(
              request.model,
-              usage.prompt_tokens,
-              usage.completion_tokens
+              usage?.promptTokens || 0,
+              usage?.completionTokens || 0
            )
            streamingResult.execution.output.cost = {
              input: costResult.input,
              output: costResult.output,
              total: costResult.total,
            }
 
      }
 
    const initialCallTime = Date.now()
-
-    const originalToolChoice = payload.tool_choice
-
    const forcedTools = preparedTools?.forcedTools || []
    let usedForcedTools: string[] = []
+    let hasUsedForcedTool = false
+    let currentToolChoice = responsesToolChoice
 
-    /**
-     * Helper function to check for forced tool usage in responses
-     */
    const checkForForcedToolUsage = (
-      response: any,
-      toolChoice: string | { type: string; function?: { name: string }; name?: string; any?: any }
+      toolCallsInResponse: ResponsesToolCall[],
+      toolChoice: string | { type: string; name?: string } | undefined
    ) => {
-      if (typeof toolChoice === 'object' && response.choices[0]?.message?.tool_calls) {
-        const toolCallsResponse = response.choices[0].message.tool_calls
+      if (typeof toolChoice === 'object' && toolCallsInResponse.length > 0) {
        const result = trackForcedToolUsage(
-          toolCallsResponse,
+          toolCallsInResponse,
          toolChoice,
          logger,
          'openai',
      }
    }
 
-    let currentResponse = await openai.chat.completions.create(payload)
+    const currentInput: ResponsesInputItem[] = [...initialInput]
+    let currentResponse = await postResponses(
+      createRequestBody(currentInput, { tool_choice: currentToolChoice })
+    )
    const firstResponseTime = Date.now() - initialCallTime
 
-    let content = currentResponse.choices[0]?.message?.content || ''
+    const initialUsage = parseResponsesUsage(currentResponse.usage)
    const tokens = {
-      input: currentResponse.usage?.prompt_tokens || 0,
-      output: currentResponse.usage?.completion_tokens || 0,
-      total: currentResponse.usage?.total_tokens || 0,
+      input: initialUsage?.promptTokens || 0,
+      output: initialUsage?.completionTokens || 0,
+      total: initialUsage?.totalTokens || 0,
    }
+
    const toolCalls = []
    const toolResults = []
-    const currentMessages = [...allMessages]
    let iterationCount = 0
-
    let modelTime = firstResponseTime
    let toolsTime = 0
-
-    let hasUsedForcedTool = false
+    let content = extractResponseText(currentResponse.output) || ''
 
    const timeSegments: TimeSegment[] = [
      {
 
    ]
 
-    checkForForcedToolUsage(currentResponse, originalToolChoice)
+    checkForForcedToolUsage(extractResponseToolCalls(currentResponse.output), currentToolChoice)
 
    while (iterationCount < MAX_TOOL_ITERATIONS) {
-      if (currentResponse.choices[0]?.message?.content) {
-        content = currentResponse.choices[0].message.content
+      const responseText = extractResponseText(currentResponse.output)
+      if (responseText) {
+        content = responseText
      }
 
-      const toolCallsInResponse = currentResponse.choices[0]?.message?.tool_calls
-      if (!toolCallsInResponse || toolCallsInResponse.length === 0) {
+      const toolCallsInResponse = extractResponseToolCalls(currentResponse.output)
+      if (!toolCallsInResponse.length) {
        break
      }
 
+      const outputInputItems = convertResponseOutputToInputItems(currentResponse.output)
+      if (outputInputItems.length) {
+        currentInput.push(...outputInputItems)
+      }
+
      logger.info(
-        `Processing ${toolCallsInResponse.length} tool calls in parallel (iteration ${iterationCount + 1}/${MAX_TOOL_ITERATIONS})`
+        `Processing ${toolCallsInResponse.length} tool calls in parallel (iteration ${
+          iterationCount + 1
+        }/${MAX_TOOL_ITERATIONS})`
      )
 
      const toolsStartTime = Date.now()
 
      const toolExecutionPromises = toolCallsInResponse.map(async (toolCall) => {
        const toolCallStartTime = Date.now()
 
-        const toolName = toolCall.function.name
+        const toolName = toolCall.name
        try {
-          const toolArgs = JSON.parse(toolCall.function.arguments)
+          const toolArgs = toolCall.arguments ? JSON.parse(toolCall.arguments) : {}
          const tool = request.tools?.find((t) => t.id === toolName)
 
          if (!tool) {
 
      const executionResults = await Promise.allSettled(toolExecutionPromises)
 
-      currentMessages.push({
-        role: 'assistant',
-        content: null,
-        tool_calls: toolCallsInResponse.map((tc) => ({
-          id: tc.id,
-          type: 'function',
-          function: {
-            name: tc.function.name,
-            arguments: tc.function.arguments,
-          },
-        })),
-      })
-
      for (const settledResult of executionResults) {
        if (settledResult.status === 'rejected' || !settledResult.value) continue
 
          success: result.success,
        })
 
-        currentMessages.push({
-          role: 'tool',
-          tool_call_id: toolCall.id,
-          content: JSON.stringify(resultContent),
+        currentInput.push({
+          type: 'function_call_output',
+          call_id: toolCall.id,
+          output: JSON.stringify(resultContent),
        })
      }
 
      const thisToolsTime = Date.now() - toolsStartTime
      toolsTime += thisToolsTime
 
-      const nextPayload = {
-        ...payload,
-        messages: currentMessages,
-      }
-
-      if (typeof originalToolChoice === 'object' && hasUsedForcedTool && forcedTools.length > 0) {
+      if (typeof currentToolChoice === 'object' && hasUsedForcedTool && forcedTools.length > 0) {
        const remainingTools = forcedTools.filter((tool) => !usedForcedTools.includes(tool))
 
        if (remainingTools.length > 0) {
-          nextPayload.tool_choice = {
+          currentToolChoice = {
            type: 'function',
-            function: { name: remainingTools[0] },
+            name: remainingTools[0],
          }
          logger.info(`Forcing next tool: ${remainingTools[0]}`)
        } else {
-          nextPayload.tool_choice = 'auto'
+          currentToolChoice = 'auto'
          logger.info('All forced tools have been used, switching to auto tool_choice')
        }
      }
 
      const nextModelStartTime = Date.now()
 
-      currentResponse = await openai.chat.completions.create(nextPayload)
+      currentResponse = await postResponses(
+        createRequestBody(currentInput, { tool_choice: currentToolChoice })
+      )
 
-      checkForForcedToolUsage(currentResponse, nextPayload.tool_choice)
+      checkForForcedToolUsage(extractResponseToolCalls(currentResponse.output), currentToolChoice)
 
      const nextModelEndTime = Date.now()
      const thisModelTime = nextModelEndTime - nextModelStartTime
 
      modelTime += thisModelTime
 
-      if (currentResponse.usage) {
-        tokens.input += currentResponse.usage.prompt_tokens || 0
-        tokens.output += currentResponse.usage.completion_tokens || 0
-        tokens.total += currentResponse.usage.total_tokens || 0
+      const usage = parseResponsesUsage(currentResponse.usage)
+      if (usage) {
+        tokens.input += usage.promptTokens
+        tokens.output += usage.completionTokens
+        tokens.total += usage.totalTokens
      }
 
      iterationCount++
 
      const accumulatedCost = calculateCost(request.model, tokens.input, tokens.output)
 
-      const streamingParams: ChatCompletionCreateParamsStreaming = {
-        ...payload,
-        messages: currentMessages,
-        tool_choice: 'auto',
-        stream: true,
-        stream_options: { include_usage: true },
+      const streamResponse = await fetch(responsesEndpoint, {
+        method: 'POST',
+        headers,
+        body: JSON.stringify(
+          createRequestBody(currentInput, { stream: true, tool_choice: 'auto' })
+        ),
+      })
+
+      if (!streamResponse.ok) {
+        const message = await parseErrorResponse(streamResponse)
+        throw new Error(`OpenAI API error (${streamResponse.status}): ${message}`)
      }
 
-      const streamResponse = await openai.chat.completions.create(streamingParams)
      const streamingResult = {
-        stream: createReadableStreamFromOpenAIStream(streamResponse, (content, usage) => {
+        stream: createReadableStreamFromResponses(streamResponse, (content, usage) => {
          streamingResult.execution.output.content = content
          streamingResult.execution.output.tokens = {
-            input: tokens.input + usage.prompt_tokens,
-            output: tokens.output + usage.completion_tokens,
-            total: tokens.total + usage.total_tokens,
+            input: tokens.input + (usage?.promptTokens || 0),
+            output: tokens.output + (usage?.completionTokens || 0),
+            total: tokens.total + (usage?.totalTokens || 0),
          }
 
          const streamCost = calculateCost(
            request.model,
-            usage.prompt_tokens,
-            usage.completion_tokens
+            usage?.promptTokens || 0,
+            usage?.completionTokens || 0
          )
          streamingResult.execution.output.cost = {
            input: accumulatedCost.input + streamCost.input,
diff --git a/apps/sim/providers/openai/utils.ts b/apps/sim/providers/openai/utils.ts
deleted file mode 100644
index bd04d601c7..0000000000
--- a/apps/sim/providers/openai/utils.ts
+++ /dev/null
@@ -1,15 +0,0 @@
-import type { ChatCompletionChunk } from 'openai/resources/chat/completions'
-import type { CompletionUsage } from 'openai/resources/completions'
-import type { Stream } from 'openai/streaming'
-import { createOpenAICompatibleStream } from '@/providers/utils'
-
-/**
- * Creates a ReadableStream from an OpenAI streaming response.
- * Uses the shared OpenAI-compatible streaming utility.
- */ -export function createReadableStreamFromOpenAIStream( - openaiStream: Stream, - onComplete?: (content: string, usage: CompletionUsage) => void -): ReadableStream { - return createOpenAICompatibleStream(openaiStream, 'OpenAI', onComplete) -} diff --git a/apps/sim/providers/responses-utils.ts b/apps/sim/providers/responses-utils.ts new file mode 100644 index 0000000000..6245796e0e --- /dev/null +++ b/apps/sim/providers/responses-utils.ts @@ -0,0 +1,386 @@ +import { createLogger } from '@sim/logger' +import type { Message } from '@/providers/types' + +const logger = createLogger('ResponsesUtils') + +export interface ResponsesUsageTokens { + promptTokens: number + completionTokens: number + totalTokens: number + cachedTokens: number + reasoningTokens: number +} + +export interface ResponsesToolCall { + id: string + name: string + arguments: string +} + +export type ResponsesInputItem = + | { + role: 'system' | 'user' | 'assistant' + content: string + } + | { + type: 'function_call' + call_id: string + name: string + arguments: string + } + | { + type: 'function_call_output' + call_id: string + output: string + } + +export interface ResponsesToolDefinition { + type: 'function' + name: string + description?: string + parameters?: Record +} + +/** + * Converts chat-style messages into Responses API input items. + */ +export function buildResponsesInputFromMessages(messages: Message[]): ResponsesInputItem[] { + const input: ResponsesInputItem[] = [] + + for (const message of messages) { + if (message.role === 'tool' && message.tool_call_id) { + input.push({ + type: 'function_call_output', + call_id: message.tool_call_id, + output: message.content ?? '', + }) + continue + } + + if (message.content) { + input.push({ + role: message.role, + content: message.content, + }) + } + + if (message.tool_calls?.length) { + for (const toolCall of message.tool_calls) { + input.push({ + type: 'function_call', + call_id: toolCall.id, + name: toolCall.function.name, + arguments: toolCall.function.arguments, + }) + } + } + } + + return input +} + +/** + * Converts tool definitions to the Responses API format. + */ +export function convertToolsToResponses(tools: any[]): ResponsesToolDefinition[] { + return tools + .map((tool) => { + const name = tool.function?.name ?? tool.name + if (!name) { + return null + } + + return { + type: 'function' as const, + name, + description: tool.function?.description ?? tool.description, + parameters: tool.function?.parameters ?? tool.parameters, + } + }) + .filter(Boolean) as ResponsesToolDefinition[] +} + +/** + * Converts tool_choice to the Responses API format. + */ +export function toResponsesToolChoice( + toolChoice: + | 'auto' + | 'none' + | { type: 'function'; function?: { name: string }; name?: string } + | { type: 'tool'; name: string } + | { type: 'any'; any: { model: string; name: string } } + | undefined +): 'auto' | 'none' | { type: 'function'; name: string } | undefined { + if (!toolChoice) { + return undefined + } + + if (typeof toolChoice === 'string') { + return toolChoice + } + + if (toolChoice.type === 'function') { + const name = toolChoice.name ?? toolChoice.function?.name + return name ? 
+    return name ? { type: 'function', name } : undefined
+  }
+
+  return 'auto'
+}
+
+function extractTextFromMessageItem(item: any): string {
+  if (!item) {
+    return ''
+  }
+
+  if (typeof item.content === 'string') {
+    return item.content
+  }
+
+  if (!Array.isArray(item.content)) {
+    return ''
+  }
+
+  const textParts: string[] = []
+  for (const part of item.content) {
+    if (
+      part &&
+      (part.type === 'output_text' || part.type === 'text') &&
+      typeof part.text === 'string'
+    ) {
+      textParts.push(part.text)
+    }
+  }
+
+  return textParts.join('')
+}
+
+/**
+ * Extracts plain text from Responses API output items.
+ */
+export function extractResponseText(output: unknown): string {
+  if (!Array.isArray(output)) {
+    return ''
+  }
+
+  const textParts: string[] = []
+  for (const item of output) {
+    if (item?.type !== 'message') {
+      continue
+    }
+
+    const text = extractTextFromMessageItem(item)
+    if (text) {
+      textParts.push(text)
+    }
+  }
+
+  return textParts.join('')
+}
+
+/**
+ * Converts Responses API output items into input items for subsequent calls.
+ */
+export function convertResponseOutputToInputItems(output: unknown): ResponsesInputItem[] {
+  if (!Array.isArray(output)) {
+    return []
+  }
+
+  const items: ResponsesInputItem[] = []
+  for (const item of output) {
+    if (!item || typeof item !== 'object') {
+      continue
+    }
+
+    if (item.type === 'message') {
+      const text = extractTextFromMessageItem(item)
+      if (text) {
+        items.push({
+          role: 'assistant',
+          content: text,
+        })
+      }
+      continue
+    }
+
+    if (item.type === 'function_call') {
+      const callId = item.call_id ?? item.id
+      const name = item.name ?? item.function?.name
+      if (!callId || !name) {
+        continue
+      }
+
+      const argumentsValue =
+        typeof item.arguments === 'string' ? item.arguments : JSON.stringify(item.arguments ?? {})
+
+      items.push({
+        type: 'function_call',
+        call_id: callId,
+        name,
+        arguments: argumentsValue,
+      })
+    }
+  }
+
+  return items
+}
+
+/**
+ * Extracts tool calls from Responses API output items.
+ */
+export function extractResponseToolCalls(output: unknown): ResponsesToolCall[] {
+  if (!Array.isArray(output)) {
+    return []
+  }
+
+  return output
+    .map((item) => {
+      if (!item || item.type !== 'function_call') {
+        return null
+      }
+
+      const callId = item.call_id ?? item.id
+      const name = item.name ?? item.function?.name
+      if (!callId || !name) {
+        return null
+      }
+
+      const argumentsValue =
+        typeof item.arguments === 'string' ? item.arguments : JSON.stringify(item.arguments ?? {})
+
+      return {
+        id: callId,
+        name,
+        arguments: argumentsValue,
+      }
+    })
+    .filter(Boolean) as ResponsesToolCall[]
+}
+
+/**
+ * Maps Responses API usage data to prompt/completion token counts.
+ */
+export function parseResponsesUsage(usage: any): ResponsesUsageTokens | undefined {
+  if (!usage || typeof usage !== 'object') {
+    return undefined
+  }
+
+  const inputTokens = Number(usage.input_tokens ?? 0)
+  const outputTokens = Number(usage.output_tokens ?? 0)
+  const cachedTokens = Number(usage.input_tokens_details?.cached_tokens ?? 0)
+  const reasoningTokens = Number(usage.output_tokens_details?.reasoning_tokens ?? 0)
+  const completionTokens = outputTokens + reasoningTokens
+  const totalTokens = inputTokens + completionTokens
+
+  return {
+    promptTokens: inputTokens,
+    completionTokens,
+    totalTokens,
+    cachedTokens,
+    reasoningTokens,
+  }
+}
+
+/**
+ * Creates a ReadableStream from a Responses API SSE stream.
+ */ +export function createReadableStreamFromResponses( + response: Response, + onComplete?: (content: string, usage?: ResponsesUsageTokens) => void +): ReadableStream { + let fullContent = '' + let finalUsage: ResponsesUsageTokens | undefined + let activeEventType: string | undefined + const encoder = new TextEncoder() + + return new ReadableStream({ + async start(controller) { + const reader = response.body?.getReader() + if (!reader) { + controller.close() + return + } + + const decoder = new TextDecoder() + let buffer = '' + + try { + while (true) { + const { done, value } = await reader.read() + if (done) { + break + } + + buffer += decoder.decode(value, { stream: true }) + const lines = buffer.split('\n') + buffer = lines.pop() || '' + + for (const line of lines) { + const trimmed = line.trim() + if (!trimmed) { + continue + } + + if (trimmed.startsWith('event:')) { + activeEventType = trimmed.slice(6).trim() + continue + } + + if (!trimmed.startsWith('data:')) { + continue + } + + const data = trimmed.slice(5).trim() + if (data === '[DONE]') { + continue + } + + let event: any + try { + event = JSON.parse(data) + } catch (error) { + logger.debug('Skipping non-JSON response stream chunk', { + data: data.slice(0, 200), + error, + }) + continue + } + + const eventType = event?.type ?? activeEventType + + if ( + eventType === 'response.error' || + eventType === 'error' || + activeEventType === 'response.failed' + ) { + const message = event?.error?.message || 'Responses API stream error' + controller.error(new Error(message)) + return + } + + if (eventType === 'response.output_text.delta') { + if (typeof event.delta === 'string' && event.delta.length > 0) { + fullContent += event.delta + controller.enqueue(encoder.encode(event.delta)) + } + } + + if (eventType === 'response.completed') { + finalUsage = parseResponsesUsage(event?.response?.usage ?? 
+              finalUsage = parseResponsesUsage(event?.response?.usage ?? event?.usage)
+            }
+          }
+        }
+
+        if (onComplete) {
+          onComplete(fullContent, finalUsage)
+        }
+
+        controller.close()
+      } catch (error) {
+        controller.error(error)
+      } finally {
+        reader.releaseLock()
+      }
+    },
+  })
+}

From 5a30b832f376b49be15761fb5a222be03d03c574 Mon Sep 17 00:00:00 2001
From: Siddharth Ganesan
Date: Wed, 4 Feb 2026 13:21:59 -0800
Subject: [PATCH 02/13] Consolidate azure

---
 apps/sim/providers/azure-openai/index.ts | 628 +---------------------
 apps/sim/providers/openai/index.ts       | 628 +---------------------
 apps/sim/providers/responses-provider.ts | 640 +++++++++++++++++++++++
 3 files changed, 666 insertions(+), 1230 deletions(-)
 create mode 100644 apps/sim/providers/responses-provider.ts

diff --git a/apps/sim/providers/azure-openai/index.ts b/apps/sim/providers/azure-openai/index.ts
index f535dd27f5..32049b2a84 100644
--- a/apps/sim/providers/azure-openai/index.ts
+++ b/apps/sim/providers/azure-openai/index.ts
@@ -1,34 +1,9 @@ import { createLogger } from '@sim/logger'
 import { env } from '@/lib/core/config/env'
 import type { StreamingExecution } from '@/executor/types'
-import { MAX_TOOL_ITERATIONS } from '@/providers'
 import { getProviderDefaultModel, getProviderModels } from '@/providers/models'
-import {
-  buildResponsesInputFromMessages,
-  convertResponseOutputToInputItems,
-  convertToolsToResponses,
-  createReadableStreamFromResponses,
-  extractResponseText,
-  extractResponseToolCalls,
-  parseResponsesUsage,
-  type ResponsesInputItem,
-  type ResponsesToolCall,
-  toResponsesToolChoice,
-} from '@/providers/responses-utils'
-import type {
-  Message,
-  ProviderConfig,
-  ProviderRequest,
-  ProviderResponse,
-  TimeSegment,
-} from '@/providers/types'
-import {
-  calculateCost,
-  prepareToolExecution,
-  prepareToolsWithUsageControl,
-  trackForcedToolUsage,
-} from '@/providers/utils'
-import { executeTool } from '@/tools'
+import { executeResponsesProviderRequest } from '@/providers/responses-provider'
+import type { ProviderConfig, ProviderRequest, ProviderResponse } from '@/providers/types'
 
 const logger = createLogger('AzureOpenAIProvider')
 
@@ -46,16 +21,6 @@ export const azureOpenAIProvider: ProviderConfig = {
   executeRequest: async (
     request: ProviderRequest
   ): Promise<ProviderResponse | StreamingExecution> => {
-    logger.info('Preparing Azure OpenAI request', {
-      model: request.model,
-      hasSystemPrompt: !!request.systemPrompt,
-      hasMessages: !!request.messages?.length,
-      hasTools: !!request.tools?.length,
-      toolCount: request.tools?.length || 0,
-      hasResponseFormat: !!request.responseFormat,
-      stream: !!request.stream,
-    })
-
     const azureEndpoint = request.azureEndpoint || env.AZURE_OPENAI_ENDPOINT
     const azureApiVersion =
       request.azureApiVersion || env.AZURE_OPENAI_API_VERSION || '2024-07-01-preview'
 
     if (!request.apiKey) {
       throw new Error('API key is required for Azure OpenAI')
     }
 
     const deploymentName = request.model.replace('azure/', '')
     const apiUrl = `${azureEndpoint.replace(/\/$/, '')}/openai/v1/responses?api-version=${azureApiVersion}`
 
-    const allMessages: Message[] = []
-
-    if (request.systemPrompt) {
-      allMessages.push({
-        role: 'system',
-        content: request.systemPrompt,
-      })
-    }
-
-    if (request.context) {
-      allMessages.push({
-        role: 'user',
-        content: request.context,
-      })
-    }
-
-    if (request.messages) {
-      allMessages.push(...request.messages)
-    }
-
-    const initialInput = buildResponsesInputFromMessages(allMessages)
-
-    const basePayload: Record<string, any> = {
-      model: deploymentName,
-    }
-
-    if (request.temperature !== undefined) basePayload.temperature = request.temperature
basePayload.max_output_tokens = request.maxTokens - - if (request.reasoningEffort !== undefined) { - basePayload.reasoning = { - effort: request.reasoningEffort, - summary: 'auto', - } - } - - if (request.verbosity !== undefined) { - basePayload.text = { - ...(basePayload.text ?? {}), - verbosity: request.verbosity, - } - } - - if (request.responseFormat) { - basePayload.response_format = { - type: 'json_schema', - json_schema: { - name: request.responseFormat.name || 'response_schema', - schema: request.responseFormat.schema || request.responseFormat, - strict: request.responseFormat.strict !== false, - }, - } - - logger.info('Added JSON schema response format to Azure OpenAI request') - } - - const tools = request.tools?.length - ? request.tools.map((tool) => ({ - type: 'function', - function: { - name: tool.id, - description: tool.description, - parameters: tool.parameters, - }, - })) - : undefined - - let preparedTools: ReturnType | null = null - let responsesToolChoice: ReturnType | undefined - - if (tools?.length) { - preparedTools = prepareToolsWithUsageControl(tools, request.tools, logger, 'azure-openai') - const { tools: filteredTools, toolChoice } = preparedTools - - if (filteredTools?.length) { - const convertedTools = convertToolsToResponses(filteredTools) - if (!convertedTools.length) { - throw new Error('All tools have empty names') - } - - basePayload.tools = convertedTools - basePayload.parallel_tool_calls = true - } - - if (toolChoice) { - responsesToolChoice = toResponsesToolChoice(toolChoice) - if (responsesToolChoice) { - basePayload.tool_choice = responsesToolChoice - } - - logger.info('Azure OpenAI request configuration:', { - toolCount: filteredTools?.length || 0, - toolChoice: - typeof toolChoice === 'string' - ? toolChoice - : toolChoice.type === 'function' - ? `force:${toolChoice.function?.name}` - : toolChoice.type === 'tool' - ? `force:${toolChoice.name}` - : toolChoice.type === 'any' - ? 
`force:${toolChoice.any?.name || 'unknown'}` - : 'unknown', - model: deploymentName, - }) - } - } - - const headers: Record = { - 'Content-Type': 'application/json', - 'OpenAI-Beta': 'responses=v1', - 'api-key': request.apiKey, - } - - const createRequestBody = ( - input: ResponsesInputItem[], - overrides: Record = {} - ) => ({ - ...basePayload, - input, - ...overrides, + return executeResponsesProviderRequest(request, { + providerId: 'azure-openai', + providerLabel: 'Azure OpenAI', + modelName: deploymentName, + endpoint: apiUrl, + headers: { + 'Content-Type': 'application/json', + 'OpenAI-Beta': 'responses=v1', + 'api-key': request.apiKey, + }, + logger, }) - - const parseErrorResponse = async (response: Response): Promise => { - const text = await response.text() - try { - const payload = JSON.parse(text) - return payload?.error?.message || text - } catch { - return text - } - } - - const postResponses = async (body: Record) => { - const response = await fetch(apiUrl, { - method: 'POST', - headers, - body: JSON.stringify(body), - }) - - if (!response.ok) { - const message = await parseErrorResponse(response) - throw new Error(`Azure OpenAI API error (${response.status}): ${message}`) - } - - return response.json() - } - - const providerStartTime = Date.now() - const providerStartTimeISO = new Date(providerStartTime).toISOString() - - try { - if (request.stream && (!tools || tools.length === 0)) { - logger.info('Using streaming response for Azure OpenAI request') - - const streamResponse = await fetch(apiUrl, { - method: 'POST', - headers, - body: JSON.stringify(createRequestBody(initialInput, { stream: true })), - }) - - if (!streamResponse.ok) { - const message = await parseErrorResponse(streamResponse) - throw new Error(`Azure OpenAI API error (${streamResponse.status}): ${message}`) - } - - const streamingResult = { - stream: createReadableStreamFromResponses(streamResponse, (content, usage) => { - streamingResult.execution.output.content = content - streamingResult.execution.output.tokens = { - input: usage?.promptTokens || 0, - output: usage?.completionTokens || 0, - total: usage?.totalTokens || 0, - } - - const costResult = calculateCost( - request.model, - usage?.promptTokens || 0, - usage?.completionTokens || 0 - ) - streamingResult.execution.output.cost = { - input: costResult.input, - output: costResult.output, - total: costResult.total, - } - - const streamEndTime = Date.now() - const streamEndTimeISO = new Date(streamEndTime).toISOString() - - if (streamingResult.execution.output.providerTiming) { - streamingResult.execution.output.providerTiming.endTime = streamEndTimeISO - streamingResult.execution.output.providerTiming.duration = - streamEndTime - providerStartTime - - if (streamingResult.execution.output.providerTiming.timeSegments?.[0]) { - streamingResult.execution.output.providerTiming.timeSegments[0].endTime = - streamEndTime - streamingResult.execution.output.providerTiming.timeSegments[0].duration = - streamEndTime - providerStartTime - } - } - }), - execution: { - success: true, - output: { - content: '', - model: request.model, - tokens: { input: 0, output: 0, total: 0 }, - toolCalls: undefined, - providerTiming: { - startTime: providerStartTimeISO, - endTime: new Date().toISOString(), - duration: Date.now() - providerStartTime, - timeSegments: [ - { - type: 'model', - name: 'Streaming response', - startTime: providerStartTime, - endTime: Date.now(), - duration: Date.now() - providerStartTime, - }, - ], - }, - cost: { input: 0, output: 0, total: 0 }, - }, - logs: 
[], - metadata: { - startTime: providerStartTimeISO, - endTime: new Date().toISOString(), - duration: Date.now() - providerStartTime, - }, - }, - } as StreamingExecution - - return streamingResult as StreamingExecution - } - - const initialCallTime = Date.now() - const forcedTools = preparedTools?.forcedTools || [] - let usedForcedTools: string[] = [] - let hasUsedForcedTool = false - let currentToolChoice = responsesToolChoice - - const checkForForcedToolUsage = ( - toolCallsInResponse: ResponsesToolCall[], - toolChoice: string | { type: string; name?: string } | undefined - ) => { - if (typeof toolChoice === 'object' && toolCallsInResponse.length > 0) { - const result = trackForcedToolUsage( - toolCallsInResponse, - toolChoice, - logger, - 'azure-openai', - forcedTools, - usedForcedTools - ) - hasUsedForcedTool = result.hasUsedForcedTool - usedForcedTools = result.usedForcedTools - } - } - - const currentInput: ResponsesInputItem[] = [...initialInput] - let currentResponse = await postResponses( - createRequestBody(currentInput, { tool_choice: currentToolChoice }) - ) - const firstResponseTime = Date.now() - initialCallTime - - const initialUsage = parseResponsesUsage(currentResponse.usage) - const tokens = { - input: initialUsage?.promptTokens || 0, - output: initialUsage?.completionTokens || 0, - total: initialUsage?.totalTokens || 0, - } - - const toolCalls = [] - const toolResults = [] - let iterationCount = 0 - let modelTime = firstResponseTime - let toolsTime = 0 - let content = extractResponseText(currentResponse.output) || '' - - const timeSegments: TimeSegment[] = [ - { - type: 'model', - name: 'Initial response', - startTime: initialCallTime, - endTime: initialCallTime + firstResponseTime, - duration: firstResponseTime, - }, - ] - - checkForForcedToolUsage(extractResponseToolCalls(currentResponse.output), currentToolChoice) - - while (iterationCount < MAX_TOOL_ITERATIONS) { - const responseText = extractResponseText(currentResponse.output) - if (responseText) { - content = responseText - } - - const toolCallsInResponse = extractResponseToolCalls(currentResponse.output) - if (!toolCallsInResponse.length) { - break - } - - const outputInputItems = convertResponseOutputToInputItems(currentResponse.output) - if (outputInputItems.length) { - currentInput.push(...outputInputItems) - } - - logger.info( - `Processing ${toolCallsInResponse.length} tool calls in parallel (iteration ${ - iterationCount + 1 - }/${MAX_TOOL_ITERATIONS})` - ) - - const toolsStartTime = Date.now() - - const toolExecutionPromises = toolCallsInResponse.map(async (toolCall) => { - const toolCallStartTime = Date.now() - const toolName = toolCall.name - - try { - const toolArgs = toolCall.arguments ? JSON.parse(toolCall.arguments) : {} - const tool = request.tools?.find((t) => t.id === toolName) - - if (!tool) { - return null - } - - const { toolParams, executionParams } = prepareToolExecution(tool, toolArgs, request) - const result = await executeTool(toolName, executionParams) - const toolCallEndTime = Date.now() - - return { - toolCall, - toolName, - toolParams, - result, - startTime: toolCallStartTime, - endTime: toolCallEndTime, - duration: toolCallEndTime - toolCallStartTime, - } - } catch (error) { - const toolCallEndTime = Date.now() - logger.error('Error processing tool call:', { error, toolName }) - - return { - toolCall, - toolName, - toolParams: {}, - result: { - success: false, - output: undefined, - error: error instanceof Error ? 
error.message : 'Tool execution failed', - }, - startTime: toolCallStartTime, - endTime: toolCallEndTime, - duration: toolCallEndTime - toolCallStartTime, - } - } - }) - - const executionResults = await Promise.allSettled(toolExecutionPromises) - - for (const settledResult of executionResults) { - if (settledResult.status === 'rejected' || !settledResult.value) continue - - const { toolCall, toolName, toolParams, result, startTime, endTime, duration } = - settledResult.value - - timeSegments.push({ - type: 'tool', - name: toolName, - startTime: startTime, - endTime: endTime, - duration: duration, - }) - - let resultContent: any - if (result.success) { - toolResults.push(result.output) - resultContent = result.output - } else { - resultContent = { - error: true, - message: result.error || 'Tool execution failed', - tool: toolName, - } - } - - toolCalls.push({ - name: toolName, - arguments: toolParams, - startTime: new Date(startTime).toISOString(), - endTime: new Date(endTime).toISOString(), - duration: duration, - result: resultContent, - success: result.success, - }) - - currentInput.push({ - type: 'function_call_output', - call_id: toolCall.id, - output: JSON.stringify(resultContent), - }) - } - - const thisToolsTime = Date.now() - toolsStartTime - toolsTime += thisToolsTime - - if (typeof currentToolChoice === 'object' && hasUsedForcedTool && forcedTools.length > 0) { - const remainingTools = forcedTools.filter((tool) => !usedForcedTools.includes(tool)) - - if (remainingTools.length > 0) { - currentToolChoice = { - type: 'function', - name: remainingTools[0], - } - logger.info(`Forcing next tool: ${remainingTools[0]}`) - } else { - currentToolChoice = 'auto' - logger.info('All forced tools have been used, switching to auto tool_choice') - } - } - - const nextModelStartTime = Date.now() - - currentResponse = await postResponses( - createRequestBody(currentInput, { tool_choice: currentToolChoice }) - ) - - checkForForcedToolUsage(extractResponseToolCalls(currentResponse.output), currentToolChoice) - - const nextModelEndTime = Date.now() - const thisModelTime = nextModelEndTime - nextModelStartTime - - timeSegments.push({ - type: 'model', - name: `Model response (iteration ${iterationCount + 1})`, - startTime: nextModelStartTime, - endTime: nextModelEndTime, - duration: thisModelTime, - }) - - modelTime += thisModelTime - - const usage = parseResponsesUsage(currentResponse.usage) - if (usage) { - tokens.input += usage.promptTokens - tokens.output += usage.completionTokens - tokens.total += usage.totalTokens - } - - iterationCount++ - } - - if (request.stream) { - logger.info('Using streaming for final response after tool processing') - - const accumulatedCost = calculateCost(request.model, tokens.input, tokens.output) - - const streamResponse = await fetch(apiUrl, { - method: 'POST', - headers, - body: JSON.stringify( - createRequestBody(currentInput, { stream: true, tool_choice: 'auto' }) - ), - }) - - if (!streamResponse.ok) { - const message = await parseErrorResponse(streamResponse) - throw new Error(`Azure OpenAI API error (${streamResponse.status}): ${message}`) - } - - const streamingResult = { - stream: createReadableStreamFromResponses(streamResponse, (content, usage) => { - streamingResult.execution.output.content = content - streamingResult.execution.output.tokens = { - input: tokens.input + (usage?.promptTokens || 0), - output: tokens.output + (usage?.completionTokens || 0), - total: tokens.total + (usage?.totalTokens || 0), - } - - const streamCost = calculateCost( - 
request.model, - usage?.promptTokens || 0, - usage?.completionTokens || 0 - ) - streamingResult.execution.output.cost = { - input: accumulatedCost.input + streamCost.input, - output: accumulatedCost.output + streamCost.output, - total: accumulatedCost.total + streamCost.total, - } - }), - execution: { - success: true, - output: { - content: '', - model: request.model, - tokens: { - input: tokens.input, - output: tokens.output, - total: tokens.total, - }, - toolCalls: - toolCalls.length > 0 - ? { - list: toolCalls, - count: toolCalls.length, - } - : undefined, - providerTiming: { - startTime: providerStartTimeISO, - endTime: new Date().toISOString(), - duration: Date.now() - providerStartTime, - modelTime: modelTime, - toolsTime: toolsTime, - firstResponseTime: firstResponseTime, - iterations: iterationCount + 1, - timeSegments: timeSegments, - }, - cost: { - input: accumulatedCost.input, - output: accumulatedCost.output, - total: accumulatedCost.total, - }, - }, - logs: [], - metadata: { - startTime: providerStartTimeISO, - endTime: new Date().toISOString(), - duration: Date.now() - providerStartTime, - }, - }, - } as StreamingExecution - - return streamingResult as StreamingExecution - } - - const providerEndTime = Date.now() - const providerEndTimeISO = new Date(providerEndTime).toISOString() - const totalDuration = providerEndTime - providerStartTime - - return { - content, - model: request.model, - tokens, - toolCalls: toolCalls.length > 0 ? toolCalls : undefined, - toolResults: toolResults.length > 0 ? toolResults : undefined, - timing: { - startTime: providerStartTimeISO, - endTime: providerEndTimeISO, - duration: totalDuration, - modelTime: modelTime, - toolsTime: toolsTime, - firstResponseTime: firstResponseTime, - iterations: iterationCount + 1, - timeSegments: timeSegments, - }, - } - } catch (error) { - const providerEndTime = Date.now() - const providerEndTimeISO = new Date(providerEndTime).toISOString() - const totalDuration = providerEndTime - providerStartTime - - logger.error('Error in Azure OpenAI request:', { - error, - duration: totalDuration, - }) - - const enhancedError = new Error(error instanceof Error ? 
error.message : String(error)) - // @ts-ignore - Adding timing property to the error - enhancedError.timing = { - startTime: providerStartTimeISO, - endTime: providerEndTimeISO, - duration: totalDuration, - } - - throw enhancedError - } }, } diff --git a/apps/sim/providers/openai/index.ts b/apps/sim/providers/openai/index.ts index f4cf93306e..d254a0edda 100644 --- a/apps/sim/providers/openai/index.ts +++ b/apps/sim/providers/openai/index.ts @@ -1,33 +1,8 @@ import { createLogger } from '@sim/logger' import type { StreamingExecution } from '@/executor/types' -import { MAX_TOOL_ITERATIONS } from '@/providers' import { getProviderDefaultModel, getProviderModels } from '@/providers/models' -import { - buildResponsesInputFromMessages, - convertResponseOutputToInputItems, - convertToolsToResponses, - createReadableStreamFromResponses, - extractResponseText, - extractResponseToolCalls, - parseResponsesUsage, - type ResponsesInputItem, - type ResponsesToolCall, - toResponsesToolChoice, -} from '@/providers/responses-utils' -import type { - Message, - ProviderConfig, - ProviderRequest, - ProviderResponse, - TimeSegment, -} from '@/providers/types' -import { - calculateCost, - prepareToolExecution, - prepareToolsWithUsageControl, - trackForcedToolUsage, -} from '@/providers/utils' -import { executeTool } from '@/tools' +import { executeResponsesProviderRequest } from '@/providers/responses-provider' +import type { ProviderConfig, ProviderRequest, ProviderResponse } from '@/providers/types' const logger = createLogger('OpenAIProvider') const responsesEndpoint = 'https://api.openai.com/v1/responses' @@ -43,598 +18,21 @@ export const openaiProvider: ProviderConfig = { executeRequest: async ( request: ProviderRequest ): Promise => { - logger.info('Preparing OpenAI request', { - model: request.model, - hasSystemPrompt: !!request.systemPrompt, - hasMessages: !!request.messages?.length, - hasTools: !!request.tools?.length, - toolCount: request.tools?.length || 0, - hasResponseFormat: !!request.responseFormat, - stream: !!request.stream, - }) - if (!request.apiKey) { throw new Error('API key is required for OpenAI') } - const allMessages: Message[] = [] - - if (request.systemPrompt) { - allMessages.push({ - role: 'system', - content: request.systemPrompt, - }) - } - - if (request.context) { - allMessages.push({ - role: 'user', - content: request.context, - }) - } - - if (request.messages) { - allMessages.push(...request.messages) - } - - const initialInput = buildResponsesInputFromMessages(allMessages) - - const basePayload: Record = { - model: request.model, - } - - if (request.temperature !== undefined) basePayload.temperature = request.temperature - if (request.maxTokens != null) basePayload.max_output_tokens = request.maxTokens - - if (request.reasoningEffort !== undefined) { - basePayload.reasoning = { - effort: request.reasoningEffort, - summary: 'auto', - } - } - - if (request.verbosity !== undefined) { - basePayload.text = { - ...(basePayload.text ?? {}), - verbosity: request.verbosity, - } - } - - if (request.responseFormat) { - basePayload.response_format = { - type: 'json_schema', - json_schema: { - name: request.responseFormat.name || 'response_schema', - schema: request.responseFormat.schema || request.responseFormat, - strict: request.responseFormat.strict !== false, - }, - } - - logger.info('Added JSON schema response format to request') - } - - const tools = request.tools?.length - ? 
request.tools.map((tool) => ({ - type: 'function', - function: { - name: tool.id, - description: tool.description, - parameters: tool.parameters, - }, - })) - : undefined - - let preparedTools: ReturnType | null = null - let responsesToolChoice: ReturnType | undefined - - if (tools?.length) { - preparedTools = prepareToolsWithUsageControl(tools, request.tools, logger, 'openai') - const { tools: filteredTools, toolChoice } = preparedTools - - if (filteredTools?.length) { - const convertedTools = convertToolsToResponses(filteredTools) - if (!convertedTools.length) { - throw new Error('All tools have empty names') - } - - basePayload.tools = convertedTools - basePayload.parallel_tool_calls = true - } - - if (toolChoice) { - responsesToolChoice = toResponsesToolChoice(toolChoice) - if (responsesToolChoice) { - basePayload.tool_choice = responsesToolChoice - } - - logger.info('OpenAI request configuration:', { - toolCount: filteredTools?.length || 0, - toolChoice: - typeof toolChoice === 'string' - ? toolChoice - : toolChoice.type === 'function' - ? `force:${toolChoice.function?.name}` - : toolChoice.type === 'tool' - ? `force:${toolChoice.name}` - : toolChoice.type === 'any' - ? `force:${toolChoice.any?.name || 'unknown'}` - : 'unknown', - model: request.model, - }) - } - } - - const headers: Record = { - Authorization: `Bearer ${request.apiKey}`, - 'Content-Type': 'application/json', - 'OpenAI-Beta': 'responses=v1', - } - - const createRequestBody = ( - input: ResponsesInputItem[], - overrides: Record = {} - ) => ({ - ...basePayload, - input, - ...overrides, + return executeResponsesProviderRequest(request, { + providerId: 'openai', + providerLabel: 'OpenAI', + modelName: request.model, + endpoint: responsesEndpoint, + headers: { + Authorization: `Bearer ${request.apiKey}`, + 'Content-Type': 'application/json', + 'OpenAI-Beta': 'responses=v1', + }, + logger, }) - - const parseErrorResponse = async (response: Response): Promise => { - const text = await response.text() - try { - const payload = JSON.parse(text) - return payload?.error?.message || text - } catch { - return text - } - } - - const postResponses = async (body: Record) => { - const response = await fetch(responsesEndpoint, { - method: 'POST', - headers, - body: JSON.stringify(body), - }) - - if (!response.ok) { - const message = await parseErrorResponse(response) - throw new Error(`OpenAI API error (${response.status}): ${message}`) - } - - return response.json() - } - - const providerStartTime = Date.now() - const providerStartTimeISO = new Date(providerStartTime).toISOString() - - try { - if (request.stream && (!tools || tools.length === 0)) { - logger.info('Using streaming response for OpenAI request') - - const streamResponse = await fetch(responsesEndpoint, { - method: 'POST', - headers, - body: JSON.stringify(createRequestBody(initialInput, { stream: true })), - }) - - if (!streamResponse.ok) { - const message = await parseErrorResponse(streamResponse) - throw new Error(`OpenAI API error (${streamResponse.status}): ${message}`) - } - - const streamingResult = { - stream: createReadableStreamFromResponses(streamResponse, (content, usage) => { - streamingResult.execution.output.content = content - streamingResult.execution.output.tokens = { - input: usage?.promptTokens || 0, - output: usage?.completionTokens || 0, - total: usage?.totalTokens || 0, - } - - const costResult = calculateCost( - request.model, - usage?.promptTokens || 0, - usage?.completionTokens || 0 - ) - streamingResult.execution.output.cost = { - input: 
costResult.input, - output: costResult.output, - total: costResult.total, - } - - const streamEndTime = Date.now() - const streamEndTimeISO = new Date(streamEndTime).toISOString() - - if (streamingResult.execution.output.providerTiming) { - streamingResult.execution.output.providerTiming.endTime = streamEndTimeISO - streamingResult.execution.output.providerTiming.duration = - streamEndTime - providerStartTime - - if (streamingResult.execution.output.providerTiming.timeSegments?.[0]) { - streamingResult.execution.output.providerTiming.timeSegments[0].endTime = - streamEndTime - streamingResult.execution.output.providerTiming.timeSegments[0].duration = - streamEndTime - providerStartTime - } - } - }), - execution: { - success: true, - output: { - content: '', - model: request.model, - tokens: { input: 0, output: 0, total: 0 }, - toolCalls: undefined, - providerTiming: { - startTime: providerStartTimeISO, - endTime: new Date().toISOString(), - duration: Date.now() - providerStartTime, - timeSegments: [ - { - type: 'model', - name: 'Streaming response', - startTime: providerStartTime, - endTime: Date.now(), - duration: Date.now() - providerStartTime, - }, - ], - }, - cost: { input: 0, output: 0, total: 0 }, - }, - logs: [], - metadata: { - startTime: providerStartTimeISO, - endTime: new Date().toISOString(), - duration: Date.now() - providerStartTime, - }, - }, - } as StreamingExecution - - return streamingResult as StreamingExecution - } - - const initialCallTime = Date.now() - const forcedTools = preparedTools?.forcedTools || [] - let usedForcedTools: string[] = [] - let hasUsedForcedTool = false - let currentToolChoice = responsesToolChoice - - const checkForForcedToolUsage = ( - toolCallsInResponse: ResponsesToolCall[], - toolChoice: string | { type: string; name?: string } | undefined - ) => { - if (typeof toolChoice === 'object' && toolCallsInResponse.length > 0) { - const result = trackForcedToolUsage( - toolCallsInResponse, - toolChoice, - logger, - 'openai', - forcedTools, - usedForcedTools - ) - hasUsedForcedTool = result.hasUsedForcedTool - usedForcedTools = result.usedForcedTools - } - } - - const currentInput: ResponsesInputItem[] = [...initialInput] - let currentResponse = await postResponses( - createRequestBody(currentInput, { tool_choice: currentToolChoice }) - ) - const firstResponseTime = Date.now() - initialCallTime - - const initialUsage = parseResponsesUsage(currentResponse.usage) - const tokens = { - input: initialUsage?.promptTokens || 0, - output: initialUsage?.completionTokens || 0, - total: initialUsage?.totalTokens || 0, - } - - const toolCalls = [] - const toolResults = [] - let iterationCount = 0 - let modelTime = firstResponseTime - let toolsTime = 0 - let content = extractResponseText(currentResponse.output) || '' - - const timeSegments: TimeSegment[] = [ - { - type: 'model', - name: 'Initial response', - startTime: initialCallTime, - endTime: initialCallTime + firstResponseTime, - duration: firstResponseTime, - }, - ] - - checkForForcedToolUsage(extractResponseToolCalls(currentResponse.output), currentToolChoice) - - while (iterationCount < MAX_TOOL_ITERATIONS) { - const responseText = extractResponseText(currentResponse.output) - if (responseText) { - content = responseText - } - - const toolCallsInResponse = extractResponseToolCalls(currentResponse.output) - if (!toolCallsInResponse.length) { - break - } - - const outputInputItems = convertResponseOutputToInputItems(currentResponse.output) - if (outputInputItems.length) { - 
currentInput.push(...outputInputItems) - } - - logger.info( - `Processing ${toolCallsInResponse.length} tool calls in parallel (iteration ${ - iterationCount + 1 - }/${MAX_TOOL_ITERATIONS})` - ) - - const toolsStartTime = Date.now() - - const toolExecutionPromises = toolCallsInResponse.map(async (toolCall) => { - const toolCallStartTime = Date.now() - const toolName = toolCall.name - - try { - const toolArgs = toolCall.arguments ? JSON.parse(toolCall.arguments) : {} - const tool = request.tools?.find((t) => t.id === toolName) - - if (!tool) { - return null - } - - const { toolParams, executionParams } = prepareToolExecution(tool, toolArgs, request) - const result = await executeTool(toolName, executionParams) - const toolCallEndTime = Date.now() - - return { - toolCall, - toolName, - toolParams, - result, - startTime: toolCallStartTime, - endTime: toolCallEndTime, - duration: toolCallEndTime - toolCallStartTime, - } - } catch (error) { - const toolCallEndTime = Date.now() - logger.error('Error processing tool call:', { error, toolName }) - - return { - toolCall, - toolName, - toolParams: {}, - result: { - success: false, - output: undefined, - error: error instanceof Error ? error.message : 'Tool execution failed', - }, - startTime: toolCallStartTime, - endTime: toolCallEndTime, - duration: toolCallEndTime - toolCallStartTime, - } - } - }) - - const executionResults = await Promise.allSettled(toolExecutionPromises) - - for (const settledResult of executionResults) { - if (settledResult.status === 'rejected' || !settledResult.value) continue - - const { toolCall, toolName, toolParams, result, startTime, endTime, duration } = - settledResult.value - - timeSegments.push({ - type: 'tool', - name: toolName, - startTime: startTime, - endTime: endTime, - duration: duration, - }) - - let resultContent: any - if (result.success) { - toolResults.push(result.output) - resultContent = result.output - } else { - resultContent = { - error: true, - message: result.error || 'Tool execution failed', - tool: toolName, - } - } - - toolCalls.push({ - name: toolName, - arguments: toolParams, - startTime: new Date(startTime).toISOString(), - endTime: new Date(endTime).toISOString(), - duration: duration, - result: resultContent, - success: result.success, - }) - - currentInput.push({ - type: 'function_call_output', - call_id: toolCall.id, - output: JSON.stringify(resultContent), - }) - } - - const thisToolsTime = Date.now() - toolsStartTime - toolsTime += thisToolsTime - - if (typeof currentToolChoice === 'object' && hasUsedForcedTool && forcedTools.length > 0) { - const remainingTools = forcedTools.filter((tool) => !usedForcedTools.includes(tool)) - - if (remainingTools.length > 0) { - currentToolChoice = { - type: 'function', - name: remainingTools[0], - } - logger.info(`Forcing next tool: ${remainingTools[0]}`) - } else { - currentToolChoice = 'auto' - logger.info('All forced tools have been used, switching to auto tool_choice') - } - } - - const nextModelStartTime = Date.now() - - currentResponse = await postResponses( - createRequestBody(currentInput, { tool_choice: currentToolChoice }) - ) - - checkForForcedToolUsage(extractResponseToolCalls(currentResponse.output), currentToolChoice) - - const nextModelEndTime = Date.now() - const thisModelTime = nextModelEndTime - nextModelStartTime - - timeSegments.push({ - type: 'model', - name: `Model response (iteration ${iterationCount + 1})`, - startTime: nextModelStartTime, - endTime: nextModelEndTime, - duration: thisModelTime, - }) - - modelTime += 
thisModelTime - - const usage = parseResponsesUsage(currentResponse.usage) - if (usage) { - tokens.input += usage.promptTokens - tokens.output += usage.completionTokens - tokens.total += usage.totalTokens - } - - iterationCount++ - } - - if (request.stream) { - logger.info('Using streaming for final response after tool processing') - - const accumulatedCost = calculateCost(request.model, tokens.input, tokens.output) - - const streamResponse = await fetch(responsesEndpoint, { - method: 'POST', - headers, - body: JSON.stringify( - createRequestBody(currentInput, { stream: true, tool_choice: 'auto' }) - ), - }) - - if (!streamResponse.ok) { - const message = await parseErrorResponse(streamResponse) - throw new Error(`OpenAI API error (${streamResponse.status}): ${message}`) - } - - const streamingResult = { - stream: createReadableStreamFromResponses(streamResponse, (content, usage) => { - streamingResult.execution.output.content = content - streamingResult.execution.output.tokens = { - input: tokens.input + (usage?.promptTokens || 0), - output: tokens.output + (usage?.completionTokens || 0), - total: tokens.total + (usage?.totalTokens || 0), - } - - const streamCost = calculateCost( - request.model, - usage?.promptTokens || 0, - usage?.completionTokens || 0 - ) - streamingResult.execution.output.cost = { - input: accumulatedCost.input + streamCost.input, - output: accumulatedCost.output + streamCost.output, - total: accumulatedCost.total + streamCost.total, - } - }), - execution: { - success: true, - output: { - content: '', - model: request.model, - tokens: { - input: tokens.input, - output: tokens.output, - total: tokens.total, - }, - toolCalls: - toolCalls.length > 0 - ? { - list: toolCalls, - count: toolCalls.length, - } - : undefined, - providerTiming: { - startTime: providerStartTimeISO, - endTime: new Date().toISOString(), - duration: Date.now() - providerStartTime, - modelTime: modelTime, - toolsTime: toolsTime, - firstResponseTime: firstResponseTime, - iterations: iterationCount + 1, - timeSegments: timeSegments, - }, - cost: { - input: accumulatedCost.input, - output: accumulatedCost.output, - total: accumulatedCost.total, - }, - }, - logs: [], - metadata: { - startTime: providerStartTimeISO, - endTime: new Date().toISOString(), - duration: Date.now() - providerStartTime, - }, - }, - } as StreamingExecution - - return streamingResult as StreamingExecution - } - - const providerEndTime = Date.now() - const providerEndTimeISO = new Date(providerEndTime).toISOString() - const totalDuration = providerEndTime - providerStartTime - - return { - content, - model: request.model, - tokens, - toolCalls: toolCalls.length > 0 ? toolCalls : undefined, - toolResults: toolResults.length > 0 ? toolResults : undefined, - timing: { - startTime: providerStartTimeISO, - endTime: providerEndTimeISO, - duration: totalDuration, - modelTime: modelTime, - toolsTime: toolsTime, - firstResponseTime: firstResponseTime, - iterations: iterationCount + 1, - timeSegments: timeSegments, - }, - } - } catch (error) { - const providerEndTime = Date.now() - const providerEndTimeISO = new Date(providerEndTime).toISOString() - const totalDuration = providerEndTime - providerStartTime - - logger.error('Error in OpenAI request:', { - error, - duration: totalDuration, - }) - - const enhancedError = new Error(error instanceof Error ? 
error.message : String(error))
-      // @ts-ignore - Adding timing property to the error
-      enhancedError.timing = {
-        startTime: providerStartTimeISO,
-        endTime: providerEndTimeISO,
-        duration: totalDuration,
-      }
-
-      throw enhancedError
-    }
   },
 }
diff --git a/apps/sim/providers/responses-provider.ts b/apps/sim/providers/responses-provider.ts
new file mode 100644
index 0000000000..6dabbdfa76
--- /dev/null
+++ b/apps/sim/providers/responses-provider.ts
@@ -0,0 +1,640 @@
+import type { Logger } from '@sim/logger'
+import type { StreamingExecution } from '@/executor/types'
+import { MAX_TOOL_ITERATIONS } from '@/providers'
+import {
+  buildResponsesInputFromMessages,
+  convertResponseOutputToInputItems,
+  convertToolsToResponses,
+  createReadableStreamFromResponses,
+  extractResponseText,
+  extractResponseToolCalls,
+  parseResponsesUsage,
+  type ResponsesInputItem,
+  type ResponsesToolCall,
+  toResponsesToolChoice,
+} from '@/providers/responses-utils'
+import type { Message, ProviderRequest, ProviderResponse, TimeSegment } from '@/providers/types'
+import {
+  calculateCost,
+  prepareToolExecution,
+  prepareToolsWithUsageControl,
+  trackForcedToolUsage,
+} from '@/providers/utils'
+import { executeTool } from '@/tools'
+
+type PreparedTools = ReturnType<typeof prepareToolsWithUsageControl>
+type ToolChoice = PreparedTools['toolChoice']
+
+export interface ResponsesProviderConfig {
+  providerId: string
+  providerLabel: string
+  modelName: string
+  endpoint: string
+  headers: Record<string, string>
+  logger: Logger
+}
+
+/**
+ * Executes a Responses API request with tool-loop handling and streaming support.
+ */
+export async function executeResponsesProviderRequest(
+  request: ProviderRequest,
+  config: ResponsesProviderConfig
+): Promise<ProviderResponse | StreamingExecution> {
+  const { logger } = config
+
+  logger.info(`Preparing ${config.providerLabel} request`, {
+    model: request.model,
+    hasSystemPrompt: !!request.systemPrompt,
+    hasMessages: !!request.messages?.length,
+    hasTools: !!request.tools?.length,
+    toolCount: request.tools?.length || 0,
+    hasResponseFormat: !!request.responseFormat,
+    stream: !!request.stream,
+  })
+
+  const allMessages: Message[] = []
+
+  if (request.systemPrompt) {
+    allMessages.push({
+      role: 'system',
+      content: request.systemPrompt,
+    })
+  }
+
+  if (request.context) {
+    allMessages.push({
+      role: 'user',
+      content: request.context,
+    })
+  }
+
+  if (request.messages) {
+    allMessages.push(...request.messages)
+  }
+
+  const initialInput = buildResponsesInputFromMessages(allMessages)
+
+  const basePayload: Record<string, any> = {
+    model: config.modelName,
+  }
+
+  if (request.temperature !== undefined) basePayload.temperature = request.temperature
+  if (request.maxTokens != null) basePayload.max_output_tokens = request.maxTokens
+
+  if (request.reasoningEffort !== undefined) {
+    basePayload.reasoning = {
+      effort: request.reasoningEffort,
+      summary: 'auto',
+    }
+  }
+
+  if (request.verbosity !== undefined) {
+    basePayload.text = {
+      ...(basePayload.text ?? {}),
+      verbosity: request.verbosity,
+    }
+  }
+
+  if (request.responseFormat) {
+    basePayload.response_format = {
+      type: 'json_schema',
+      json_schema: {
+        name: request.responseFormat.name || 'response_schema',
+        schema: request.responseFormat.schema || request.responseFormat,
+        strict: request.responseFormat.strict !== false,
+      },
+    }
+
+    logger.info(`Added JSON schema response format to ${config.providerLabel} request`)
+  }
+
+  const tools = request.tools?.length
+    ? 
request.tools.map((tool) => ({
+        type: 'function',
+        function: {
+          name: tool.id,
+          description: tool.description,
+          parameters: tool.parameters,
+        },
+      }))
+    : undefined
+
+  let preparedTools: PreparedTools | null = null
+  let responsesToolChoice: ReturnType<typeof toResponsesToolChoice> | undefined
+  let trackingToolChoice: ToolChoice | undefined
+
+  if (tools?.length) {
+    preparedTools = prepareToolsWithUsageControl(tools, request.tools, logger, config.providerId)
+    const { tools: filteredTools, toolChoice } = preparedTools
+    trackingToolChoice = toolChoice
+
+    if (filteredTools?.length) {
+      const convertedTools = convertToolsToResponses(filteredTools)
+      if (!convertedTools.length) {
+        throw new Error('All tools have empty names')
+      }
+
+      basePayload.tools = convertedTools
+      basePayload.parallel_tool_calls = true
+    }
+
+    if (toolChoice) {
+      responsesToolChoice = toResponsesToolChoice(toolChoice)
+      if (responsesToolChoice) {
+        basePayload.tool_choice = responsesToolChoice
+      }
+
+      logger.info(`${config.providerLabel} request configuration:`, {
+        toolCount: filteredTools?.length || 0,
+        toolChoice:
+          typeof toolChoice === 'string'
+            ? toolChoice
+            : toolChoice.type === 'function'
+              ? `force:${toolChoice.function?.name}`
+              : toolChoice.type === 'tool'
+                ? `force:${toolChoice.name}`
+                : toolChoice.type === 'any'
+                  ? `force:${toolChoice.any?.name || 'unknown'}`
+                  : 'unknown',
+        model: config.modelName,
+      })
+    }
+  }
+
+  const createRequestBody = (input: ResponsesInputItem[], overrides: Record<string, any> = {}) => ({
+    ...basePayload,
+    input,
+    ...overrides,
+  })
+
+  const parseErrorResponse = async (response: Response): Promise<string> => {
+    const text = await response.text()
+    try {
+      const payload = JSON.parse(text)
+      return payload?.error?.message || text
+    } catch {
+      return text
+    }
+  }
+
+  const postResponses = async (body: Record<string, any>) => {
+    const response = await fetch(config.endpoint, {
+      method: 'POST',
+      headers: config.headers,
+      body: JSON.stringify(body),
+    })
+
+    if (!response.ok) {
+      const message = await parseErrorResponse(response)
+      throw new Error(`${config.providerLabel} API error (${response.status}): ${message}`)
+    }
+
+    return response.json()
+  }
+
+  const providerStartTime = Date.now()
+  const providerStartTimeISO = new Date(providerStartTime).toISOString()
+
+  try {
+    if (request.stream && (!tools || tools.length === 0)) {
+      logger.info(`Using streaming response for ${config.providerLabel} request`)
+
+      const streamResponse = await fetch(config.endpoint, {
+        method: 'POST',
+        headers: config.headers,
+        body: JSON.stringify(createRequestBody(initialInput, { stream: true })),
+      })
+
+      if (!streamResponse.ok) {
+        const message = await parseErrorResponse(streamResponse)
+        throw new Error(`${config.providerLabel} API error (${streamResponse.status}): ${message}`)
+      }
+
+      const streamingResult = {
+        stream: createReadableStreamFromResponses(streamResponse, (content, usage) => {
+          streamingResult.execution.output.content = content
+          streamingResult.execution.output.tokens = {
+            input: usage?.promptTokens || 0,
+            output: usage?.completionTokens || 0,
+            total: usage?.totalTokens || 0,
+          }
+
+          const costResult = calculateCost(
+            request.model,
+            usage?.promptTokens || 0,
+            usage?.completionTokens || 0
+          )
+          streamingResult.execution.output.cost = {
+            input: costResult.input,
+            output: costResult.output,
+            total: costResult.total,
+          }
+
+          const streamEndTime = Date.now()
+          const streamEndTimeISO = new Date(streamEndTime).toISOString()
+
+          if (streamingResult.execution.output.providerTiming) {
+            
streamingResult.execution.output.providerTiming.endTime = streamEndTimeISO + streamingResult.execution.output.providerTiming.duration = + streamEndTime - providerStartTime + + if (streamingResult.execution.output.providerTiming.timeSegments?.[0]) { + streamingResult.execution.output.providerTiming.timeSegments[0].endTime = + streamEndTime + streamingResult.execution.output.providerTiming.timeSegments[0].duration = + streamEndTime - providerStartTime + } + } + }), + execution: { + success: true, + output: { + content: '', + model: request.model, + tokens: { input: 0, output: 0, total: 0 }, + toolCalls: undefined, + providerTiming: { + startTime: providerStartTimeISO, + endTime: new Date().toISOString(), + duration: Date.now() - providerStartTime, + timeSegments: [ + { + type: 'model', + name: 'Streaming response', + startTime: providerStartTime, + endTime: Date.now(), + duration: Date.now() - providerStartTime, + }, + ], + }, + cost: { input: 0, output: 0, total: 0 }, + }, + logs: [], + metadata: { + startTime: providerStartTimeISO, + endTime: new Date().toISOString(), + duration: Date.now() - providerStartTime, + }, + }, + } as StreamingExecution + + return streamingResult as StreamingExecution + } + + const initialCallTime = Date.now() + const forcedTools = preparedTools?.forcedTools || [] + let usedForcedTools: string[] = [] + let hasUsedForcedTool = false + let currentToolChoice = responsesToolChoice + let currentTrackingToolChoice = trackingToolChoice + + const checkForForcedToolUsage = ( + toolCallsInResponse: ResponsesToolCall[], + toolChoice: ToolChoice | undefined + ) => { + if (typeof toolChoice === 'object' && toolCallsInResponse.length > 0) { + const result = trackForcedToolUsage( + toolCallsInResponse, + toolChoice, + logger, + config.providerId, + forcedTools, + usedForcedTools + ) + hasUsedForcedTool = result.hasUsedForcedTool + usedForcedTools = result.usedForcedTools + } + } + + const currentInput: ResponsesInputItem[] = [...initialInput] + let currentResponse = await postResponses( + createRequestBody(currentInput, { tool_choice: currentToolChoice }) + ) + const firstResponseTime = Date.now() - initialCallTime + + const initialUsage = parseResponsesUsage(currentResponse.usage) + const tokens = { + input: initialUsage?.promptTokens || 0, + output: initialUsage?.completionTokens || 0, + total: initialUsage?.totalTokens || 0, + } + + const toolCalls = [] + const toolResults = [] + let iterationCount = 0 + let modelTime = firstResponseTime + let toolsTime = 0 + let content = extractResponseText(currentResponse.output) || '' + + const timeSegments: TimeSegment[] = [ + { + type: 'model', + name: 'Initial response', + startTime: initialCallTime, + endTime: initialCallTime + firstResponseTime, + duration: firstResponseTime, + }, + ] + + checkForForcedToolUsage( + extractResponseToolCalls(currentResponse.output), + currentTrackingToolChoice + ) + + while (iterationCount < MAX_TOOL_ITERATIONS) { + const responseText = extractResponseText(currentResponse.output) + if (responseText) { + content = responseText + } + + const toolCallsInResponse = extractResponseToolCalls(currentResponse.output) + if (!toolCallsInResponse.length) { + break + } + + const outputInputItems = convertResponseOutputToInputItems(currentResponse.output) + if (outputInputItems.length) { + currentInput.push(...outputInputItems) + } + + logger.info( + `Processing ${toolCallsInResponse.length} tool calls in parallel (iteration ${ + iterationCount + 1 + }/${MAX_TOOL_ITERATIONS})` + ) + + const toolsStartTime = 
Date.now() + + const toolExecutionPromises = toolCallsInResponse.map(async (toolCall) => { + const toolCallStartTime = Date.now() + const toolName = toolCall.name + + try { + const toolArgs = toolCall.arguments ? JSON.parse(toolCall.arguments) : {} + const tool = request.tools?.find((t) => t.id === toolName) + + if (!tool) { + return null + } + + const { toolParams, executionParams } = prepareToolExecution(tool, toolArgs, request) + const result = await executeTool(toolName, executionParams) + const toolCallEndTime = Date.now() + + return { + toolCall, + toolName, + toolParams, + result, + startTime: toolCallStartTime, + endTime: toolCallEndTime, + duration: toolCallEndTime - toolCallStartTime, + } + } catch (error) { + const toolCallEndTime = Date.now() + logger.error('Error processing tool call:', { error, toolName }) + + return { + toolCall, + toolName, + toolParams: {}, + result: { + success: false, + output: undefined, + error: error instanceof Error ? error.message : 'Tool execution failed', + }, + startTime: toolCallStartTime, + endTime: toolCallEndTime, + duration: toolCallEndTime - toolCallStartTime, + } + } + }) + + const executionResults = await Promise.allSettled(toolExecutionPromises) + + for (const settledResult of executionResults) { + if (settledResult.status === 'rejected' || !settledResult.value) continue + + const { toolCall, toolName, toolParams, result, startTime, endTime, duration } = + settledResult.value + + timeSegments.push({ + type: 'tool', + name: toolName, + startTime: startTime, + endTime: endTime, + duration: duration, + }) + + let resultContent: any + if (result.success) { + toolResults.push(result.output) + resultContent = result.output + } else { + resultContent = { + error: true, + message: result.error || 'Tool execution failed', + tool: toolName, + } + } + + toolCalls.push({ + name: toolName, + arguments: toolParams, + startTime: new Date(startTime).toISOString(), + endTime: new Date(endTime).toISOString(), + duration: duration, + result: resultContent, + success: result.success, + }) + + currentInput.push({ + type: 'function_call_output', + call_id: toolCall.id, + output: JSON.stringify(resultContent), + }) + } + + const thisToolsTime = Date.now() - toolsStartTime + toolsTime += thisToolsTime + + if (typeof currentToolChoice === 'object' && hasUsedForcedTool && forcedTools.length > 0) { + const remainingTools = forcedTools.filter((tool) => !usedForcedTools.includes(tool)) + + if (remainingTools.length > 0) { + currentToolChoice = { + type: 'function', + name: remainingTools[0], + } + currentTrackingToolChoice = { + type: 'function', + function: { name: remainingTools[0] }, + } + logger.info(`Forcing next tool: ${remainingTools[0]}`) + } else { + currentToolChoice = 'auto' + currentTrackingToolChoice = 'auto' + logger.info('All forced tools have been used, switching to auto tool_choice') + } + } + + const nextModelStartTime = Date.now() + + currentResponse = await postResponses( + createRequestBody(currentInput, { tool_choice: currentToolChoice }) + ) + + checkForForcedToolUsage( + extractResponseToolCalls(currentResponse.output), + currentTrackingToolChoice + ) + + const nextModelEndTime = Date.now() + const thisModelTime = nextModelEndTime - nextModelStartTime + + timeSegments.push({ + type: 'model', + name: `Model response (iteration ${iterationCount + 1})`, + startTime: nextModelStartTime, + endTime: nextModelEndTime, + duration: thisModelTime, + }) + + modelTime += thisModelTime + + const usage = parseResponsesUsage(currentResponse.usage) + if 
(usage) { + tokens.input += usage.promptTokens + tokens.output += usage.completionTokens + tokens.total += usage.totalTokens + } + + iterationCount++ + } + + if (request.stream) { + logger.info('Using streaming for final response after tool processing') + + const accumulatedCost = calculateCost(request.model, tokens.input, tokens.output) + + const streamResponse = await fetch(config.endpoint, { + method: 'POST', + headers: config.headers, + body: JSON.stringify( + createRequestBody(currentInput, { stream: true, tool_choice: 'auto' }) + ), + }) + + if (!streamResponse.ok) { + const message = await parseErrorResponse(streamResponse) + throw new Error(`${config.providerLabel} API error (${streamResponse.status}): ${message}`) + } + + const streamingResult = { + stream: createReadableStreamFromResponses(streamResponse, (content, usage) => { + streamingResult.execution.output.content = content + streamingResult.execution.output.tokens = { + input: tokens.input + (usage?.promptTokens || 0), + output: tokens.output + (usage?.completionTokens || 0), + total: tokens.total + (usage?.totalTokens || 0), + } + + const streamCost = calculateCost( + request.model, + usage?.promptTokens || 0, + usage?.completionTokens || 0 + ) + streamingResult.execution.output.cost = { + input: accumulatedCost.input + streamCost.input, + output: accumulatedCost.output + streamCost.output, + total: accumulatedCost.total + streamCost.total, + } + }), + execution: { + success: true, + output: { + content: '', + model: request.model, + tokens: { + input: tokens.input, + output: tokens.output, + total: tokens.total, + }, + toolCalls: + toolCalls.length > 0 + ? { + list: toolCalls, + count: toolCalls.length, + } + : undefined, + providerTiming: { + startTime: providerStartTimeISO, + endTime: new Date().toISOString(), + duration: Date.now() - providerStartTime, + modelTime: modelTime, + toolsTime: toolsTime, + firstResponseTime: firstResponseTime, + iterations: iterationCount + 1, + timeSegments: timeSegments, + }, + cost: { + input: accumulatedCost.input, + output: accumulatedCost.output, + total: accumulatedCost.total, + }, + }, + logs: [], + metadata: { + startTime: providerStartTimeISO, + endTime: new Date().toISOString(), + duration: Date.now() - providerStartTime, + }, + }, + } as StreamingExecution + + return streamingResult as StreamingExecution + } + + const providerEndTime = Date.now() + const providerEndTimeISO = new Date(providerEndTime).toISOString() + const totalDuration = providerEndTime - providerStartTime + + return { + content, + model: request.model, + tokens, + toolCalls: toolCalls.length > 0 ? toolCalls : undefined, + toolResults: toolResults.length > 0 ? toolResults : undefined, + timing: { + startTime: providerStartTimeISO, + endTime: providerEndTimeISO, + duration: totalDuration, + modelTime: modelTime, + toolsTime: toolsTime, + firstResponseTime: firstResponseTime, + iterations: iterationCount + 1, + timeSegments: timeSegments, + }, + } + } catch (error) { + const providerEndTime = Date.now() + const providerEndTimeISO = new Date(providerEndTime).toISOString() + const totalDuration = providerEndTime - providerStartTime + + logger.error(`Error in ${config.providerLabel} request:`, { + error, + duration: totalDuration, + }) + + const enhancedError = new Error(error instanceof Error ? 
error.message : String(error)) + // @ts-ignore - Adding timing property to the error + enhancedError.timing = { + startTime: providerStartTimeISO, + endTime: providerEndTimeISO, + duration: totalDuration, + } + + throw enhancedError + } +} From 97efa956f86c60e6c9ee7da5a3196d8bc76ca13d Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Wed, 4 Feb 2026 13:30:43 -0800 Subject: [PATCH 03/13] Fix streaming --- apps/sim/app/api/wand/route.ts | 10 +++++++++- apps/sim/providers/responses-utils.ts | 15 ++++++++++++--- 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/apps/sim/app/api/wand/route.ts b/apps/sim/app/api/wand/route.ts index 9673387862..9ad06ceaf6 100644 --- a/apps/sim/app/api/wand/route.ts +++ b/apps/sim/app/api/wand/route.ts @@ -377,7 +377,15 @@ Use this context to calculate relative dates like "yesterday", "last week", "beg } if (eventType === 'response.output_text.delta') { - const content = parsed.delta + let content = '' + if (typeof parsed.delta === 'string') { + content = parsed.delta + } else if (parsed.delta && typeof parsed.delta.text === 'string') { + content = parsed.delta.text + } else if (typeof parsed.text === 'string') { + content = parsed.text + } + if (content) { chunkCount++ if (chunkCount === 1) { diff --git a/apps/sim/providers/responses-utils.ts b/apps/sim/providers/responses-utils.ts index 6245796e0e..3b64ecb53d 100644 --- a/apps/sim/providers/responses-utils.ts +++ b/apps/sim/providers/responses-utils.ts @@ -359,9 +359,18 @@ export function createReadableStreamFromResponses( } if (eventType === 'response.output_text.delta') { - if (typeof event.delta === 'string' && event.delta.length > 0) { - fullContent += event.delta - controller.enqueue(encoder.encode(event.delta)) + let deltaText = '' + if (typeof event.delta === 'string') { + deltaText = event.delta + } else if (event.delta && typeof event.delta.text === 'string') { + deltaText = event.delta.text + } else if (typeof event.text === 'string') { + deltaText = event.text + } + + if (deltaText.length > 0) { + fullContent += deltaText + controller.enqueue(encoder.encode(deltaText)) } } From a96de42e6916966549e262548c9bd8be2018d8ed Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Wed, 4 Feb 2026 13:39:20 -0800 Subject: [PATCH 04/13] Bug fixes --- apps/sim/app/api/wand/route.ts | 4 +- apps/sim/lib/copilot/chat-title.ts | 2 +- apps/sim/providers/responses-utils.ts | 75 ++++++++++++++++++++++----- 3 files changed, 66 insertions(+), 15 deletions(-) diff --git a/apps/sim/app/api/wand/route.ts b/apps/sim/app/api/wand/route.ts index 9ad06ceaf6..dd54ae152c 100644 --- a/apps/sim/app/api/wand/route.ts +++ b/apps/sim/app/api/wand/route.ts @@ -262,7 +262,7 @@ Use this context to calculate relative dates like "yesterday", "last week", "beg ) const apiUrl = useWandAzure - ? `${azureEndpoint}/openai/v1/responses?api-version=${azureApiVersion}` + ? `${azureEndpoint?.replace(/\/$/, '')}/openai/v1/responses?api-version=${azureApiVersion}` : 'https://api.openai.com/v1/responses' const headers: Record = { @@ -461,7 +461,7 @@ Use this context to calculate relative dates like "yesterday", "last week", "beg } const apiUrl = useWandAzure - ? `${azureEndpoint}/openai/v1/responses?api-version=${azureApiVersion}` + ? 
`${azureEndpoint?.replace(/\/$/, '')}/openai/v1/responses?api-version=${azureApiVersion}` : 'https://api.openai.com/v1/responses' const headers: Record = { diff --git a/apps/sim/lib/copilot/chat-title.ts b/apps/sim/lib/copilot/chat-title.ts index 0edbcc30a6..26d6b06a41 100644 --- a/apps/sim/lib/copilot/chat-title.ts +++ b/apps/sim/lib/copilot/chat-title.ts @@ -24,7 +24,7 @@ export async function generateChatTitle(message: string): Promise try { const apiUrl = useChatTitleAzure - ? `${azureEndpoint}/openai/v1/responses?api-version=${azureApiVersion}` + ? `${azureEndpoint?.replace(/\/$/, '')}/openai/v1/responses?api-version=${azureApiVersion}` : 'https://api.openai.com/v1/responses' const headers: Record = { diff --git a/apps/sim/providers/responses-utils.ts b/apps/sim/providers/responses-utils.ts index 3b64ecb53d..afb0510dd9 100644 --- a/apps/sim/providers/responses-utils.ts +++ b/apps/sim/providers/responses-utils.ts @@ -57,7 +57,10 @@ export function buildResponsesInputFromMessages(messages: Message[]): ResponsesI continue } - if (message.content) { + if ( + message.content && + (message.role === 'system' || message.role === 'user' || message.role === 'assistant') + ) { input.push({ role: message.role, content: message.content, @@ -200,6 +203,28 @@ export function convertResponseOutputToInputItems(output: unknown): ResponsesInp content: text, }) } + + const toolCalls = Array.isArray(item.tool_calls) ? item.tool_calls : [] + for (const toolCall of toolCalls) { + const callId = toolCall?.id + const name = toolCall?.function?.name ?? toolCall?.name + if (!callId || !name) { + continue + } + + const argumentsValue = + typeof toolCall?.function?.arguments === 'string' + ? toolCall.function.arguments + : JSON.stringify(toolCall?.function?.arguments ?? {}) + + items.push({ + type: 'function_call', + call_id: callId, + name, + arguments: argumentsValue, + }) + } + continue } @@ -233,28 +258,54 @@ export function extractResponseToolCalls(output: unknown): ResponsesToolCall[] { return [] } - return output - .map((item) => { - if (!item || item.type !== 'function_call') { - return null - } + const toolCalls: ResponsesToolCall[] = [] + + for (const item of output) { + if (!item || typeof item !== 'object') { + continue + } + if (item.type === 'function_call') { const callId = item.call_id ?? item.id const name = item.name ?? item.function?.name if (!callId || !name) { - return null + continue } const argumentsValue = typeof item.arguments === 'string' ? item.arguments : JSON.stringify(item.arguments ?? {}) - return { + toolCalls.push({ id: callId, name, arguments: argumentsValue, + }) + continue + } + + if (item.type === 'message' && Array.isArray(item.tool_calls)) { + for (const toolCall of item.tool_calls) { + const callId = toolCall?.id + const name = toolCall?.function?.name ?? toolCall?.name + if (!callId || !name) { + continue + } + + const argumentsValue = + typeof toolCall?.function?.arguments === 'string' + ? toolCall.function.arguments + : JSON.stringify(toolCall?.function?.arguments ?? {}) + + toolCalls.push({ + id: callId, + name, + arguments: argumentsValue, + }) } - }) - .filter(Boolean) as ResponsesToolCall[] + } + } + + return toolCalls } /** @@ -269,8 +320,8 @@ export function parseResponsesUsage(usage: any): ResponsesUsageTokens | undefine const outputTokens = Number(usage.output_tokens ?? 0) const cachedTokens = Number(usage.input_tokens_details?.cached_tokens ?? 0) const reasoningTokens = Number(usage.output_tokens_details?.reasoning_tokens ?? 
0) - const completionTokens = outputTokens + reasoningTokens - const totalTokens = inputTokens + completionTokens + const completionTokens = outputTokens + const totalTokens = inputTokens + outputTokens return { promptTokens: inputTokens, From 6e88a73cab87c5444b67d0f127bdaf02c001aedc Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Wed, 4 Feb 2026 13:46:56 -0800 Subject: [PATCH 05/13] Bug fixes --- apps/sim/app/api/wand/route.ts | 24 ++++++++++++++++++++---- apps/sim/providers/responses-utils.ts | 7 +++++-- 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/apps/sim/app/api/wand/route.ts b/apps/sim/app/api/wand/route.ts index dd54ae152c..0cbe0e4e71 100644 --- a/apps/sim/app/api/wand/route.ts +++ b/apps/sim/app/api/wand/route.ts @@ -313,10 +313,21 @@ Use this context to calculate relative dates like "yesterday", "last week", "beg return } + let finalUsage: any = null + let usageRecorded = false + + const recordUsage = async () => { + if (usageRecorded || !finalUsage) { + return + } + + usageRecorded = true + await updateUserStatsForWand(session.user.id, finalUsage, requestId, isBYOK) + } + try { let buffer = '' let chunkCount = 0 - let finalUsage: any = null let activeEventType: string | undefined while (true) { @@ -324,6 +335,7 @@ Use this context to calculate relative dates like "yesterday", "last week", "beg if (done) { logger.info(`[${requestId}] Stream completed. Total chunks: ${chunkCount}`) + await recordUsage() controller.enqueue(encoder.encode(`data: ${JSON.stringify({ done: true })}\n\n`)) controller.close() break @@ -353,9 +365,7 @@ Use this context to calculate relative dates like "yesterday", "last week", "beg if (data === '[DONE]') { logger.info(`[${requestId}] Received [DONE] signal`) - if (finalUsage) { - await updateUserStatsForWand(session.user.id, finalUsage, requestId, isBYOK) - } + await recordUsage() controller.enqueue( encoder.encode(`data: ${JSON.stringify({ done: true })}\n\n`) @@ -423,6 +433,12 @@ Use this context to calculate relative dates like "yesterday", "last week", "beg stack: streamError?.stack, }) + try { + await recordUsage() + } catch (usageError) { + logger.warn(`[${requestId}] Failed to record usage after stream error`, usageError) + } + const errorData = `data: ${JSON.stringify({ error: 'Streaming failed', done: true })}\n\n` controller.enqueue(encoder.encode(errorData)) controller.close() diff --git a/apps/sim/providers/responses-utils.ts b/apps/sim/providers/responses-utils.ts index afb0510dd9..0860d90e67 100644 --- a/apps/sim/providers/responses-utils.ts +++ b/apps/sim/providers/responses-utils.ts @@ -310,6 +310,9 @@ export function extractResponseToolCalls(output: unknown): ResponsesToolCall[] { /** * Maps Responses API usage data to prompt/completion token counts. + * + * Note: output_tokens is expected to include reasoning tokens; fall back to reasoning_tokens + * when output_tokens is missing or zero. */ export function parseResponsesUsage(usage: any): ResponsesUsageTokens | undefined { if (!usage || typeof usage !== 'object') { @@ -320,8 +323,8 @@ export function parseResponsesUsage(usage: any): ResponsesUsageTokens | undefine const outputTokens = Number(usage.output_tokens ?? 0) const cachedTokens = Number(usage.input_tokens_details?.cached_tokens ?? 0) const reasoningTokens = Number(usage.output_tokens_details?.reasoning_tokens ?? 
From 8533f1f99652118a527e1eed28ef511e5835bb13 Mon Sep 17 00:00:00 2001
From: Siddharth Ganesan
Date: Wed, 4 Feb 2026 13:52:36 -0800
Subject: [PATCH 06/13] Fix response format

---
 apps/sim/providers/responses-provider.ts | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/apps/sim/providers/responses-provider.ts b/apps/sim/providers/responses-provider.ts
index 6dabbdfa76..09f908e489 100644
--- a/apps/sim/providers/responses-provider.ts
+++ b/apps/sim/providers/responses-provider.ts
@@ -97,12 +97,15 @@ export async function executeResponsesProviderRequest(
   }
 
   if (request.responseFormat) {
-    basePayload.response_format = {
-      type: 'json_schema',
-      json_schema: {
-        name: request.responseFormat.name || 'response_schema',
-        schema: request.responseFormat.schema || request.responseFormat,
-        strict: request.responseFormat.strict !== false,
+    basePayload.text = {
+      ...(basePayload.text ?? {}),
+      format: {
+        type: 'json_schema',
+        json_schema: {
+          name: request.responseFormat.name || 'response_schema',
+          schema: request.responseFormat.schema || request.responseFormat,
+          strict: request.responseFormat.strict !== false,
+        },
       },
     }
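
With the Responses API, the structured-output switch moves from Chat Completions' top-level response_format into text.format. The request body this patch produces looks roughly like the following (illustrative payload, not captured from a real request):

    const body = {
      model: 'gpt-4o',
      input: [{ role: 'user', content: 'Summarize the workflow' }],
      text: {
        format: {
          type: 'json_schema',
          json_schema: {
            name: 'response_schema',
            schema: { type: 'object', properties: { summary: { type: 'string' } } },
            strict: true,
          },
        },
      },
    }

Note the json_schema wrapper is still nested here; the next patch flattens name, schema, and strict directly into the format object, which is the shape the Responses API expects.
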
From b8191a38c3a8e41046abdcff379f89a94126a8ea Mon Sep 17 00:00:00 2001
From: Siddharth Ganesan
Date: Wed, 4 Feb 2026 14:07:09 -0800
Subject: [PATCH 07/13] Refactor

---
 apps/sim/app/api/wand/route.ts                |  11 ++-
 apps/sim/lib/copilot/chat-title.ts            |   2 +-
 apps/sim/providers/azure-openai/index.ts      |   2 +-
 .../{responses-provider.ts => openai/core.ts} |  79 +++++++++++++++----
 apps/sim/providers/openai/index.ts            |   2 +-
 .../{responses-utils.ts => openai/utils.ts}   |  28 +++++--
 6 files changed, 99 insertions(+), 25 deletions(-)
 rename apps/sim/providers/{responses-provider.ts => openai/core.ts} (90%)
 rename apps/sim/providers/{responses-utils.ts => openai/utils.ts} (93%)

diff --git a/apps/sim/app/api/wand/route.ts b/apps/sim/app/api/wand/route.ts
index 0cbe0e4e71..70df2b1f91 100644
--- a/apps/sim/app/api/wand/route.ts
+++ b/apps/sim/app/api/wand/route.ts
@@ -11,7 +11,7 @@ import { env } from '@/lib/core/config/env'
 import { getCostMultiplier, isBillingEnabled } from '@/lib/core/config/feature-flags'
 import { generateRequestId } from '@/lib/core/utils/request'
 import { verifyWorkspaceMembership } from '@/app/api/workflows/utils'
-import { extractResponseText, parseResponsesUsage } from '@/providers/responses-utils'
+import { extractResponseText, parseResponsesUsage } from '@/providers/openai/utils'
 import { getModelPricing } from '@/providers/utils'
 
 export const dynamic = 'force-dynamic'
@@ -386,12 +386,19 @@ Use this context to calculate relative dates like "yesterday", "last week", "beg
              throw new Error(parsed?.error?.message || 'Responses stream error')
            }
 
-            if (eventType === 'response.output_text.delta') {
+            if (
+              eventType === 'response.output_text.delta' ||
+              eventType === 'response.output_json.delta'
+            ) {
              let content = ''
              if (typeof parsed.delta === 'string') {
                content = parsed.delta
              } else if (parsed.delta && typeof parsed.delta.text === 'string') {
                content = parsed.delta.text
+              } else if (parsed.delta && parsed.delta.json !== undefined) {
+                content = JSON.stringify(parsed.delta.json)
+              } else if (parsed.json !== undefined) {
+                content = JSON.stringify(parsed.json)
              } else if (typeof parsed.text === 'string') {
                content = parsed.text
              }
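
Streaming deltas arrive in several shapes depending on event type, which is why the extraction above probes them in order: a bare string delta, delta.text, delta.json, a top-level json field, then a top-level text field. Two representative SSE lines (payload shapes assumed from the handling above; values invented):

    data: {"type":"response.output_text.delta","delta":"Hello"}
    data: {"type":"response.output_json.delta","delta":{"json":{"step":1}}}

The first matching branch wins; an event that matches none of them simply contributes no chunk.
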
diff --git a/apps/sim/lib/copilot/chat-title.ts b/apps/sim/lib/copilot/chat-title.ts
index 26d6b06a41..10dd882998 100644
--- a/apps/sim/lib/copilot/chat-title.ts
+++ b/apps/sim/lib/copilot/chat-title.ts
@@ -1,6 +1,6 @@
 import { createLogger } from '@sim/logger'
 import { env } from '@/lib/core/config/env'
-import { extractResponseText } from '@/providers/responses-utils'
+import { extractResponseText } from '@/providers/openai/utils'
 
 const logger = createLogger('SimAgentUtils')
 
diff --git a/apps/sim/providers/azure-openai/index.ts b/apps/sim/providers/azure-openai/index.ts
index 32049b2a84..da11e00170 100644
--- a/apps/sim/providers/azure-openai/index.ts
+++ b/apps/sim/providers/azure-openai/index.ts
@@ -2,7 +2,7 @@ import { createLogger } from '@sim/logger'
 import { env } from '@/lib/core/config/env'
 import type { StreamingExecution } from '@/executor/types'
 import { getProviderDefaultModel, getProviderModels } from '@/providers/models'
-import { executeResponsesProviderRequest } from '@/providers/responses-provider'
+import { executeResponsesProviderRequest } from '@/providers/openai/core'
 import type { ProviderConfig, ProviderRequest, ProviderResponse } from '@/providers/types'
 
 const logger = createLogger('AzureOpenAIProvider')
diff --git a/apps/sim/providers/responses-provider.ts b/apps/sim/providers/openai/core.ts
similarity index 90%
rename from apps/sim/providers/responses-provider.ts
rename to apps/sim/providers/openai/core.ts
index 09f908e489..f1c0b7f15b 100644
--- a/apps/sim/providers/responses-provider.ts
+++ b/apps/sim/providers/openai/core.ts
@@ -1,6 +1,14 @@
 import type { Logger } from '@sim/logger'
 import type { StreamingExecution } from '@/executor/types'
 import { MAX_TOOL_ITERATIONS } from '@/providers'
+import type { Message, ProviderRequest, ProviderResponse, TimeSegment } from '@/providers/types'
+import {
+  calculateCost,
+  prepareToolExecution,
+  prepareToolsWithUsageControl,
+  trackForcedToolUsage,
+} from '@/providers/utils'
+import { executeTool } from '@/tools'
 import {
   buildResponsesInputFromMessages,
   convertResponseOutputToInputItems,
@@ -12,19 +20,59 @@ import {
   type ResponsesInputItem,
   type ResponsesToolCall,
   toResponsesToolChoice,
-} from '@/providers/responses-utils'
-import type { Message, ProviderRequest, ProviderResponse, TimeSegment } from '@/providers/types'
-import {
-  calculateCost,
-  prepareToolExecution,
-  prepareToolsWithUsageControl,
-  trackForcedToolUsage,
-} from '@/providers/utils'
-import { executeTool } from '@/tools'
+} from './utils'
 
 type PreparedTools = ReturnType<typeof prepareToolsWithUsageControl>
 type ToolChoice = PreparedTools['toolChoice']
 
+/**
+ * Recursively enforces OpenAI strict mode requirements on a JSON schema.
+ * - Sets additionalProperties: false on all object types.
+ * - Ensures required includes ALL property keys.
+ */
+function enforceStrictSchema(schema: any): any {
+  if (!schema || typeof schema !== 'object') return schema
+
+  const result = { ...schema }
+
+  // If this is an object type, enforce strict requirements
+  if (result.type === 'object') {
+    result.additionalProperties = false
+
+    // Recursively process properties and ensure required includes all keys
+    if (result.properties && typeof result.properties === 'object') {
+      const propKeys = Object.keys(result.properties)
+      result.required = propKeys // Strict mode requires ALL properties
+      result.properties = Object.fromEntries(
+        Object.entries(result.properties).map(([key, value]) => [key, enforceStrictSchema(value)])
+      )
+    }
+  }
+
+  // Handle array items
+  if (result.type === 'array' && result.items) {
+    result.items = enforceStrictSchema(result.items)
+  }
+
+  // Handle anyOf, oneOf, allOf
+  for (const keyword of ['anyOf', 'oneOf', 'allOf']) {
+    if (Array.isArray(result[keyword])) {
+      result[keyword] = result[keyword].map(enforceStrictSchema)
+    }
+  }
+
+  // Handle $defs / definitions
+  for (const defKey of ['$defs', 'definitions']) {
+    if (result[defKey] && typeof result[defKey] === 'object') {
+      result[defKey] = Object.fromEntries(
+        Object.entries(result[defKey]).map(([key, value]) => [key, enforceStrictSchema(value)])
+      )
+    }
+  }
+
+  return result
+}
+
 export interface ResponsesProviderConfig {
   providerId: string
   providerLabel: string
@@ -97,15 +145,18 @@ export async function executeResponsesProviderRequest(
   }
 
   if (request.responseFormat) {
+    const isStrict = request.responseFormat.strict !== false
+    const rawSchema = request.responseFormat.schema || request.responseFormat
+    // OpenAI strict mode requires additionalProperties: false on ALL nested objects
+    const cleanedSchema = isStrict ? enforceStrictSchema(rawSchema) : rawSchema
+
     basePayload.text = {
       ...(basePayload.text ?? {}),
       format: {
         type: 'json_schema',
-        json_schema: {
-          name: request.responseFormat.name || 'response_schema',
-          schema: request.responseFormat.schema || request.responseFormat,
-          strict: request.responseFormat.strict !== false,
-        },
+        name: request.responseFormat.name || 'response_schema',
+        schema: cleanedSchema,
+        strict: isStrict,
       },
     }
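
enforceStrictSchema exists because strict structured outputs reject any object schema that permits extra keys or leaves a property out of required. A before/after sketch with a hypothetical schema (not from the codebase):

    const loose = {
      type: 'object',
      properties: {
        city: { type: 'string' },
        population: { type: 'number' },
      },
      required: ['city'],
    }

    // enforceStrictSchema(loose) returns:
    // {
    //   type: 'object',
    //   additionalProperties: false,
    //   properties: { city: { type: 'string' }, population: { type: 'number' } },
    //   required: ['city', 'population'],  // every key, not just city
    // }

This silently makes population mandatory; callers that want genuinely optional fields have to model them as nullable, the usual workaround for strict mode.
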
diff --git a/apps/sim/providers/openai/index.ts b/apps/sim/providers/openai/index.ts
index d254a0edda..db95598e8c 100644
--- a/apps/sim/providers/openai/index.ts
+++ b/apps/sim/providers/openai/index.ts
@@ -1,8 +1,8 @@
 import { createLogger } from '@sim/logger'
 import type { StreamingExecution } from '@/executor/types'
 import { getProviderDefaultModel, getProviderModels } from '@/providers/models'
-import { executeResponsesProviderRequest } from '@/providers/responses-provider'
 import type { ProviderConfig, ProviderRequest, ProviderResponse } from '@/providers/types'
+import { executeResponsesProviderRequest } from './core'
 
 const logger = createLogger('OpenAIProvider')
 const responsesEndpoint = 'https://api.openai.com/v1/responses'
diff --git a/apps/sim/providers/responses-utils.ts b/apps/sim/providers/openai/utils.ts
similarity index 93%
rename from apps/sim/providers/responses-utils.ts
rename to apps/sim/providers/openai/utils.ts
index 0860d90e67..06196bc35c 100644
--- a/apps/sim/providers/responses-utils.ts
+++ b/apps/sim/providers/openai/utils.ts
@@ -146,12 +146,21 @@ function extractTextFromMessageItem(item: any): string {
   const textParts: string[] = []
 
   for (const part of item.content) {
-    if (
-      part &&
-      (part.type === 'output_text' || part.type === 'text') &&
-      typeof part.text === 'string'
-    ) {
+    if (!part || typeof part !== 'object') {
+      continue
+    }
+
+    if ((part.type === 'output_text' || part.type === 'text') && typeof part.text === 'string') {
       textParts.push(part.text)
+      continue
+    }
+
+    if (part.type === 'output_json') {
+      if (typeof part.text === 'string') {
+        textParts.push(part.text)
+      } else if (part.json !== undefined) {
+        textParts.push(JSON.stringify(part.json))
+      }
     }
   }
 
@@ -412,12 +421,19 @@ export function createReadableStreamFromResponses(
       return
     }
 
-    if (eventType === 'response.output_text.delta') {
+    if (
+      eventType === 'response.output_text.delta' ||
+      eventType === 'response.output_json.delta'
+    ) {
       let deltaText = ''
       if (typeof event.delta === 'string') {
         deltaText = event.delta
       } else if (event.delta && typeof event.delta.text === 'string') {
         deltaText = event.delta.text
+      } else if (event.delta && event.delta.json !== undefined) {
+        deltaText = JSON.stringify(event.delta.json)
+      } else if (event.json !== undefined) {
+        deltaText = JSON.stringify(event.json)
       } else if (typeof event.text === 'string') {
         deltaText = event.text
       }
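
One consequence of structured outputs worth spelling out: a completed response can carry its payload as an output_json content part rather than output_text, which is why extractTextFromMessageItem above now stringifies part.json. Roughly what such an output item looks like (shape inferred from the handling in this patch; values invented):

    const item = {
      type: 'message',
      role: 'assistant',
      content: [{ type: 'output_json', json: { title: 'Weekly report', done: true } }],
    }

    // extractTextFromMessageItem(item) === '{"title":"Weekly report","done":true}'
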
From c6a946977a993f4591df29c8f7c8de217b81cabe Mon Sep 17 00:00:00 2001
From: Siddharth Ganesan
Date: Wed, 4 Feb 2026 14:09:23 -0800
Subject: [PATCH 08/13] Fix bugs

---
 apps/sim/app/api/wand/route.ts    | 97 ++++++++++++++++---------------
 apps/sim/providers/openai/core.ts |  5 ++
 2 files changed, 55 insertions(+), 47 deletions(-)

diff --git a/apps/sim/app/api/wand/route.ts b/apps/sim/app/api/wand/route.ts
index 70df2b1f91..6df2034148 100644
--- a/apps/sim/app/api/wand/route.ts
+++ b/apps/sim/app/api/wand/route.ts
@@ -374,62 +374,65 @@ Use this context to calculate relative dates like "yesterday", "last week", "beg
            return
          }
 
+          let parsed: any
          try {
-            const parsed = JSON.parse(data)
-            const eventType = parsed?.type ?? activeEventType
-
-            if (
-              eventType === 'response.error' ||
-              eventType === 'error' ||
-              activeEventType === 'response.failed'
-            ) {
-              throw new Error(parsed?.error?.message || 'Responses stream error')
-            }
+            parsed = JSON.parse(data)
+          } catch (parseError) {
+            logger.debug(`[${requestId}] Skipped non-JSON line: ${data.substring(0, 100)}`)
+            continue
+          }
 
-            if (
-              eventType === 'response.output_text.delta' ||
-              eventType === 'response.output_json.delta'
-            ) {
-              let content = ''
-              if (typeof parsed.delta === 'string') {
-                content = parsed.delta
-              } else if (parsed.delta && typeof parsed.delta.text === 'string') {
-                content = parsed.delta.text
-              } else if (parsed.delta && parsed.delta.json !== undefined) {
-                content = JSON.stringify(parsed.delta.json)
-              } else if (parsed.json !== undefined) {
-                content = JSON.stringify(parsed.json)
-              } else if (typeof parsed.text === 'string') {
-                content = parsed.text
-              }
+          const eventType = parsed?.type ?? activeEventType
 
-              if (content) {
-                chunkCount++
-                if (chunkCount === 1) {
-                  logger.info(`[${requestId}] Received first content chunk`)
-                }
+          if (
+            eventType === 'response.error' ||
+            eventType === 'error' ||
+            eventType === 'response.failed'
+          ) {
+            throw new Error(parsed?.error?.message || 'Responses stream error')
+          }
 
-                controller.enqueue(
-                  encoder.encode(`data: ${JSON.stringify({ chunk: content })}\n\n`)
-                )
+          if (
+            eventType === 'response.output_text.delta' ||
+            eventType === 'response.output_json.delta'
+          ) {
+            let content = ''
+            if (typeof parsed.delta === 'string') {
+              content = parsed.delta
+            } else if (parsed.delta && typeof parsed.delta.text === 'string') {
+              content = parsed.delta.text
+            } else if (parsed.delta && parsed.delta.json !== undefined) {
+              content = JSON.stringify(parsed.delta.json)
+            } else if (parsed.json !== undefined) {
+              content = JSON.stringify(parsed.json)
+            } else if (typeof parsed.text === 'string') {
+              content = parsed.text
+            }
+
+            if (content) {
+              chunkCount++
+              if (chunkCount === 1) {
+                logger.info(`[${requestId}] Received first content chunk`)
              }
+
+              controller.enqueue(
+                encoder.encode(`data: ${JSON.stringify({ chunk: content })}\n\n`)
+              )
            }
+          }
 
-            if (eventType === 'response.completed') {
-              const usage = parseResponsesUsage(parsed?.response?.usage ?? parsed?.usage)
-              if (usage) {
-                finalUsage = {
-                  prompt_tokens: usage.promptTokens,
-                  completion_tokens: usage.completionTokens,
-                  total_tokens: usage.totalTokens,
-                }
-                logger.info(
-                  `[${requestId}] Received usage data: ${JSON.stringify(finalUsage)}`
-                )
+          if (eventType === 'response.completed') {
+            const usage = parseResponsesUsage(parsed?.response?.usage ?? parsed?.usage)
+            if (usage) {
+              finalUsage = {
+                prompt_tokens: usage.promptTokens,
+                completion_tokens: usage.completionTokens,
+                total_tokens: usage.totalTokens,
              }
+              logger.info(
+                `[${requestId}] Received usage data: ${JSON.stringify(finalUsage)}`
+              )
            }
-          } catch (parseError) {
-            logger.debug(`[${requestId}] Skipped non-JSON line: ${data.substring(0, 100)}`)
          }
        }
      }
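
The restructure above narrows the try/catch to JSON.parse alone. In the old shape, a deliberate throw from the response.error branch landed in the same catch that handled malformed SSE lines and was demoted to a debug log; splitting them lets parse failures skip the line while protocol errors still propagate. The control flow, reduced to a sketch (pendingLines and handleEvent are stand-ins, not names from the codebase):

    declare const pendingLines: string[]
    declare function handleEvent(event: unknown): void

    for (const data of pendingLines) {
      let parsed: unknown
      try {
        parsed = JSON.parse(data) // only parse errors are caught here
      } catch {
        continue // skip keep-alives and partial lines
      }
      handleEvent(parsed) // a throw here reaches the outer stream handler
    }
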
diff --git a/apps/sim/providers/openai/core.ts b/apps/sim/providers/openai/core.ts
index f1c0b7f15b..1a0546128b 100644
--- a/apps/sim/providers/openai/core.ts
+++ b/apps/sim/providers/openai/core.ts
@@ -544,6 +544,11 @@ export async function executeResponsesProviderRequest(
         currentTrackingToolChoice
       )
 
+      const latestText = extractResponseText(currentResponse.output)
+      if (latestText) {
+        content = latestText
+      }
+
       const nextModelEndTime = Date.now()
       const thisModelTime = nextModelEndTime - nextModelStartTime
 
From ee5490766516a8adc94c9adf6bf744b4dbfafc53 Mon Sep 17 00:00:00 2001
From: Siddharth Ganesan
Date: Wed, 4 Feb 2026 14:20:39 -0800
Subject: [PATCH 09/13] Fix

---
 apps/sim/providers/openai/utils.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/apps/sim/providers/openai/utils.ts b/apps/sim/providers/openai/utils.ts
index 06196bc35c..664c0d8fc0 100644
--- a/apps/sim/providers/openai/utils.ts
+++ b/apps/sim/providers/openai/utils.ts
@@ -414,7 +414,7 @@ export function createReadableStreamFromResponses(
     if (
       eventType === 'response.error' ||
       eventType === 'error' ||
-      activeEventType === 'response.failed'
+      eventType === 'response.failed'
     ) {
       const message = event?.error?.message || 'Responses API stream error'
       controller.error(new Error(message))
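
The one-word fix above matters because activeEventType only mirrors the most recent SSE event: header line, so a failure whose type arrived inside the JSON data payload could slip past the old check. eventType already falls back to activeEventType, so both encodings are now caught (payload shape assumed for illustration):

    event: response.failed
    data: {"type":"response.failed","error":{"message":"The run failed"}}

Either the event: header or the embedded type field now routes this through controller.error.
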
From da06e4a3bc4b96ae6abbeb8b37a262993d576aad Mon Sep 17 00:00:00 2001
From: Siddharth Ganesan
Date: Wed, 4 Feb 2026 14:50:55 -0800
Subject: [PATCH 10/13] Fix azure openai response format with tool calls

---
 apps/sim/providers/openai/core.ts | 91 ++++++++++++++++++++++++++++---
 1 file changed, 82 insertions(+), 9 deletions(-)

diff --git a/apps/sim/providers/openai/core.ts b/apps/sim/providers/openai/core.ts
index 1a0546128b..9592589859 100644
--- a/apps/sim/providers/openai/core.ts
+++ b/apps/sim/providers/openai/core.ts
@@ -144,23 +144,38 @@ export async function executeResponsesProviderRequest(
     }
   }
 
+  // Store response format config - for Azure with tools, we defer applying it until after tool calls complete
+  let deferredTextFormat: { type: string; name: string; schema: any; strict: boolean } | undefined
+  const hasTools = !!request.tools?.length
+  const isAzure = config.providerId === 'azure-openai'
+
   if (request.responseFormat) {
     const isStrict = request.responseFormat.strict !== false
     const rawSchema = request.responseFormat.schema || request.responseFormat
     // OpenAI strict mode requires additionalProperties: false on ALL nested objects
     const cleanedSchema = isStrict ? enforceStrictSchema(rawSchema) : rawSchema
 
-    basePayload.text = {
-      ...(basePayload.text ?? {}),
-      format: {
-        type: 'json_schema',
-        name: request.responseFormat.name || 'response_schema',
-        schema: cleanedSchema,
-        strict: isStrict,
-      },
+    const textFormat = {
+      type: 'json_schema' as const,
+      name: request.responseFormat.name || 'response_schema',
+      schema: cleanedSchema,
+      strict: isStrict,
     }
 
-    logger.info(`Added JSON schema response format to ${config.providerLabel} request`)
+    // Azure OpenAI has issues combining tools + response_format in the same request
+    // Defer the format until after tool calls complete for Azure
+    if (isAzure && hasTools) {
+      deferredTextFormat = textFormat
+      logger.info(
+        `Deferring JSON schema response format for ${config.providerLabel} (will apply after tool calls complete)`
+      )
+    } else {
+      basePayload.text = {
+        ...(basePayload.text ?? {}),
+        format: textFormat,
+      }
+      logger.info(`Added JSON schema response format to ${config.providerLabel} request`)
+    }
   }
 
   const tools = request.tools?.length
@@ -572,6 +587,64 @@ export async function executeResponsesProviderRequest(
     iterationCount++
   }
 
+  // For Azure with deferred format: make a final call with the response format applied
+  // This only happens if we had tool calls and a deferred format
+  if (deferredTextFormat && iterationCount > 0) {
+    logger.info(
+      `Applying deferred JSON schema response format for ${config.providerLabel} after tool calls completed`
+    )
+
+    const finalFormatStartTime = Date.now()
+
+    // Add the output items from the last response to the input
+    const lastOutputItems = convertResponseOutputToInputItems(currentResponse.output)
+    if (lastOutputItems.length) {
+      currentInput.push(...lastOutputItems)
+    }
+
+    // Make final call with the response format - build payload without tools
+    const finalPayload: Record<string, any> = {
+      model: config.modelName,
+      input: currentInput,
+      text: {
+        ...(basePayload.text ?? {}),
+        format: deferredTextFormat,
+      },
+    }
+
+    // Copy over non-tool related settings
+    if (request.temperature !== undefined) finalPayload.temperature = request.temperature
+    if (request.maxTokens != null) finalPayload.max_output_tokens = request.maxTokens
+
+    currentResponse = await postResponses(finalPayload)
+
+    const finalFormatEndTime = Date.now()
+    const finalFormatDuration = finalFormatEndTime - finalFormatStartTime
+
+    timeSegments.push({
+      type: 'model',
+      name: 'Final formatted response',
+      startTime: finalFormatStartTime,
+      endTime: finalFormatEndTime,
+      duration: finalFormatDuration,
+    })
+
+    modelTime += finalFormatDuration
+
+    const finalUsage = parseResponsesUsage(currentResponse.usage)
+    if (finalUsage) {
+      tokens.input += finalUsage.promptTokens
+      tokens.output += finalUsage.completionTokens
+      tokens.total += finalUsage.totalTokens
+    }
+
+    // Update content with the formatted response
+    const formattedText = extractResponseText(currentResponse.output)
+    if (formattedText) {
+      content = formattedText
+    }
+  }
+
   if (request.stream) {
     logger.info('Using streaming for final response after tool processing')
 
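
For Azure, the patch above turns one request into two phases: an unconstrained tool-calling conversation, then a single formatting pass over the accumulated transcript. Schematically (a trimmed sketch of the payload built above, minus timing and token bookkeeping):

    // Phase 1: tool loop with no format constraint; currentInput accumulates
    // function_call / function_call_output items each round.
    // Phase 2: replay the transcript once, now demanding the schema.
    const finalPayload = {
      model: config.modelName,
      input: currentInput, // includes tool results
      text: { format: deferredTextFormat }, // applied only now
    }

The cost is one extra model call per formatted, tool-using Azure request; the benefit is avoiding the tools-plus-text.format combination that, per the comment above, Azure mishandles.
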
From a36d1a8fcaeafe8c9733471224973769aa1983da Mon Sep 17 00:00:00 2001
From: Siddharth Ganesan
Date: Wed, 4 Feb 2026 15:13:57 -0800
Subject: [PATCH 11/13] Fixes

---
 apps/sim/providers/openai/core.ts | 47 +++++++++++++++++++++++--------
 1 file changed, 35 insertions(+), 12 deletions(-)

diff --git a/apps/sim/providers/openai/core.ts b/apps/sim/providers/openai/core.ts
index 9592589859..0f4f4ee508 100644
--- a/apps/sim/providers/openai/core.ts
+++ b/apps/sim/providers/openai/core.ts
@@ -588,24 +588,36 @@ export async function executeResponsesProviderRequest(
   }
 
   // For Azure with deferred format: make a final call with the response format applied
-  // This only happens if we had tool calls and a deferred format
-  if (deferredTextFormat && iterationCount > 0) {
+  // This happens whenever we have a deferred format, even if no tools were called
+  // (the initial call was made without the format, so we need to apply it now)
+  let appliedDeferredFormat = false
+  if (deferredTextFormat) {
     logger.info(
-      `Applying deferred JSON schema response format for ${config.providerLabel} after tool calls completed`
+      `Applying deferred JSON schema response format for ${config.providerLabel} (iterationCount: ${iterationCount})`
     )
 
     const finalFormatStartTime = Date.now()
 
-    // Add the output items from the last response to the input
-    const lastOutputItems = convertResponseOutputToInputItems(currentResponse.output)
-    if (lastOutputItems.length) {
-      currentInput.push(...lastOutputItems)
+    // Determine what input to use for the formatted call
+    let formattedInput: ResponsesInputItem[]
+
+    if (iterationCount > 0) {
+      // Tools were called - include the conversation history with tool results
+      const lastOutputItems = convertResponseOutputToInputItems(currentResponse.output)
+      if (lastOutputItems.length) {
+        currentInput.push(...lastOutputItems)
+      }
+      formattedInput = currentInput
+    } else {
+      // No tools were called - just retry the initial call with format applied
+      // Don't include the model's previous unformatted response
+      formattedInput = initialInput
     }
 
     // Make final call with the response format - build payload without tools
     const finalPayload: Record<string, any> = {
       model: config.modelName,
-      input: currentInput,
+      input: formattedInput,
       text: {
         ...(basePayload.text ?? {}),
         format: deferredTextFormat,
       },
     }
@@ -643,19 +655,30 @@ export async function executeResponsesProviderRequest(
     if (formattedText) {
       content = formattedText
     }
+
+    appliedDeferredFormat = true
   }
 
-  if (request.stream) {
+  // Skip streaming if we already applied deferred format - we have the formatted content
+  // Making another streaming call would lose the formatted response
+  if (request.stream && !appliedDeferredFormat) {
     logger.info('Using streaming for final response after tool processing')
 
     const accumulatedCost = calculateCost(request.model, tokens.input, tokens.output)
 
+    // For Azure with deferred format in streaming mode, include the format in the streaming call
+    const streamOverrides: Record<string, any> = { stream: true, tool_choice: 'auto' }
+    if (deferredTextFormat) {
+      streamOverrides.text = {
+        ...(basePayload.text ?? {}),
+        format: deferredTextFormat,
+      }
+    }
+
     const streamResponse = await fetch(config.endpoint, {
       method: 'POST',
       headers: config.headers,
-      body: JSON.stringify(
-        createRequestBody(currentInput, { stream: true, tool_choice: 'auto' })
-      ),
+      body: JSON.stringify(createRequestBody(currentInput, streamOverrides)),
     })
 
     if (!streamResponse.ok) {
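
When no tool ever fired, the revised branch resends initialInput rather than currentInput: replaying the model's own unformatted draft as history would spend tokens on it and, presumably, anchor the schema-constrained retry on that draft, so the retry pretends the first call never happened. In effect:

    const formattedInput =
      iterationCount > 0
        ? currentInput // transcript incl. function_call / function_call_output items
        : initialInput // clean slate: the unformatted draft is dropped

One wrinkle worth noting: appliedDeferredFormat is set on every deferred pass, so the streamOverrides branch that re-adds the format inside the streaming path can only matter if that invariant ever changes.
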
From 95bdc3300292d32aaf98657e67288d8a6120e3b8 Mon Sep 17 00:00:00 2001
From: Siddharth Ganesan
Date: Wed, 4 Feb 2026 15:26:31 -0800
Subject: [PATCH 12/13] Fixes

---
 apps/sim/providers/openai/core.ts | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/apps/sim/providers/openai/core.ts b/apps/sim/providers/openai/core.ts
index 0f4f4ee508..8ed4c93865 100644
--- a/apps/sim/providers/openai/core.ts
+++ b/apps/sim/providers/openai/core.ts
@@ -627,6 +627,18 @@ export async function executeResponsesProviderRequest(
     // Copy over non-tool related settings
     if (request.temperature !== undefined) finalPayload.temperature = request.temperature
     if (request.maxTokens != null) finalPayload.max_output_tokens = request.maxTokens
+    if (request.reasoningEffort !== undefined) {
+      finalPayload.reasoning = {
+        effort: request.reasoningEffort,
+        summary: 'auto',
+      }
+    }
+    if (request.verbosity !== undefined) {
+      finalPayload.text = {
+        ...finalPayload.text,
+        verbosity: request.verbosity,
+      }
+    }
 
     currentResponse = await postResponses(finalPayload)
 
From 17eb90d96921e19bfeba3d148f8b6b43f2fc0d32 Mon Sep 17 00:00:00 2001
From: Siddharth Ganesan
Date: Wed, 4 Feb 2026 15:40:21 -0800
Subject: [PATCH 13/13] Fix temp

---
 apps/sim/app/api/wand/route.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/apps/sim/app/api/wand/route.ts b/apps/sim/app/api/wand/route.ts
index 6df2034148..bba2c2c2df 100644
--- a/apps/sim/app/api/wand/route.ts
+++ b/apps/sim/app/api/wand/route.ts
@@ -507,7 +507,7 @@ Use this context to calculate relative dates like "yesterday", "last week", "beg
       body: JSON.stringify({
         model: useWandAzure ? wandModelName : 'gpt-4o',
         input: messages,
-        temperature: 0.3,
+        temperature: 0.2,
         max_output_tokens: 10000,
       }),
     })