Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions src/openlayer/lib/integrations/langchain_callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,27 @@

# pylint: disable=unused-argument
import time
from typing import Any, Dict, List, Optional, Union, Callable
from typing import Any, Callable, Dict, List, Optional, Union
from uuid import UUID

try:
try:
from langchain_core import messages as langchain_schema
from langchain_core.callbacks.base import BaseCallbackHandler, AsyncCallbackHandler
from langchain_core.callbacks.base import (
AsyncCallbackHandler,
BaseCallbackHandler,
)
except ImportError:
from langchain import schema as langchain_schema
from langchain.callbacks.base import BaseCallbackHandler, AsyncCallbackHandler
from langchain.callbacks.base import AsyncCallbackHandler, BaseCallbackHandler

HAVE_LANGCHAIN = True
except ImportError:
HAVE_LANGCHAIN = False


from ..tracing import tracer, steps, traces, enums
from .. import utils
from ..tracing import enums, steps, tracer, traces

LANGCHAIN_TO_OPENLAYER_PROVIDER_MAP = {
"azure-openai-chat": "Azure",
Expand Down Expand Up @@ -156,7 +159,11 @@ def _end_step(

# Only upload if this is a standalone trace (not integrated with external trace)
# If current_step is set, we're part of a larger trace and shouldn't upload
if is_root_step and run_id in self._traces_by_root and tracer.get_current_step() is None:
if (
is_root_step
and run_id in self._traces_by_root
and tracer.get_current_step() is None
):
trace = self._traces_by_root.pop(run_id)
self._process_and_upload_trace(trace)

Expand Down Expand Up @@ -187,8 +194,11 @@ def _process_and_upload_trace(self, trace: traces.Trace) -> None:
config.update({"ground_truth_column_name": "groundTruth"})
if "context" in trace_data:
config.update({"context_column_name": "context"})

root_step = trace.steps[0] if trace.steps else None
if (
isinstance(root_step, steps.ChatCompletionStep)
root_step
and isinstance(root_step, steps.ChatCompletionStep)
and root_step.inputs
and "prompt" in root_step.inputs
):
Expand Down Expand Up @@ -1107,7 +1117,7 @@ def _start_step(
# Check if we're in an existing trace context via ContextVars
current_step = tracer.get_current_step()
current_trace = tracer.get_current_trace()

if current_step is not None:
# We're inside an existing step context - add as nested
current_step.add_nested_step(step)
Expand All @@ -1128,7 +1138,7 @@ def _start_step(
trace = traces.Trace()
trace.add_step(step)
self._traces_by_root[run_id] = trace

# Track root steps
if parent_run_id is None:
self.root_steps.add(run_id)
Expand Down Expand Up @@ -1176,7 +1186,7 @@ def _end_step(

# Only upload if this is a standalone trace (not integrated with external trace)
has_standalone_trace = run_id in self._traces_by_root

# Only upload if: root step + has standalone trace + not part of external trace
if is_root_step and has_standalone_trace and not self._has_external_trace:
trace = self._traces_by_root.pop(run_id)
Expand Down