Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
248 changes: 161 additions & 87 deletions apps/sim/app/api/wand/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { userStats, workflow } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { eq, sql } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import OpenAI, { AzureOpenAI } from 'openai'
import { getBYOKKey } from '@/lib/api-key/byok'
import { getSession } from '@/lib/auth'
import { logModelUsage } from '@/lib/billing/core/usage-log'
Expand All @@ -12,6 +11,7 @@ import { env } from '@/lib/core/config/env'
import { getCostMultiplier, isBillingEnabled } from '@/lib/core/config/feature-flags'
import { generateRequestId } from '@/lib/core/utils/request'
import { verifyWorkspaceMembership } from '@/app/api/workflows/utils'
import { extractResponseText, parseResponsesUsage } from '@/providers/openai/utils'
import { getModelPricing } from '@/providers/utils'

export const dynamic = 'force-dynamic'
Expand All @@ -28,18 +28,6 @@ const openaiApiKey = env.OPENAI_API_KEY

const useWandAzure = azureApiKey && azureEndpoint && azureApiVersion

const client = useWandAzure
? new AzureOpenAI({
apiKey: azureApiKey,
apiVersion: azureApiVersion,
endpoint: azureEndpoint,
})
: openaiApiKey
? new OpenAI({
apiKey: openaiApiKey,
})
: null

