diff --git a/src/openlayer/lib/integrations/langchain_callback.py b/src/openlayer/lib/integrations/langchain_callback.py index 10ff3982..6d3da477 100644 --- a/src/openlayer/lib/integrations/langchain_callback.py +++ b/src/openlayer/lib/integrations/langchain_callback.py @@ -2,24 +2,27 @@ # pylint: disable=unused-argument import time -from typing import Any, Dict, List, Optional, Union, Callable +from typing import Any, Callable, Dict, List, Optional, Union from uuid import UUID try: try: from langchain_core import messages as langchain_schema - from langchain_core.callbacks.base import BaseCallbackHandler, AsyncCallbackHandler + from langchain_core.callbacks.base import ( + AsyncCallbackHandler, + BaseCallbackHandler, + ) except ImportError: from langchain import schema as langchain_schema - from langchain.callbacks.base import BaseCallbackHandler, AsyncCallbackHandler + from langchain.callbacks.base import AsyncCallbackHandler, BaseCallbackHandler HAVE_LANGCHAIN = True except ImportError: HAVE_LANGCHAIN = False -from ..tracing import tracer, steps, traces, enums from .. import utils +from ..tracing import enums, steps, tracer, traces LANGCHAIN_TO_OPENLAYER_PROVIDER_MAP = { "azure-openai-chat": "Azure", @@ -156,7 +159,11 @@ def _end_step( # Only upload if this is a standalone trace (not integrated with external trace) # If current_step is set, we're part of a larger trace and shouldn't upload - if is_root_step and run_id in self._traces_by_root and tracer.get_current_step() is None: + if ( + is_root_step + and run_id in self._traces_by_root + and tracer.get_current_step() is None + ): trace = self._traces_by_root.pop(run_id) self._process_and_upload_trace(trace) @@ -187,8 +194,11 @@ def _process_and_upload_trace(self, trace: traces.Trace) -> None: config.update({"ground_truth_column_name": "groundTruth"}) if "context" in trace_data: config.update({"context_column_name": "context"}) + + root_step = trace.steps[0] if trace.steps else None if ( - isinstance(root_step, steps.ChatCompletionStep) + root_step + and isinstance(root_step, steps.ChatCompletionStep) and root_step.inputs and "prompt" in root_step.inputs ): @@ -1107,7 +1117,7 @@ def _start_step( # Check if we're in an existing trace context via ContextVars current_step = tracer.get_current_step() current_trace = tracer.get_current_trace() - + if current_step is not None: # We're inside an existing step context - add as nested current_step.add_nested_step(step) @@ -1128,7 +1138,7 @@ def _start_step( trace = traces.Trace() trace.add_step(step) self._traces_by_root[run_id] = trace - + # Track root steps if parent_run_id is None: self.root_steps.add(run_id) @@ -1176,7 +1186,7 @@ def _end_step( # Only upload if this is a standalone trace (not integrated with external trace) has_standalone_trace = run_id in self._traces_by_root - + # Only upload if: root step + has standalone trace + not part of external trace if is_root_step and has_standalone_trace and not self._has_external_trace: trace = self._traces_by_root.pop(run_id)