if (!useWandAzure && !openaiApiKey) {
logger.warn(
'Neither Azure OpenAI nor OpenAI API key found. Wand generation API will not function.'
Expand Down Expand Up @@ -202,20 +190,18 @@ export async function POST(req: NextRequest) {
}

let isBYOK = false
let activeClient = client
let byokApiKey: string | null = null
let activeOpenAIKey = openaiApiKey

if (workspaceId && !useWandAzure) {
const byokResult = await getBYOKKey(workspaceId, 'openai')
if (byokResult) {
isBYOK = true
byokApiKey = byokResult.apiKey
activeClient = new OpenAI({ apiKey: byokResult.apiKey })
activeOpenAIKey = byokResult.apiKey
logger.info(`[${requestId}] Using BYOK OpenAI key for wand generation`)
}
}

if (!activeClient) {
if (!useWandAzure && !activeOpenAIKey) {
logger.error(`[${requestId}] AI client not initialized. Missing API key.`)
return NextResponse.json(
{ success: false, error: 'Wand generation service is not configured.' },
Expand Down Expand Up @@ -276,17 +262,18 @@ Use this context to calculate relative dates like "yesterday", "last week", "beg
)

const apiUrl = useWandAzure
? `${azureEndpoint}/openai/deployments/${wandModelName}/chat/completions?api-version=${azureApiVersion}`
: 'https://api.openai.com/v1/chat/completions'
? `${azureEndpoint?.replace(/\/$/, '')}/openai/v1/responses?api-version=${azureApiVersion}`
: 'https://api.openai.com/v1/responses'

const headers: Record<string, string> = {
'Content-Type': 'application/json',
'OpenAI-Beta': 'responses=v1',
}

if (useWandAzure) {
headers['api-key'] = azureApiKey!
} else {
headers.Authorization = `Bearer ${byokApiKey || openaiApiKey}`
headers.Authorization = `Bearer ${activeOpenAIKey}`
}

logger.debug(`[${requestId}] Making streaming request to: ${apiUrl}`)
Expand All @@ -296,11 +283,10 @@ Use this context to calculate relative dates like "yesterday", "last week", "beg
headers,
body: JSON.stringify({
model: useWandAzure ? wandModelName : 'gpt-4o',
messages: messages,
input: messages,
temperature: 0.2,
max_tokens: 10000,
max_output_tokens: 10000,
stream: true,
stream_options: { include_usage: true },
}),
})

Expand All @@ -327,16 +313,29 @@ Use this context to calculate relative dates like "yesterday", "last week", "beg
return
}

let finalUsage: any = null
let usageRecorded = false

const recordUsage = async () => {
if (usageRecorded || !finalUsage) {
return
}

usageRecorded = true
await updateUserStatsForWand(session.user.id, finalUsage, requestId, isBYOK)
}

try {
let buffer = ''
let chunkCount = 0
let finalUsage: any = null
let activeEventType: string | undefined

while (true) {
const { done, value } = await reader.read()

if (done) {
logger.info(`[${requestId}] Stream completed. Total chunks: ${chunkCount}`)
await recordUsage()
controller.enqueue(encoder.encode(`data: ${JSON.stringify({ done: true })}\n\n`))
controller.close()
break
Expand All @@ -348,47 +347,90 @@ Use this context to calculate relative dates like "yesterday", "last week", "beg
buffer = lines.pop() || ''

for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6).trim()
const trimmed = line.trim()
if (!trimmed) {
continue
}

if (data === '[DONE]') {
logger.info(`[${requestId}] Received [DONE] signal`)
if (trimmed.startsWith('event:')) {
activeEventType = trimmed.slice(6).trim()
continue
}

if (finalUsage) {
await updateUserStatsForWand(session.user.id, finalUsage, requestId, isBYOK)
}
if (!trimmed.startsWith('data:')) {
continue
}

controller.enqueue(
encoder.encode(`data: ${JSON.stringify({ done: true })}\n\n`)
)
controller.close()
return
}
const data = trimmed.slice(5).trim()
if (data === '[DONE]') {
logger.info(`[${requestId}] Received [DONE] signal`)

try {
const parsed = JSON.parse(data)
const content = parsed.choices?.[0]?.delta?.content
await recordUsage()

if (content) {
chunkCount++
if (chunkCount === 1) {
logger.info(`[${requestId}] Received first content chunk`)
}
controller.enqueue(
encoder.encode(`data: ${JSON.stringify({ done: true })}\n\n`)
)
controller.close()
return
}

let parsed: any
try {
parsed = JSON.parse(data)
} catch (parseError) {
logger.debug(`[${requestId}] Skipped non-JSON line: ${data.substring(0, 100)}`)
continue
}

const eventType = parsed?.type ?? activeEventType

if (
eventType === 'response.error' ||
eventType === 'error' ||
eventType === 'response.failed'
) {
throw new Error(parsed?.error?.message || 'Responses stream error')
}

if (
eventType === 'response.output_text.delta' ||
eventType === 'response.output_json.delta'
) {
let content = ''
if (typeof parsed.delta === 'string') {
content = parsed.delta
} else if (parsed.delta && typeof parsed.delta.text === 'string') {
content = parsed.delta.text
} else if (parsed.delta && parsed.delta.json !== undefined) {
content = JSON.stringify(parsed.delta.json)
} else if (parsed.json !== undefined) {
content = JSON.stringify(parsed.json)
} else if (typeof parsed.text === 'string') {
content = parsed.text
}

controller.enqueue(
encoder.encode(`data: ${JSON.stringify({ chunk: content })}\n\n`)
)
if (content) {
chunkCount++
if (chunkCount === 1) {
logger.info(`[${requestId}] Received first content chunk`)
}

if (parsed.usage) {
finalUsage = parsed.usage
logger.info(
`[${requestId}] Received usage data: ${JSON.stringify(parsed.usage)}`
)
controller.enqueue(
encoder.encode(`data: ${JSON.stringify({ chunk: content })}\n\n`)
)
}
}

if (eventType === 'response.completed') {
const usage = parseResponsesUsage(parsed?.response?.usage ?? parsed?.usage)
if (usage) {
finalUsage = {
prompt_tokens: usage.promptTokens,
completion_tokens: usage.completionTokens,
total_tokens: usage.totalTokens,
}
} catch (parseError) {
logger.debug(
`[${requestId}] Skipped non-JSON line: ${data.substring(0, 100)}`
logger.info(
`[${requestId}] Received usage data: ${JSON.stringify(finalUsage)}`
)
}
}
Expand All @@ -401,6 +443,12 @@ Use this context to calculate relative dates like "yesterday", "last week", "beg
stack: streamError?.stack,
})

try {
await recordUsage()
} catch (usageError) {
logger.warn(`[${requestId}] Failed to record usage after stream error`, usageError)
}

const errorData = `data: ${JSON.stringify({ error: 'Streaming failed', done: true })}\n\n`
controller.enqueue(encoder.encode(errorData))
controller.close()
Expand All @@ -424,8 +472,6 @@ Use this context to calculate relative dates like "yesterday", "last week", "beg
message: error?.message || 'Unknown error',
code: error?.code,
status: error?.status,
responseStatus: error?.response?.status,
responseData: error?.response?.data ? safeStringify(error.response.data) : undefined,
stack: error?.stack,
useWandAzure,
model: useWandAzure ? wandModelName : 'gpt-4o',
Expand All @@ -440,14 +486,43 @@ Use this context to calculate relative dates like "yesterday", "last week", "beg
}
}

const completion = await activeClient.chat.completions.create({
model: useWandAzure ? wandModelName : 'gpt-4o',
messages: messages,
temperature: 0.3,
max_tokens: 10000,
const apiUrl = useWandAzure
? `${azureEndpoint?.replace(/\/$/, '')}/openai/v1/responses?api-version=${azureApiVersion}`
: 'https://api.openai.com/v1/responses'

const headers: Record<string, string> = {
'Content-Type': 'application/json',
'OpenAI-Beta': 'responses=v1',
}

if (useWandAzure) {
headers['api-key'] = azureApiKey!
} else {
headers.Authorization = `Bearer ${activeOpenAIKey}`
}

const response = await fetch(apiUrl, {
method: 'POST',
headers,
body: JSON.stringify({
model: useWandAzure ? wandModelName : 'gpt-4o',
input: messages,
temperature: 0.2,
max_output_tokens: 10000,
}),
})

const generatedContent = completion.choices[0]?.message?.content?.trim()
if (!response.ok) {
const errorText = await response.text()
const apiError = new Error(
`API request failed: ${response.status} ${response.statusText} - ${errorText}`
)
;(apiError as any).status = response.status
throw apiError
}

const completion = await response.json()
const generatedContent = extractResponseText(completion.output)?.trim()

if (!generatedContent) {
logger.error(
Expand All @@ -461,8 +536,18 @@ Use this context to calculate relative dates like "yesterday", "last week", "beg

logger.info(`[${requestId}] Wand generation successful`)

if (completion.usage) {
await updateUserStatsForWand(session.user.id, completion.usage, requestId, isBYOK)
const usage = parseResponsesUsage(completion.usage)
if (usage) {
await updateUserStatsForWand(
session.user.id,
{
prompt_tokens: usage.promptTokens,
completion_tokens: usage.completionTokens,
total_tokens: usage.totalTokens,
},
requestId,
isBYOK
)
}

return NextResponse.json({ success: true, content: generatedContent })
Expand All @@ -472,10 +557,6 @@ Use this context to calculate relative dates like "yesterday", "last week", "beg
message: error?.message || 'Unknown error',
code: error?.code,
status: error?.status,
responseStatus: error instanceof OpenAI.APIError ? error.status : error?.response?.status,
responseData: (error as any)?.response?.data
? safeStringify((error as any).response.data)
: undefined,
stack: error?.stack,
useWandAzure,
model: useWandAzure ? wandModelName : 'gpt-4o',
Expand All @@ -484,26 +565,19 @@ Use this context to calculate relative dates like "yesterday", "last week", "beg
})

let clientErrorMessage = 'Wand generation failed. Please try again later.'
let status = 500
let status = typeof (error as any)?.status === 'number' ? (error as any).status : 500

if (error instanceof OpenAI.APIError) {
status = error.status || 500
logger.error(
`[${requestId}] ${useWandAzure ? 'Azure OpenAI' : 'OpenAI'} API Error: ${status} - ${error.message}`
)

if (status === 401) {
clientErrorMessage = 'Authentication failed. Please check your API key configuration.'
} else if (status === 429) {
clientErrorMessage = 'Rate limit exceeded. Please try again later.'
} else if (status >= 500) {
clientErrorMessage =
'The wand generation service is currently unavailable. Please try again later.'
}
} else if (useWandAzure && error.message?.includes('DeploymentNotFound')) {
if (useWandAzure && error?.message?.includes('DeploymentNotFound')) {
clientErrorMessage =
'Azure OpenAI deployment not found. Please check your model deployment configuration.'
status = 404
} else if (status === 401) {
clientErrorMessage = 'Authentication failed. Please check your API key configuration.'
} else if (status === 429) {
clientErrorMessage = 'Rate limit exceeded. Please try again later.'
} else if (status >= 500) {
clientErrorMessage =
'The wand generation service is currently unavailable. Please try again later.'
}

return NextResponse.json(
Expand Down
Loading