diff --git a/langfuse/__init__.py b/langfuse/__init__.py index 08d8325cf..ec441e745 100644 --- a/langfuse/__init__.py +++ b/langfuse/__init__.py @@ -36,6 +36,14 @@ is_known_llm_instrumentor, is_langfuse_span, ) +from .types import ( + MaskOtelSpansFunction, + MaskOtelSpansParams, + MaskOtelSpansResult, + OtelSpanData, + OtelSpanIdentifier, + OtelSpanPatch, +) Langfuse = _client_module.Langfuse @@ -71,6 +79,12 @@ "is_genai_span", "is_known_llm_instrumentor", "KNOWN_LLM_INSTRUMENTATION_SCOPE_PREFIXES", + "MaskOtelSpansFunction", + "MaskOtelSpansParams", + "MaskOtelSpansResult", + "OtelSpanData", + "OtelSpanIdentifier", + "OtelSpanPatch", "experiment", "api", ] diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index 5f7c0f288..72a51fa72 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -132,7 +132,13 @@ PromptClient, TextPromptClient, ) -from langfuse.types import MaskFunction, ScoreDataType, SpanLevel, TraceContext +from langfuse.types import ( + MaskFunction, + MaskOtelSpansFunction, + ScoreDataType, + SpanLevel, + TraceContext, +) class Langfuse: @@ -169,7 +175,40 @@ class Langfuse: release (Optional[str]): Release version/hash of your application. Used for grouping analytics by release. media_upload_thread_count (Optional[int]): Number of background threads for handling media uploads. Defaults to 1. Can also be set via LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT environment variable. sample_rate (Optional[float]): Sampling rate for traces (0.0 to 1.0). Defaults to 1.0 (100% of traces are sampled). Can also be set via LANGFUSE_SAMPLE_RATE environment variable. - mask (Optional[MaskFunction]): Function to mask sensitive data in traces before sending to the API. + mask (Optional[MaskFunction]): Function to mask sensitive data synchronously when Langfuse SDK attributes are created. This applies only to data set through Langfuse SDK APIs such as `start_observation()`, `update()`, and `set_trace_io()`. + mask_otel_spans (Optional[MaskOtelSpansFunction]): Synchronous export-stage hook for masking raw OpenTelemetry span attributes before this Langfuse client sends them to Langfuse. Use this for spans created by third-party OpenTelemetry instrumentations, or when you need to inspect final span attributes after export filtering and Langfuse media handling. It does not modify spans already exported through other OpenTelemetry exporters. + + The hook receives one OpenTelemetry export batch. A batch is not guaranteed to contain a complete trace, request, or Langfuse observation tree. The hook usually runs on the OpenTelemetry batch span processor worker thread; during `flush()` and shutdown it may run on the caller thread. Keep it synchronous, deterministic, and fast. + + Return `None` to leave the batch unchanged. Return `MaskOtelSpansResult` with `OtelSpanPatch` values to delete or replace attributes on selected spans. If the hook raises or returns an invalid batch result, Langfuse drops the whole export batch. If one returned span patch is invalid, Langfuse drops only that span from the Langfuse export. + + Example: + ```python + from typing import Optional + + from langfuse import Langfuse + from langfuse.types import ( + MaskOtelSpansParams, + MaskOtelSpansResult, + OtelSpanPatch, + ) + + def mask_otel_spans( + *, params: MaskOtelSpansParams + ) -> Optional[MaskOtelSpansResult]: + patches = {} + + for identifier, span in params.spans.items(): + if "gen_ai.prompt.0.content" in span.attributes: + patches[identifier] = OtelSpanPatch( + delete_attributes=("gen_ai.prompt.0.content",), + set_attributes={"masking.applied": True}, + ) + + return MaskOtelSpansResult(span_patches=patches) + + langfuse = Langfuse(mask_otel_spans=mask_otel_spans) + ``` blocked_instrumentation_scopes (Optional[List[str]]): Deprecated. Use `should_export_span` instead. Equivalent behavior: ```python from langfuse.span_filter import is_default_export_span @@ -246,6 +285,7 @@ def __init__( media_upload_thread_count: Optional[int] = None, sample_rate: Optional[float] = None, mask: Optional[MaskFunction] = None, + mask_otel_spans: Optional[MaskOtelSpansFunction] = None, blocked_instrumentation_scopes: Optional[List[str]] = None, should_export_span: Optional[Callable[[ReadableSpan], bool]] = None, additional_headers: Optional[Dict[str, str]] = None, @@ -342,6 +382,7 @@ def __init__( media_upload_thread_count=media_upload_thread_count, sample_rate=sample_rate, mask=mask, + mask_otel_spans=mask_otel_spans, tracing_enabled=self._tracing_enabled, blocked_instrumentation_scopes=blocked_instrumentation_scopes, should_export_span=should_export_span, diff --git a/langfuse/_client/get_client.py b/langfuse/_client/get_client.py index e2ebab0ed..ad1c14855 100644 --- a/langfuse/_client/get_client.py +++ b/langfuse/_client/get_client.py @@ -50,6 +50,7 @@ def _create_client_from_instance( media_upload_thread_count=instance.media_upload_thread_count, sample_rate=instance.sample_rate, mask=instance.mask, + mask_otel_spans=instance.mask_otel_spans, blocked_instrumentation_scopes=instance.blocked_instrumentation_scopes, should_export_span=instance.should_export_span, additional_headers=instance.additional_headers, diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index 2d42f6ce1..004566c8f 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -45,7 +45,7 @@ from langfuse._utils.request import LangfuseClient from langfuse.api import AsyncLangfuseAPI, LangfuseAPI from langfuse.logger import langfuse_logger -from langfuse.types import MaskFunction +from langfuse.types import MaskFunction, MaskOtelSpansFunction from .._version import __version__ as langfuse_version @@ -94,6 +94,7 @@ def __new__( media_upload_thread_count: Optional[int] = None, sample_rate: Optional[float] = None, mask: Optional[MaskFunction] = None, + mask_otel_spans: Optional[MaskOtelSpansFunction] = None, tracing_enabled: Optional[bool] = None, blocked_instrumentation_scopes: Optional[List[str]] = None, should_export_span: Optional[Callable[[ReadableSpan], bool]] = None, @@ -128,6 +129,7 @@ def __new__( media_upload_thread_count=media_upload_thread_count, sample_rate=sample_rate, mask=mask, + mask_otel_spans=mask_otel_spans, tracing_enabled=tracing_enabled if tracing_enabled is not None else True, @@ -157,6 +159,7 @@ def _initialize_instance( httpx_client: Optional[httpx.Client] = None, sample_rate: Optional[float] = None, mask: Optional[MaskFunction] = None, + mask_otel_spans: Optional[MaskOtelSpansFunction] = None, tracing_enabled: bool = True, blocked_instrumentation_scopes: Optional[List[str]] = None, should_export_span: Optional[Callable[[ReadableSpan], bool]] = None, @@ -169,6 +172,7 @@ def _initialize_instance( self.tracing_enabled = tracing_enabled self.base_url = base_url self.mask = mask + self.mask_otel_spans = mask_otel_spans self.environment = environment # Store additional client settings for get_client() to use @@ -184,33 +188,6 @@ def _initialize_instance( self.span_exporter = span_exporter self.tracer_provider: Optional[TracerProvider] = None - # OTEL Tracer - if tracing_enabled: - tracer_provider = tracer_provider or _init_tracer_provider( - environment=environment, release=release, sample_rate=sample_rate - ) - self.tracer_provider = tracer_provider - - langfuse_processor = LangfuseSpanProcessor( - public_key=self.public_key, - secret_key=secret_key, - base_url=base_url, - timeout=timeout, - flush_at=flush_at, - flush_interval=flush_interval, - blocked_instrumentation_scopes=blocked_instrumentation_scopes, - should_export_span=should_export_span, - additional_headers=additional_headers, - span_exporter=span_exporter, - ) - tracer_provider.add_span_processor(langfuse_processor) - - self._otel_tracer = tracer_provider.get_tracer( - LANGFUSE_TRACER_NAME, - langfuse_version, - attributes={"public_key": self.public_key}, - ) - # API Clients ## API clients must be singletons because the underlying HTTPX clients @@ -266,6 +243,35 @@ def _initialize_instance( ) self._media_upload_consumers = [] + # OTEL Tracer + if tracing_enabled: + tracer_provider = tracer_provider or _init_tracer_provider( + environment=environment, release=release, sample_rate=sample_rate + ) + self.tracer_provider = tracer_provider + + langfuse_processor = LangfuseSpanProcessor( + public_key=self.public_key, + secret_key=secret_key, + base_url=base_url, + timeout=timeout, + flush_at=flush_at, + flush_interval=flush_interval, + blocked_instrumentation_scopes=blocked_instrumentation_scopes, + should_export_span=should_export_span, + additional_headers=additional_headers, + span_exporter=span_exporter, + media_manager=self._media_manager, + mask_otel_spans=mask_otel_spans, + ) + tracer_provider.add_span_processor(langfuse_processor) + + self._otel_tracer = tracer_provider.get_tracer( + LANGFUSE_TRACER_NAME, + langfuse_version, + attributes={"public_key": self.public_key}, + ) + media_upload_thread_count = media_upload_thread_count or max( int(os.getenv(LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT, 1)), 1 ) diff --git a/langfuse/_client/span_exporter.py b/langfuse/_client/span_exporter.py new file mode 100644 index 000000000..fd23c2b24 --- /dev/null +++ b/langfuse/_client/span_exporter.py @@ -0,0 +1,661 @@ +import json +from collections.abc import Mapping as MappingCollection +from collections.abc import Sequence as SequenceCollection +from types import MappingProxyType +from typing import Any, Dict, Optional, Sequence, cast + +from opentelemetry.attributes import BoundedAttributes +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult +from opentelemetry.sdk.util import BoundedList +from opentelemetry.trace import format_span_id, format_trace_id +from opentelemetry.util.types import AttributeValue + +from langfuse._client.attributes import LangfuseOtelSpanAttributes +from langfuse._task_manager.media_manager import MediaManager +from langfuse._utils.serializer import EventSerializer +from langfuse.logger import langfuse_logger +from langfuse.media import LangfuseMedia +from langfuse.types import ( + MaskOtelSpansFunction, + MaskOtelSpansParams, + MaskOtelSpansResult, + OtelSpanData, + OtelSpanIdentifier, + OtelSpanPatch, +) + +_INPUT_MEDIA_ATTRIBUTE_KEYS = frozenset( + { + LangfuseOtelSpanAttributes.TRACE_INPUT, + LangfuseOtelSpanAttributes.OBSERVATION_INPUT, + "ai.prompt.messages", + "ai.prompt", + "ai.toolCall.args", + "gcp.vertex.agent.llm_request", + "gcp.vertex.agent.tool_call_args", + "prompt", + "lk.input_text", + "lk.user_transcript", + "lk.chat_ctx", + "lk.user_input", + "mlflow.spanInputs", + "traceloop.entity.input", + "input.value", + "pydantic_ai.all_messages", + "gen_ai.system_instructions", + "input", + "gen_ai.input.messages", + "gen_ai.tool.call.arguments", + "genkit:input", + "tool_arguments", + } +) + +_OUTPUT_MEDIA_ATTRIBUTE_KEYS = frozenset( + { + LangfuseOtelSpanAttributes.TRACE_OUTPUT, + LangfuseOtelSpanAttributes.OBSERVATION_OUTPUT, + "ai.response.text", + "ai.result.text", + "ai.toolCall.result", + "ai.response.object", + "ai.result.object", + "ai.response.toolCalls", + "ai.result.toolCalls", + "gcp.vertex.agent.llm_response", + "gcp.vertex.agent.tool_response", + "all_messages_events", + "lk.function_tool.output", + "lk.response.text", + "mlflow.spanOutputs", + "traceloop.entity.output", + "output.value", + "final_result", + "output", + "gen_ai.output.messages", + "gen_ai.tool.call.result", + "genkit:output", + "tool_response", + } +) + +_INPUT_MEDIA_ATTRIBUTE_PREFIXES = ( + "gen_ai.prompt", + "llm.input_messages", +) + +_OUTPUT_MEDIA_ATTRIBUTE_PREFIXES = ( + "gen_ai.completion", + "llm.output_messages", +) + + +class LangfuseTransformingSpanExporter(SpanExporter): + """Apply Langfuse export-stage transformations before delegating export.""" + + def __init__( + self, + *, + exporter: SpanExporter, + media_manager: Optional[MediaManager], + mask_otel_spans: Optional[MaskOtelSpansFunction], + ) -> None: + self._exporter = exporter + self._media_manager = media_manager + self._mask_otel_spans = mask_otel_spans + + def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: + span_attributes = [ + ( + span, + self._process_media_attributes( + span=span, + attributes=dict(span.attributes or {}), + ), + ) + for span in spans + ] + + if self._mask_otel_spans is not None: + masked_span_attributes = self._apply_mask_otel_spans( + span_attributes=span_attributes + ) + + if masked_span_attributes is None: + return SpanExportResult.SUCCESS + + span_attributes = masked_span_attributes + + transformed_spans = [ + self._clone_span(span=span, attributes=attributes) + for span, attributes in span_attributes + ] + + if not transformed_spans: + return SpanExportResult.SUCCESS + + return self._exporter.export(transformed_spans) + + def shutdown(self) -> None: + self._exporter.shutdown() + + def force_flush(self, timeout_millis: int = 30000) -> bool: + return self._exporter.force_flush(timeout_millis=timeout_millis) + + def _process_media_attributes( + self, *, span: ReadableSpan, attributes: Dict[str, AttributeValue] + ) -> Dict[str, AttributeValue]: + if self._media_manager is None: + return attributes + + processed_attributes: Dict[str, AttributeValue] = {} + + for key, value in attributes.items(): + try: + processed_attributes[key] = self._process_media_attribute_value( + span=span, + attribute_key=key, + value=value, + ) + except Exception as error: + langfuse_logger.warning( + "Media processing error: Failed to process span attribute before export. " + f"Leaving attribute unchanged. span_name='{span.name}' " + f"trace_id='{_get_trace_id(span)}' span_id='{_get_span_id(span)}' " + f"attribute_key='{key}' error='{error}'" + ) + processed_attributes[key] = value + + return processed_attributes + + def _process_media_attribute_value( + self, + *, + span: ReadableSpan, + attribute_key: str, + value: AttributeValue, + ) -> AttributeValue: + if isinstance(value, str): + return self._process_media_string( + span=span, + attribute_key=attribute_key, + value=value, + ) + + if _is_attribute_sequence(value): + sequence_value = cast(Sequence[Any], value) + + return cast( + AttributeValue, + tuple( + self._process_media_string( + span=span, + attribute_key=attribute_key, + value=item, + ) + if isinstance(item, str) + else item + for item in sequence_value + ), + ) + + return value + + def _process_media_string( + self, + *, + span: ReadableSpan, + attribute_key: str, + value: str, + ) -> str: + media_manager = cast(MediaManager, self._media_manager) + field = _media_field_for_attribute(attribute_key) + + if _is_base64_data_uri(value): + trace_id, observation_id = _get_required_media_processing_identifiers(span) + processed_direct_value = media_manager._find_and_process_media( + data=value, + trace_id=trace_id, + observation_id=observation_id, + field=field, + ) + + direct_reference = _media_reference_string(processed_direct_value) + + if direct_reference is not None: + return direct_reference + + if processed_direct_value != value: + return _serialize_media_value(processed_direct_value, fallback=value) + + stripped_value = value.lstrip() + + if not stripped_value.startswith(("{", "[")): + return value + + if not _may_contain_serialized_media(value): + return value + + try: + parsed_value = json.loads(value) + except Exception: + return value + + if not isinstance(parsed_value, (dict, list)): + return value + + trace_id, observation_id = _get_required_media_processing_identifiers(span) + processed_json_value = media_manager._find_and_process_media( + data=parsed_value, + trace_id=trace_id, + observation_id=observation_id, + field=field, + ) + + if processed_json_value == parsed_value: + return value + + return _serialize_media_value(processed_json_value, fallback=value) + + def _apply_mask_otel_spans( + self, + *, + span_attributes: Sequence[tuple[ReadableSpan, Dict[str, AttributeValue]]], + ) -> Optional[list[tuple[ReadableSpan, Dict[str, AttributeValue]]]]: + mask_otel_spans = cast(MaskOtelSpansFunction, self._mask_otel_spans) + maskable_span_attributes: list[ + tuple[ReadableSpan, Dict[str, AttributeValue]] + ] = [] + span_data_by_identifier: Dict[OtelSpanIdentifier, OtelSpanData] = {} + + for span, attributes in span_attributes: + if not _has_valid_span_context(span): + langfuse_logger.warning( + "Masking error: Dropping span from export because span context is missing or invalid. " + f"span_name='{span.name}'" + ) + continue + + identifier = _create_otel_span_identifier(span) + + if identifier in span_data_by_identifier: + langfuse_logger.error( + "Masking error: mask_otel_spans received duplicate span identifiers. " + "Dropping export batch. " + f"trace_id='{identifier.trace_id}' span_id='{identifier.span_id}'" + ) + return None + + span_data_by_identifier[identifier] = _create_otel_span_data( + span=span, attributes=attributes, identifier=identifier + ) + maskable_span_attributes.append((span, attributes)) + + if not maskable_span_attributes: + return [] + + try: + result: Any = mask_otel_spans( + params=MaskOtelSpansParams( + spans=MappingProxyType(dict(span_data_by_identifier)) + ) + ) + except Exception as error: + langfuse_logger.error( + "Masking error: mask_otel_spans raised an exception. " + f"Dropping export batch. span_count={len(span_attributes)} " + f"error='{error}'" + ) + return None + + if result is None: + return maskable_span_attributes + + if not isinstance(result, MaskOtelSpansResult): + langfuse_logger.error( + "Masking error: mask_otel_spans returned an invalid result. " + f"Dropping export batch. result_type='{type(result).__name__}'" + ) + return None + + span_patches: Any = result.span_patches + + if not isinstance(span_patches, MappingCollection): + langfuse_logger.error( + "Masking error: mask_otel_spans returned invalid span_patches. " + f"Dropping export batch. " + f"span_patches_type='{type(span_patches).__name__}'" + ) + return None + + span_identifiers = set(span_data_by_identifier) + + for identifier in span_patches: + if identifier not in span_identifiers: + langfuse_logger.error( + "Masking error: mask_otel_spans returned a patch for an unknown " + "span identifier. Dropping export batch. " + f"identifier_type='{type(identifier).__name__}'" + ) + return None + + masked_span_attributes: list[ + tuple[ReadableSpan, Dict[str, AttributeValue]] + ] = [] + + for span, attributes in maskable_span_attributes: + identifier = _create_otel_span_identifier(span) + patch = span_patches.get(identifier) + + if patch is None: + masked_span_attributes.append((span, attributes)) + continue + + patched_attributes = self._apply_otel_span_patch( + span=span, + attributes=attributes, + patch=patch, + ) + + if patched_attributes is not None: + masked_span_attributes.append((span, patched_attributes)) + + return masked_span_attributes + + def _apply_otel_span_patch( + self, + *, + span: ReadableSpan, + attributes: Dict[str, AttributeValue], + patch: Any, + ) -> Optional[Dict[str, AttributeValue]]: + if not isinstance(patch, OtelSpanPatch): + langfuse_logger.error( + "Masking error: mask_otel_spans returned an invalid span patch. " + "Dropping span. " + f"span_name='{span.name}' trace_id='{_get_trace_id(span)}' " + f"span_id='{_get_span_id(span)}' patch_type='{type(patch).__name__}'" + ) + return None + + set_attributes: Any = patch.set_attributes + delete_attributes: Any = patch.delete_attributes + + if not isinstance(set_attributes, MappingCollection): + langfuse_logger.error( + "Masking error: mask_otel_spans returned invalid set_attributes. " + "Dropping span. " + f"span_name='{span.name}' trace_id='{_get_trace_id(span)}' " + f"span_id='{_get_span_id(span)}' " + f"set_attributes_type='{type(set_attributes).__name__}'" + ) + return None + + if isinstance(delete_attributes, (str, bytes)) or not isinstance( + delete_attributes, SequenceCollection + ): + langfuse_logger.error( + "Masking error: mask_otel_spans returned invalid delete_attributes. " + "Dropping span. " + f"span_name='{span.name}' trace_id='{_get_trace_id(span)}' " + f"span_id='{_get_span_id(span)}' " + f"delete_attributes_type='{type(delete_attributes).__name__}'" + ) + return None + + masked_attributes = dict(attributes) + + for key in delete_attributes: + if not _is_valid_attribute_key(key): + langfuse_logger.warning( + "Masking error: mask_otel_spans requested deletion with an invalid attribute key. " + f"Ignoring delete entry. span_name='{span.name}' " + f"trace_id='{_get_trace_id(span)}' span_id='{_get_span_id(span)}' " + f"delete_key_type='{type(key).__name__}'" + ) + continue + + masked_attributes.pop(key, None) + + for key, value in set_attributes.items(): + if not _is_valid_attribute_key(key): + langfuse_logger.warning( + "Masking error: mask_otel_spans returned an invalid set_attributes key. " + f"Ignoring set entry. span_name='{span.name}' " + f"trace_id='{_get_trace_id(span)}' span_id='{_get_span_id(span)}' " + f"attribute_key_type='{type(key).__name__}'" + ) + continue + + cleaned_attribute = _clean_attribute_value(key=key, value=value) + + if cleaned_attribute is None: + masked_attributes.pop(key, None) + langfuse_logger.warning( + "Masking error: mask_otel_spans returned an invalid attribute value. " + f"Deleting attribute from export. span_name='{span.name}' " + f"trace_id='{_get_trace_id(span)}' span_id='{_get_span_id(span)}' " + f"attribute_key='{key}' value_type='{type(value).__name__}'" + ) + continue + + masked_attributes[key] = cleaned_attribute + + return masked_attributes + + @staticmethod + def _clone_span( + *, span: ReadableSpan, attributes: Dict[str, AttributeValue] + ) -> ReadableSpan: + return ReadableSpan( + name=span.name, + context=span.context, + parent=span.parent, + resource=span.resource, + attributes=_clone_attributes_with_dropped_count(span, attributes), + events=_clone_events_with_dropped_count(span), + links=_clone_links_with_dropped_count(span), + kind=span.kind, + status=span.status, + start_time=span.start_time, + end_time=span.end_time, + instrumentation_info=getattr(span, "_instrumentation_info", None), + instrumentation_scope=span.instrumentation_scope, + ) + + +def _create_otel_span_identifier(span: ReadableSpan) -> OtelSpanIdentifier: + return OtelSpanIdentifier( + trace_id=_get_trace_id(span), + span_id=_get_span_id(span), + ) + + +def _has_valid_span_context(span: ReadableSpan) -> bool: + context = span.context + + if context is None: + return False + + is_valid = getattr(context, "is_valid", None) + + if isinstance(is_valid, bool): + return is_valid + + return bool( + getattr(context, "trace_id", None) and getattr(context, "span_id", None) + ) + + +def _create_otel_span_data( + *, + span: ReadableSpan, + attributes: Dict[str, AttributeValue], + identifier: OtelSpanIdentifier, +) -> OtelSpanData: + instrumentation_scope = span.instrumentation_scope + + return OtelSpanData( + trace_id=identifier.trace_id, + span_id=identifier.span_id, + parent_span_id=_get_parent_span_id(span), + name=span.name, + instrumentation_scope_name=instrumentation_scope.name + if instrumentation_scope + else None, + instrumentation_scope_version=instrumentation_scope.version + if instrumentation_scope + else None, + attributes=MappingProxyType(dict(attributes)), + resource_attributes=MappingProxyType(dict(span.resource.attributes or {})), + ) + + +def _clean_attribute_value(*, key: str, value: Any) -> Optional[AttributeValue]: + cleaned_attributes = BoundedAttributes(maxlen=1, immutable=False) + + try: + cleaned_attributes[key] = value + except Exception: + return None + + if key not in cleaned_attributes: + return None + + return cast(AttributeValue, cleaned_attributes[key]) + + +def _is_valid_attribute_key(key: Any) -> bool: + return isinstance(key, str) and bool(key) + + +def _is_attribute_sequence(value: AttributeValue) -> bool: + return isinstance(value, SequenceCollection) and not isinstance(value, (str, bytes)) + + +def _is_base64_data_uri(value: str) -> bool: + if not value.startswith("data:") or "," not in value: + return False + + header, _ = value.split(",", 1) + + return header.endswith(";base64") + + +def _may_contain_serialized_media(value: str) -> bool: + if "data:" in value or "inline_data" in value or "inlineData" in value: + return True + + has_media_type_key = ( + '"media_type"' in value or '"mime_type"' in value or '"mimeType"' in value + ) + + return has_media_type_key and '"data"' in value + + +def _media_reference_string(value: Any) -> Optional[str]: + if not isinstance(value, LangfuseMedia): + return None + + return value._reference_string + + +def _media_upload_failed_marker(content_type: Any) -> str: + return f"" + + +def _get_required_media_processing_identifiers(span: ReadableSpan) -> tuple[str, str]: + context = span.context + + if context is None: + raise ValueError("Span context is required for media processing") + + trace_id = getattr(context, "trace_id", None) + span_id = getattr(context, "span_id", None) + + if not trace_id or not span_id: + raise ValueError("Trace ID and span ID are required for media processing") + + return format_trace_id(trace_id), format_span_id(span_id) + + +def _serialize_media_value(value: Any, *, fallback: str) -> str: + if isinstance(value, str): + return value + + if isinstance(value, LangfuseMedia): + return _media_reference_string(value) or _media_upload_failed_marker( + value._content_type + ) + + try: + return json.dumps(value, cls=EventSerializer) + except Exception: + return fallback + + +def _clone_attributes_with_dropped_count( + span: ReadableSpan, attributes: Dict[str, AttributeValue] +) -> BoundedAttributes: + cloned_attributes = BoundedAttributes( + maxlen=None, + attributes=attributes, + immutable=True, + ) + cloned_attributes.dropped = span.dropped_attributes + + return cloned_attributes + + +def _clone_events_with_dropped_count(span: ReadableSpan) -> BoundedList: + cloned_events: BoundedList = BoundedList(cast(Any, None)) + cloned_events.extend(span.events) + cloned_events.dropped = span.dropped_events + + return cloned_events + + +def _clone_links_with_dropped_count(span: ReadableSpan) -> BoundedList: + cloned_links: BoundedList = BoundedList(cast(Any, None)) + cloned_links.extend(span.links) + cloned_links.dropped = span.dropped_links + + return cloned_links + + +def _media_field_for_attribute( + attribute_key: str, +) -> str: + if attribute_key in _INPUT_MEDIA_ATTRIBUTE_KEYS or attribute_key.startswith( + _INPUT_MEDIA_ATTRIBUTE_PREFIXES + ): + return "input" + + if attribute_key in _OUTPUT_MEDIA_ATTRIBUTE_KEYS or attribute_key.startswith( + _OUTPUT_MEDIA_ATTRIBUTE_PREFIXES + ): + return "output" + + return "metadata" + + +def _get_trace_id(span: ReadableSpan) -> str: + if span.context is None: + return "" + + return format_trace_id(span.context.trace_id) + + +def _get_span_id(span: ReadableSpan) -> str: + if span.context is None: + return "" + + return format_span_id(span.context.span_id) + + +def _get_parent_span_id(span: ReadableSpan) -> Optional[str]: + if span.parent is None: + return None + + return format_span_id(span.parent.span_id) diff --git a/langfuse/_client/span_processor.py b/langfuse/_client/span_processor.py index a684a8813..5f5d08ad3 100644 --- a/langfuse/_client/span_processor.py +++ b/langfuse/_client/span_processor.py @@ -28,10 +28,13 @@ LANGFUSE_OTEL_TRACES_EXPORT_PATH, ) from langfuse._client.propagation import _get_propagated_attributes_from_context +from langfuse._client.span_exporter import LangfuseTransformingSpanExporter from langfuse._client.span_filter import is_default_export_span, is_langfuse_span from langfuse._client.utils import span_formatter +from langfuse._task_manager.media_manager import MediaManager from langfuse._version import __version__ as langfuse_version from langfuse.logger import langfuse_logger +from langfuse.types import MaskOtelSpansFunction class LangfuseSpanProcessor(BatchSpanProcessor): @@ -64,6 +67,8 @@ def __init__( should_export_span: Optional[Callable[[ReadableSpan], bool]] = None, additional_headers: Optional[Dict[str, str]] = None, span_exporter: Optional[SpanExporter] = None, + media_manager: Optional[MediaManager] = None, + mask_otel_spans: Optional[MaskOtelSpansFunction] = None, ): self.public_key = public_key self.blocked_instrumentation_scopes = ( @@ -113,6 +118,13 @@ def __init__( timeout=timeout, ) + if media_manager is not None or mask_otel_spans is not None: + span_exporter = LangfuseTransformingSpanExporter( + exporter=span_exporter, + media_manager=media_manager, + mask_otel_spans=mask_otel_spans, + ) + super().__init__( span_exporter=span_exporter, export_timeout_millis=timeout * 1_000 if timeout else None, diff --git a/langfuse/_task_manager/media_manager.py b/langfuse/_task_manager/media_manager.py index 7a7123798..10ce313ee 100644 --- a/langfuse/_task_manager/media_manager.py +++ b/langfuse/_task_manager/media_manager.py @@ -21,6 +21,14 @@ _SHUTDOWN_SENTINEL = object() +def _is_langfuse_media_reference(value: Any) -> bool: + return ( + isinstance(value, str) + and value.startswith("@@@langfuseMedia:") + and value.endswith("@@@") + ) + + class MediaManager: def __init__( self, @@ -127,6 +135,9 @@ def _process_data_recursively(data: Any, level: int) -> Any: and "media_type" in data and "data" in data ): + if _is_langfuse_media_reference(data["data"]): + return data + media = LangfuseMedia( base64_data_uri=f"data:{data['media_type']};base64," + data["data"], ) @@ -151,6 +162,9 @@ def _process_data_recursively(data: Any, level: int) -> Any: and "mime_type" in data and "data" in data ): + if _is_langfuse_media_reference(data["data"]): + return data + media = LangfuseMedia( base64_data_uri=f"data:{data['mime_type']};base64," + data["data"], ) @@ -167,6 +181,43 @@ def _process_data_recursively(data: Any, level: int) -> Any: return copied + # Google Gemini / Vertex inline data + if isinstance(data, dict): + for inline_data_key in ("inline_data", "inlineData"): + inline_data = data.get(inline_data_key) + + if not isinstance(inline_data, dict) or "data" not in inline_data: + continue + + content_type = inline_data.get("mime_type") or inline_data.get( + "mimeType" + ) + + if not content_type: + continue + + if _is_langfuse_media_reference(inline_data["data"]): + return data + + media = LangfuseMedia( + base64_data_uri=f"data:{content_type};base64," + + inline_data["data"], + ) + + self._process_media( + media=media, + trace_id=trace_id, + observation_id=observation_id, + field=field, + ) + + copied = data.copy() + copied_inline_data = inline_data.copy() + copied_inline_data["data"] = media + copied[inline_data_key] = copied_inline_data + + return copied + if isinstance(data, list): return [_process_data_recursively(item, level + 1) for item in data] diff --git a/langfuse/types.py b/langfuse/types.py index c3029e713..e8302ca42 100644 --- a/langfuse/types.py +++ b/langfuse/types.py @@ -17,14 +17,21 @@ def my_evaluator(*, output: str, **kwargs) -> Evaluation: ``` """ +from dataclasses import dataclass, field +from types import MappingProxyType from typing import ( Any, Dict, Literal, + Mapping, + Optional, Protocol, + Sequence, TypedDict, ) +from opentelemetry.util.types import AttributeValue + try: from typing import NotRequired # type: ignore except ImportError: @@ -54,6 +61,219 @@ class MaskFunction(Protocol): def __call__(self, *, data: Any, **kwargs: Dict[str, Any]) -> Any: ... +@dataclass(frozen=True) +class OtelSpanIdentifier: + """Stable key for one OpenTelemetry span in a masking batch. + + Use this object as the key when returning a patch for a span. It is a + frozen, hashable dataclass, so the safest pattern is to reuse the exact + identifier object from `MaskOtelSpansParams.spans` instead of rebuilding it. + + Attributes: + trace_id: Lowercase 32-character hexadecimal OpenTelemetry trace ID. + span_id: Lowercase 16-character hexadecimal OpenTelemetry span ID. + """ + + trace_id: str + span_id: str + + +@dataclass(frozen=True) +class OtelSpanData: + """Read-only OpenTelemetry span snapshot passed to `mask_otel_spans`. + + The snapshot contains the span data that Langfuse is about to export after + the SDK has applied `should_export_span` filtering and export-stage media + processing. The mappings are immutable views and mutating them is not + supported; return an `OtelSpanPatch` to change exported attributes. + + `mask_otel_spans` can only change span attributes. It cannot change the + span name, IDs, parent relationship, resource attributes, events, links, or + instrumentation scope. + + Attributes: + trace_id: Lowercase 32-character hexadecimal OpenTelemetry trace ID. + span_id: Lowercase 16-character hexadecimal OpenTelemetry span ID. + parent_span_id: Lowercase hexadecimal parent span ID, or `None` for a + root span or when the parent is not available. + name: OpenTelemetry span name. + instrumentation_scope_name: Name of the instrumentation scope that + emitted the span, for example `openai` or `langfuse`. + instrumentation_scope_version: Version of the instrumentation scope, if + the instrumentation library provided one. + attributes: Read-only attributes that will be exported unless patched. + Values use OpenTelemetry `AttributeValue` types: strings, booleans, + numbers, or homogeneous sequences of those scalar values. + resource_attributes: Read-only resource attributes from the span's + OpenTelemetry resource. These are available for decisions only and + cannot be patched through `mask_otel_spans`. + """ + + trace_id: str + span_id: str + parent_span_id: Optional[str] + name: str + instrumentation_scope_name: Optional[str] + instrumentation_scope_version: Optional[str] + attributes: Mapping[str, AttributeValue] + resource_attributes: Mapping[str, AttributeValue] + + +@dataclass(frozen=True) +class MaskOtelSpansParams: + """Input passed to an export-stage OpenTelemetry span masking function. + + A single call receives one OpenTelemetry export batch, not necessarily a + complete trace, request, or Langfuse observation tree. Batch contents depend + on OpenTelemetry span processor settings such as `flush_at`, + `flush_interval`, explicit `flush()`, and shutdown. + + Example: + ```python + from typing import Optional + + from langfuse.types import ( + MaskOtelSpansParams, + MaskOtelSpansResult, + OtelSpanPatch, + ) + + def mask_otel_spans( + *, params: MaskOtelSpansParams + ) -> Optional[MaskOtelSpansResult]: + patches = {} + + for identifier, span in params.spans.items(): + if "http.request.header.authorization" in span.attributes: + patches[identifier] = OtelSpanPatch( + delete_attributes=("http.request.header.authorization",), + set_attributes={"security.redacted": True}, + ) + + return MaskOtelSpansResult(span_patches=patches) + ``` + + Attributes: + spans: Read-only mapping from stable span identifiers to span snapshots. + Return patches using keys from this mapping. + """ + + spans: Mapping[OtelSpanIdentifier, OtelSpanData] + + +@dataclass(frozen=True) +class OtelSpanPatch: + """Attribute changes to apply to one OpenTelemetry span before export. + + Patches are sparse: include only the attributes that should change. Langfuse + deletes `delete_attributes` first and then applies `set_attributes`, so a key + present in both fields is exported with the value from `set_attributes`. + + Attribute values must be valid OpenTelemetry attributes: strings, booleans, + integers, floats, or homogeneous sequences of those scalar types. If one + value is not valid for OpenTelemetry, Langfuse removes that attribute from + the export rather than sending an invalid span. + + Example: + ```python + OtelSpanPatch( + delete_attributes=("gen_ai.prompt.0.content",), + set_attributes={ + "gen_ai.prompt.redacted": True, + "app.masking.rule": "drop_prompt_text", + }, + ) + ``` + + Attributes: + set_attributes: Attribute values to add or replace on the exported span. + delete_attributes: Attribute keys to remove from the exported span. + """ + + set_attributes: Mapping[str, AttributeValue] = field( + default_factory=lambda: MappingProxyType({}) + ) + delete_attributes: Sequence[str] = field(default_factory=tuple) + + +@dataclass(frozen=True) +class MaskOtelSpansResult: + """Patches returned by a `mask_otel_spans` function. + + Omit spans that do not need changes. A mapping value of `None` also leaves + that span unchanged. Returning an invalid patch to drop a span is not a + supported API; use `should_export_span` when you need span-level export + filtering. + + If `mask_otel_spans` raises or returns an object that is not a + `MaskOtelSpansResult`, Langfuse drops the whole export batch. If one + individual `OtelSpanPatch` is invalid, Langfuse drops only that span from + the export batch. + + Attributes: + span_patches: Mapping from identifiers in `MaskOtelSpansParams.spans` to + sparse attribute patches. + """ + + span_patches: Mapping[OtelSpanIdentifier, Optional[OtelSpanPatch]] = field( + default_factory=lambda: MappingProxyType({}) + ) + + +class MaskOtelSpansFunction(Protocol): + """Function protocol for export-stage OpenTelemetry span masking. + + `mask_otel_spans` runs after Langfuse decides which spans this client should + export and after export-stage media handling has converted supported media + payloads into Langfuse media references. It affects only the spans exported + by this Langfuse client. If the same OpenTelemetry spans are sent to another + exporter, that exporter receives its own unmodified copy. + + The function is synchronous. It usually runs on the OpenTelemetry batch span + processor worker thread; during `flush()` and shutdown it may run on the + caller thread. Keep it deterministic and fast, and avoid relying on request + locals, the current active span, or async I/O. + + Return `None` to leave the whole batch unchanged, or return + `MaskOtelSpansResult` with sparse patches for the spans that should change. + + Example: + ```python + from typing import Optional + + from langfuse import Langfuse + from langfuse.types import ( + MaskOtelSpansParams, + MaskOtelSpansResult, + OtelSpanPatch, + ) + + def mask_otel_spans( + *, params: MaskOtelSpansParams + ) -> Optional[MaskOtelSpansResult]: + patches = {} + + for identifier, span in params.spans.items(): + if span.instrumentation_scope_name == "openai": + patches[identifier] = OtelSpanPatch( + delete_attributes=( + "gen_ai.prompt.0.content", + "gen_ai.completion.0.content", + ), + set_attributes={"masking.applied": True}, + ) + + return MaskOtelSpansResult(span_patches=patches) + + langfuse = Langfuse(mask_otel_spans=mask_otel_spans) + ``` + """ + + def __call__( + self, *, params: MaskOtelSpansParams + ) -> Optional[MaskOtelSpansResult]: ... + + class ParsedMediaReference(TypedDict): """A parsed media reference. @@ -78,6 +298,12 @@ class TraceContext(TypedDict): "ScoreDataType", "ExperimentScoreType", "MaskFunction", + "MaskOtelSpansFunction", + "MaskOtelSpansParams", + "MaskOtelSpansResult", + "OtelSpanData", + "OtelSpanIdentifier", + "OtelSpanPatch", "ParsedMediaReference", "TraceContext", ] diff --git a/pyproject.toml b/pyproject.toml index 99c704c0e..ea31f62d6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "langfuse" -version = "4.6.0b1" +version = "4.6.0a1" description = "A client library for accessing langfuse" readme = "README.md" authors = [{ name = "langfuse", email = "developers@langfuse.com" }] diff --git a/tests/e2e/test_core_sdk.py b/tests/e2e/test_core_sdk.py index 9b491a5aa..614d6da41 100644 --- a/tests/e2e/test_core_sdk.py +++ b/tests/e2e/test_core_sdk.py @@ -2069,8 +2069,9 @@ def test_create_trace_sampling_zero(): } -def test_mask_function(): +def test_mask_function(request): LangfuseResourceManager.reset() + request.addfinalizer(LangfuseResourceManager.reset) def mask_func(data): if isinstance(data, dict): diff --git a/tests/unit/test_mask_otel_spans.py b/tests/unit/test_mask_otel_spans.py new file mode 100644 index 000000000..437c3c207 --- /dev/null +++ b/tests/unit/test_mask_otel_spans.py @@ -0,0 +1,1042 @@ +import base64 +import json +import logging +from queue import Queue +from threading import Event +from types import SimpleNamespace +from typing import Sequence +from unittest.mock import Mock + +import pytest +from opentelemetry.attributes import BoundedAttributes +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import ReadableSpan, TracerProvider +from opentelemetry.sdk.trace.export import ( + BatchSpanProcessor, + SpanExporter, + SpanExportResult, +) +from opentelemetry.sdk.util import BoundedList +from opentelemetry.sdk.util.instrumentation import InstrumentationInfo +from opentelemetry.trace import SpanContext, TraceFlags, TraceState + +import langfuse._client.span_exporter as span_exporter_module +from langfuse._client.constants import LANGFUSE_TRACER_NAME +from langfuse._client.span_processor import LangfuseSpanProcessor +from langfuse._task_manager.media_manager import MediaManager +from langfuse._utils.serializer import EventSerializer +from langfuse.types import ( + MaskOtelSpansParams, + MaskOtelSpansResult, + OtelSpanIdentifier, + OtelSpanPatch, +) + + +class InMemorySpanExporter(SpanExporter): + def __init__(self) -> None: + self._finished_spans: list[ReadableSpan] = [] + + def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: + self._finished_spans.extend(spans) + return SpanExportResult.SUCCESS + + def shutdown(self) -> None: + pass + + def get_finished_spans(self) -> list[ReadableSpan]: + return list(self._finished_spans) + + +class FailsOnceSpanExporter(SpanExporter): + def __init__(self) -> None: + self.export_attempts = 0 + self.first_export_attempted = Event() + self.second_export_succeeded = Event() + self._finished_spans: list[ReadableSpan] = [] + + def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: + self.export_attempts += 1 + + if self.export_attempts == 1: + self.first_export_attempted.set() + raise RuntimeError("synthetic export failure") + + self._finished_spans.extend(spans) + self.second_export_succeeded.set() + + return SpanExportResult.SUCCESS + + def shutdown(self) -> None: + pass + + def get_finished_spans(self) -> list[ReadableSpan]: + return list(self._finished_spans) + + +def _media_manager() -> tuple[MediaManager, Queue]: + queue: Queue = Queue() + + return ( + MediaManager( + api_client=SimpleNamespace(media=Mock()), + httpx_client=Mock(), + media_upload_queue=queue, + ), + queue, + ) + + +def _tracer_provider( + *, + exporter: SpanExporter, + media_manager: MediaManager, + mask_otel_spans=None, + resource_attributes=None, + should_export_span=None, +) -> TracerProvider: + provider = TracerProvider( + resource=Resource.create( + {"service.name": "test", **(resource_attributes or {})} + ) + ) + provider.add_span_processor( + LangfuseSpanProcessor( + public_key="test-public-key", + secret_key="test-secret-key", + base_url="http://localhost:3000", + flush_at=10, + flush_interval=1, + span_exporter=exporter, + media_manager=media_manager, + mask_otel_spans=mask_otel_spans, + should_export_span=should_export_span, + ) + ) + + return provider + + +def test_mask_otel_spans_receives_post_media_batch_and_applies_sparse_patch(): + exporter = InMemorySpanExporter() + media_manager, media_queue = _media_manager() + seen_params: list[MaskOtelSpansParams] = [] + image_base64 = base64.b64encode(b"image-bytes").decode("utf-8") + + def mask_otel_spans(*, params: MaskOtelSpansParams): + seen_params.append(params) + + target_identifier = _find_identifier_by_attribute(params, "gen_ai.prompt") + payload = json.loads( + params.spans[target_identifier].attributes["gen_ai.prompt"] + ) + + assert len(params.spans) == 2 + assert payload["inline_data"]["data"].startswith("@@@langfuseMedia:") + + return MaskOtelSpansResult( + span_patches={ + target_identifier: OtelSpanPatch( + set_attributes={ + "gen_ai.request.model": "masked-model", + "masking.applied": True, + } + ) + } + ) + + provider = _tracer_provider( + exporter=exporter, + media_manager=media_manager, + mask_otel_spans=mask_otel_spans, + ) + tracer = provider.get_tracer("openinference.instrumentation.openai") + + with tracer.start_as_current_span("third-party-media-span") as span: + span.set_attribute("gen_ai.request.model", "gpt-4o") + span.set_attribute( + "gen_ai.prompt", + json.dumps( + { + "inline_data": { + "mime_type": "image/jpeg", + "data": image_base64, + } + } + ), + ) + + with tracer.start_as_current_span("third-party-unchanged-span") as span: + span.set_attribute("gen_ai.request.model", "gpt-4o-mini") + + provider.force_flush() + + exported_spans = exporter.get_finished_spans() + exported_by_name = {span.name: span for span in exported_spans} + exported_payload = json.loads( + exported_by_name["third-party-media-span"].attributes["gen_ai.prompt"] + ) + + assert len(seen_params) == 1 + assert len(exported_spans) == 2 + assert ( + exported_by_name["third-party-media-span"].attributes["gen_ai.request.model"] + == "masked-model" + ) + assert ( + exported_by_name["third-party-media-span"].attributes["masking.applied"] is True + ) + assert ( + exported_by_name["third-party-unchanged-span"].attributes[ + "gen_ai.request.model" + ] + == "gpt-4o-mini" + ) + assert exported_payload["inline_data"]["data"].startswith("@@@langfuseMedia:") + assert not media_queue.empty() + + +def test_export_stage_media_prefilter_skips_json_without_media_hints(monkeypatch): + exporter = InMemorySpanExporter() + media_manager, media_queue = _media_manager() + json_load_calls = 0 + original_json_loads = json.loads + + def count_json_loads(*args, **kwargs): + nonlocal json_load_calls + json_load_calls += 1 + return original_json_loads(*args, **kwargs) + + monkeypatch.setattr(span_exporter_module.json, "loads", count_json_loads) + + transforming_exporter = span_exporter_module.LangfuseTransformingSpanExporter( + exporter=exporter, + media_manager=media_manager, + mask_otel_spans=None, + ) + span = SimpleNamespace( + name="third-party-json-without-media", + context=SimpleNamespace(trace_id=1, span_id=2), + ) + payload = '{"messages": [{"role": "user", "content": "hello"}]}' + + result = transforming_exporter._process_media_string( + span=span, + attribute_key="gen_ai.prompt", + value=payload, + ) + + assert json_load_calls == 0 + assert result == payload + assert media_queue.empty() + + +def test_export_stage_media_processes_direct_data_uri_string(): + exporter = InMemorySpanExporter() + media_manager, media_queue = _media_manager() + image_base64 = base64.b64encode(b"image-bytes").decode("utf-8") + + provider = _tracer_provider(exporter=exporter, media_manager=media_manager) + tracer = provider.get_tracer("openinference.instrumentation.openai") + + with tracer.start_as_current_span("third-party-direct-media-span") as span: + span.set_attribute("gen_ai.prompt", f"data:image/jpeg;base64,{image_base64}") + + provider.force_flush() + + exported_span = exporter.get_finished_spans()[0] + + assert exported_span.attributes["gen_ai.prompt"].startswith("@@@langfuseMedia:") + assert not media_queue.empty() + + +def test_export_stage_media_processes_string_sequence_attributes(): + exporter = InMemorySpanExporter() + media_manager, media_queue = _media_manager() + image_base64 = base64.b64encode(b"image-bytes").decode("utf-8") + + provider = _tracer_provider(exporter=exporter, media_manager=media_manager) + tracer = provider.get_tracer("openinference.instrumentation.openai") + + inline_data_payload = json.dumps( + { + "inline_data": { + "mime_type": "image/png", + "data": image_base64, + } + } + ) + + with tracer.start_as_current_span("third-party-sequence-media-span") as span: + span.set_attribute( + "gen_ai.prompt", + [ + f"data:image/jpeg;base64,{image_base64}", + inline_data_payload, + "plain text", + ], + ) + + provider.force_flush() + + exported_span = exporter.get_finished_spans()[0] + exported_sequence = exported_span.attributes["gen_ai.prompt"] + exported_payload = json.loads(exported_sequence[1]) + + assert isinstance(exported_sequence, tuple) + assert exported_sequence[0].startswith("@@@langfuseMedia:") + assert exported_payload["inline_data"]["data"].startswith("@@@langfuseMedia:") + assert exported_sequence[2] == "plain text" + assert media_queue.qsize() == 2 + + +@pytest.mark.parametrize( + ("attribute_key", "expected_field"), + [ + ("langfuse.trace.input", "input"), + ("langfuse.observation.input", "input"), + ("ai.prompt.messages", "input"), + ("gcp.vertex.agent.tool_call_args", "input"), + ("lk.chat_ctx", "input"), + ("traceloop.entity.input", "input"), + ("gen_ai.input.messages", "input"), + ("gen_ai.prompt.0.content", "input"), + ("llm.input_messages.0.message.content", "input"), + ("langfuse.trace.output", "output"), + ("langfuse.observation.output", "output"), + ("ai.response.toolCalls", "output"), + ("gcp.vertex.agent.tool_response", "output"), + ("lk.response.text", "output"), + ("mlflow.spanOutputs", "output"), + ("gen_ai.output.messages", "output"), + ("gen_ai.completion.0.content", "output"), + ("llm.output_messages.0.message.content", "output"), + ("gen_ai.request.model", "metadata"), + ("http.response.body", "metadata"), + ("message_bus.payload", "metadata"), + ("custom.input_payload", "metadata"), + ("ai.prompt.tools", "metadata"), + ], +) +def test_media_field_for_attribute_matches_server_input_output_keys( + attribute_key, expected_field +): + assert ( + span_exporter_module._media_field_for_attribute(attribute_key) == expected_field + ) + + +def test_export_stage_media_skips_already_processed_provider_references(caplog): + exporter = InMemorySpanExporter() + media_manager, media_queue = _media_manager() + image_base64 = base64.b64encode(b"image-bytes").decode("utf-8") + transforming_exporter = span_exporter_module.LangfuseTransformingSpanExporter( + exporter=exporter, + media_manager=media_manager, + mask_otel_spans=None, + ) + span = SimpleNamespace( + name="sdk-media-span", + context=SimpleNamespace(trace_id=1, span_id=2), + ) + provider_payloads = [ + [{"type": "base64", "media_type": "image/jpeg", "data": image_base64}], + [{"type": "media", "mime_type": "image/png", "data": image_base64}], + [{"inline_data": {"mime_type": "image/jpeg", "data": image_base64}}], + [{"inlineData": {"mimeType": "image/png", "data": image_base64}}], + ] + + for payload in provider_payloads: + processed_payload = media_manager._find_and_process_media( + data=payload, + trace_id="trace-id", + observation_id="observation-id", + field="input", + ) + serialized_payload = json.dumps(processed_payload, cls=EventSerializer) + queue_size_before_export_processing = media_queue.qsize() + + caplog.clear() + with caplog.at_level(logging.ERROR, logger="langfuse"): + result = transforming_exporter._process_media_string( + span=span, + attribute_key="langfuse.observation.input", + value=serialized_payload, + ) + + assert result == serialized_payload + assert media_queue.qsize() == queue_size_before_export_processing + assert not any( + "Error parsing base64 data URI" in record.message + for record in caplog.records + ) + + +def test_export_stage_media_replaces_invalid_media_attribute_with_failure_marker(): + exporter = InMemorySpanExporter() + media_manager, media_queue = _media_manager() + invalid_data_uri = "data:image/jpeg;base64," + + provider = _tracer_provider(exporter=exporter, media_manager=media_manager) + tracer = provider.get_tracer("openinference.instrumentation.openai") + + with tracer.start_as_current_span("third-party-invalid-media-span") as span: + span.set_attribute("gen_ai.prompt", invalid_data_uri) + + provider.force_flush() + + exported_span = exporter.get_finished_spans()[0] + + assert exported_span.attributes["gen_ai.prompt"] == ( + "" + ) + assert media_queue.empty() + + +def test_export_stage_media_leaves_attribute_when_span_context_is_missing(caplog): + exporter = InMemorySpanExporter() + media_manager, media_queue = _media_manager() + image_base64 = base64.b64encode(b"image-bytes").decode("utf-8") + transforming_exporter = span_exporter_module.LangfuseTransformingSpanExporter( + exporter=exporter, + media_manager=media_manager, + mask_otel_spans=None, + ) + span = ReadableSpan( + name="missing-context-media-span", + context=None, + attributes={}, + ) + serialized_payload = json.dumps( + [ + { + "type": "base64", + "media_type": "image/jpeg", + "data": image_base64, + }, + ] + ) + attributes = { + "langfuse.observation.input": f"data:image/png;base64,{image_base64}", + "gen_ai.prompt": serialized_payload, + "plain": "hello", + } + + with caplog.at_level(logging.WARNING, logger="langfuse"): + processed_attributes = transforming_exporter._process_media_attributes( + span=span, + attributes=attributes, + ) + + assert processed_attributes == attributes + assert media_queue.empty() + assert ( + sum( + "Span context is required for media processing" in record.message + for record in caplog.records + ) + == 2 + ) + + +def test_clone_span_preserves_dropped_attribute_event_and_link_counts(): + attributes = BoundedAttributes( + maxlen=None, + attributes={"secret": "raw"}, + immutable=True, + ) + attributes.dropped = 3 + events = BoundedList.from_seq(None, []) + events.dropped = 5 + links = BoundedList.from_seq(None, []) + links.dropped = 7 + span = ReadableSpan( + name="limited-span", + context=None, + attributes=attributes, + events=events, + links=links, + ) + + cloned_span = span_exporter_module.LangfuseTransformingSpanExporter._clone_span( + span=span, + attributes={"secret": "masked", "masking.applied": True}, + ) + + assert dict(cloned_span.attributes) == { + "secret": "masked", + "masking.applied": True, + } + assert cloned_span.dropped_attributes == 3 + assert cloned_span.dropped_events == 5 + assert cloned_span.dropped_links == 7 + + +def test_clone_span_preserves_deprecated_instrumentation_info(): + with pytest.warns(DeprecationWarning, match="InstrumentationScope"): + instrumentation_info = InstrumentationInfo("legacy-instrumentation", "1.2.3") + span = ReadableSpan( + name="legacy-instrumentation-span", + context=None, + attributes={}, + instrumentation_info=instrumentation_info, + ) + + cloned_span = span_exporter_module.LangfuseTransformingSpanExporter._clone_span( + span=span, + attributes={}, + ) + + assert getattr(cloned_span, "_instrumentation_info") is instrumentation_info + + +def test_mask_otel_spans_drops_contextless_spans_without_dropping_batch(caplog): + exporter = InMemorySpanExporter() + seen_span_names: list[str] = [] + + def mask_otel_spans(*, params: MaskOtelSpansParams): + seen_span_names.extend(span.name for span in params.spans.values()) + return None + + transforming_exporter = span_exporter_module.LangfuseTransformingSpanExporter( + exporter=exporter, + media_manager=None, + mask_otel_spans=mask_otel_spans, + ) + valid_context = SpanContext( + trace_id=1, + span_id=2, + is_remote=False, + trace_flags=TraceFlags(TraceFlags.SAMPLED), + trace_state=TraceState(), + ) + contextless_spans = [ + ReadableSpan(name="missing-context-1", context=None, attributes={}), + ReadableSpan(name="missing-context-2", context=None, attributes={}), + ] + valid_span = ReadableSpan( + name="valid-context", + context=valid_context, + attributes={"gen_ai.request.model": "gpt-4o"}, + ) + + with caplog.at_level(logging.WARNING, logger="langfuse"): + result = transforming_exporter.export([*contextless_spans, valid_span]) + + exported_spans = exporter.get_finished_spans() + + assert result == SpanExportResult.SUCCESS + assert seen_span_names == ["valid-context"] + assert [span.name for span in exported_spans] == ["valid-context"] + assert ( + sum( + "span context is missing or invalid" in record.message + for record in caplog.records + ) + == 2 + ) + + +def test_exporter_exception_does_not_stop_background_export_thread(): + exporter = FailsOnceSpanExporter() + media_manager, _ = _media_manager() + provider = TracerProvider(resource=Resource.create({"service.name": "test"})) + provider.add_span_processor( + BatchSpanProcessor( + span_exporter=span_exporter_module.LangfuseTransformingSpanExporter( + exporter=exporter, + media_manager=media_manager, + mask_otel_spans=None, + ), + max_export_batch_size=1, + schedule_delay_millis=60_000, + ) + ) + tracer = provider.get_tracer("openinference.instrumentation.openai") + + try: + with tracer.start_as_current_span("first-export-fails"): + pass + + assert exporter.first_export_attempted.wait(timeout=5) + + with tracer.start_as_current_span("second-export-succeeds"): + pass + + assert exporter.second_export_succeeded.wait(timeout=5) + finally: + provider.shutdown() + + exported_spans = exporter.get_finished_spans() + + assert exporter.export_attempts == 2 + assert [span.name for span in exported_spans] == ["second-export-succeeds"] + + +def test_mask_otel_spans_receives_whole_span_snapshot(): + exporter = InMemorySpanExporter() + media_manager, _ = _media_manager() + seen_params: list[MaskOtelSpansParams] = [] + + def mask_otel_spans(*, params: MaskOtelSpansParams): + seen_params.append(params) + + return None + + provider = _tracer_provider( + exporter=exporter, + media_manager=media_manager, + mask_otel_spans=mask_otel_spans, + resource_attributes={"deployment.environment.name": "ci"}, + should_export_span=lambda span: True, + ) + tracer = provider.get_tracer("snapshot.scope", "1.2.3") + + with tracer.start_as_current_span("parent-span") as parent_span: + parent_span.set_attribute("parent.attr", "visible") + + with tracer.start_as_current_span("child-span") as child_span: + child_span.set_attribute("secret", "raw") + + provider.force_flush() + + params = seen_params[0] + parent_identifier = _find_identifier_by_name(params, "parent-span") + child_identifier = _find_identifier_by_name(params, "child-span") + child_data = params.spans[child_identifier] + + assert len(params.spans) == 2 + assert child_data.trace_id == child_identifier.trace_id + assert child_data.span_id == child_identifier.span_id + assert child_data.parent_span_id == parent_identifier.span_id + assert child_data.name == "child-span" + assert child_data.instrumentation_scope_name == "snapshot.scope" + assert child_data.instrumentation_scope_version == "1.2.3" + assert child_data.attributes["secret"] == "raw" + assert child_data.resource_attributes["service.name"] == "test" + assert child_data.resource_attributes["deployment.environment.name"] == "ci" + + with pytest.raises(TypeError): + params.spans[child_identifier] = child_data + + with pytest.raises(TypeError): + child_data.attributes["secret"] = "changed" + + +def test_mask_otel_spans_runs_for_langfuse_sdk_spans(): + exporter = InMemorySpanExporter() + media_manager, _ = _media_manager() + + def mask_otel_spans(*, params: MaskOtelSpansParams): + identifier = next(iter(params.spans)) + + return MaskOtelSpansResult( + span_patches={ + identifier: OtelSpanPatch(set_attributes={"secret": "masked"}) + } + ) + + provider = _tracer_provider( + exporter=exporter, + media_manager=media_manager, + mask_otel_spans=mask_otel_spans, + ) + tracer = provider.get_tracer( + LANGFUSE_TRACER_NAME, attributes={"public_key": "test-public-key"} + ) + + with tracer.start_as_current_span("langfuse-span") as span: + span.set_attribute("langfuse.observation.type", "span") + span.set_attribute("secret", "raw") + + provider.force_flush() + + exported_span = exporter.get_finished_spans()[0] + + assert exported_span.attributes["secret"] == "masked" + + +def test_mask_otel_spans_none_result_leaves_batch_unchanged(): + exporter = InMemorySpanExporter() + media_manager, _ = _media_manager() + + def mask_otel_spans(*, params: MaskOtelSpansParams): + return None + + provider = _tracer_provider( + exporter=exporter, + media_manager=media_manager, + mask_otel_spans=mask_otel_spans, + ) + tracer = provider.get_tracer("openinference.instrumentation.openai") + + with tracer.start_as_current_span("third-party-span") as span: + span.set_attribute("secret", "raw") + + provider.force_flush() + + exported_span = exporter.get_finished_spans()[0] + + assert exported_span.attributes["secret"] == "raw" + + +def test_mask_otel_spans_none_patch_leaves_span_unchanged(): + exporter = InMemorySpanExporter() + media_manager, _ = _media_manager() + + def mask_otel_spans(*, params: MaskOtelSpansParams): + identifier = next(iter(params.spans)) + + return MaskOtelSpansResult(span_patches={identifier: None}) + + provider = _tracer_provider( + exporter=exporter, + media_manager=media_manager, + mask_otel_spans=mask_otel_spans, + ) + tracer = provider.get_tracer("openinference.instrumentation.openai") + + with tracer.start_as_current_span("third-party-span") as span: + span.set_attribute("secret", "raw") + + provider.force_flush() + + exported_span = exporter.get_finished_spans()[0] + + assert exported_span.attributes["secret"] == "raw" + + +def test_mask_otel_spans_exception_drops_batch(): + exporter = InMemorySpanExporter() + media_manager, _ = _media_manager() + + def mask_otel_spans(*, params: MaskOtelSpansParams): + raise RuntimeError("mask failed") + + provider = _tracer_provider( + exporter=exporter, + media_manager=media_manager, + mask_otel_spans=mask_otel_spans, + ) + tracer = provider.get_tracer("openinference.instrumentation.openai") + + with tracer.start_as_current_span("first-span") as span: + span.set_attribute("gen_ai.request.model", "gpt-4o") + + with tracer.start_as_current_span("second-span") as span: + span.set_attribute("gen_ai.request.model", "gpt-4o-mini") + + provider.force_flush() + + assert exporter.get_finished_spans() == [] + + +def test_mask_otel_spans_invalid_result_drops_batch(): + exporter = InMemorySpanExporter() + media_manager, _ = _media_manager() + + def mask_otel_spans(*, params: MaskOtelSpansParams): + return {"span_patches": {}} + + provider = _tracer_provider( + exporter=exporter, + media_manager=media_manager, + mask_otel_spans=mask_otel_spans, + ) + tracer = provider.get_tracer("openinference.instrumentation.openai") + + with tracer.start_as_current_span("third-party-span") as span: + span.set_attribute("gen_ai.request.model", "gpt-4o") + + provider.force_flush() + + assert exporter.get_finished_spans() == [] + + +def test_mask_otel_spans_invalid_span_patches_container_drops_batch(): + exporter = InMemorySpanExporter() + media_manager, _ = _media_manager() + invalid_result = MaskOtelSpansResult() + object.__setattr__(invalid_result, "span_patches", []) + + def mask_otel_spans(*, params: MaskOtelSpansParams): + return invalid_result + + provider = _tracer_provider( + exporter=exporter, + media_manager=media_manager, + mask_otel_spans=mask_otel_spans, + ) + tracer = provider.get_tracer("openinference.instrumentation.openai") + + with tracer.start_as_current_span("third-party-span") as span: + span.set_attribute("gen_ai.request.model", "gpt-4o") + + provider.force_flush() + + assert exporter.get_finished_spans() == [] + + +def test_mask_otel_spans_unknown_identifier_drops_batch(): + exporter = InMemorySpanExporter() + media_manager, _ = _media_manager() + unknown_identifier = OtelSpanIdentifier(trace_id="1" * 32, span_id="2" * 16) + + def mask_otel_spans(*, params: MaskOtelSpansParams): + return MaskOtelSpansResult( + span_patches={ + unknown_identifier: OtelSpanPatch(set_attributes={"secret": "masked"}) + } + ) + + provider = _tracer_provider( + exporter=exporter, + media_manager=media_manager, + mask_otel_spans=mask_otel_spans, + ) + tracer = provider.get_tracer("openinference.instrumentation.openai") + + with tracer.start_as_current_span("third-party-span") as span: + span.set_attribute("gen_ai.request.model", "gpt-4o") + + provider.force_flush() + + assert exporter.get_finished_spans() == [] + + +def test_mask_otel_spans_invalid_patch_drops_only_that_span(): + exporter = InMemorySpanExporter() + media_manager, _ = _media_manager() + + def mask_otel_spans(*, params: MaskOtelSpansParams): + target_identifier = _find_identifier_by_name(params, "drop-me") + + return MaskOtelSpansResult( + span_patches={ + target_identifier: {"set_attributes": {"secret": "masked"}}, + } + ) + + provider = _tracer_provider( + exporter=exporter, + media_manager=media_manager, + mask_otel_spans=mask_otel_spans, + ) + tracer = provider.get_tracer("openinference.instrumentation.openai") + + with tracer.start_as_current_span("drop-me") as span: + span.set_attribute("gen_ai.request.model", "gpt-4o") + + with tracer.start_as_current_span("keep-me") as span: + span.set_attribute("gen_ai.request.model", "gpt-4o-mini") + + provider.force_flush() + + exported_spans = exporter.get_finished_spans() + + assert [span.name for span in exported_spans] == ["keep-me"] + + +@pytest.mark.parametrize("invalid_field", ["set_attributes", "delete_attributes"]) +def test_mask_otel_spans_invalid_patch_containers_drop_only_that_span(invalid_field): + exporter = InMemorySpanExporter() + media_manager, _ = _media_manager() + + def mask_otel_spans(*, params: MaskOtelSpansParams): + target_identifier = _find_identifier_by_name(params, "drop-me") + patch = OtelSpanPatch() + + if invalid_field == "set_attributes": + object.__setattr__(patch, "set_attributes", ["secret"]) + else: + object.__setattr__(patch, "delete_attributes", "secret") + + return MaskOtelSpansResult(span_patches={target_identifier: patch}) + + provider = _tracer_provider( + exporter=exporter, + media_manager=media_manager, + mask_otel_spans=mask_otel_spans, + ) + tracer = provider.get_tracer("openinference.instrumentation.openai") + + with tracer.start_as_current_span("drop-me") as span: + span.set_attribute("gen_ai.request.model", "gpt-4o") + + with tracer.start_as_current_span("keep-me") as span: + span.set_attribute("gen_ai.request.model", "gpt-4o-mini") + + provider.force_flush() + + exported_spans = exporter.get_finished_spans() + + assert [span.name for span in exported_spans] == ["keep-me"] + + +def test_mask_otel_spans_invalid_patch_keys_are_ignored(): + exporter = InMemorySpanExporter() + media_manager, _ = _media_manager() + + def mask_otel_spans(*, params: MaskOtelSpansParams): + identifier = next(iter(params.spans)) + + return MaskOtelSpansResult( + span_patches={ + identifier: OtelSpanPatch( + delete_attributes=[None, "secret"], + set_attributes={ + None: "ignored", + "masked": "value", + }, + ) + } + ) + + provider = _tracer_provider( + exporter=exporter, + media_manager=media_manager, + mask_otel_spans=mask_otel_spans, + ) + tracer = provider.get_tracer("openinference.instrumentation.openai") + + with tracer.start_as_current_span("third-party-span") as span: + span.set_attribute("secret", "raw") + span.set_attribute("kept", "value") + + provider.force_flush() + + exported_span = exporter.get_finished_spans()[0] + + assert "secret" not in exported_span.attributes + assert None not in exported_span.attributes + assert exported_span.attributes["kept"] == "value" + assert exported_span.attributes["masked"] == "value" + + +def test_mask_otel_spans_invalid_set_value_deletes_attribute(): + exporter = InMemorySpanExporter() + media_manager, _ = _media_manager() + + def mask_otel_spans(*, params: MaskOtelSpansParams): + identifier = next(iter(params.spans)) + + return MaskOtelSpansResult( + span_patches={ + identifier: OtelSpanPatch( + set_attributes={ + "secret": object(), + "masking.applied": True, + } + ) + } + ) + + provider = _tracer_provider( + exporter=exporter, + media_manager=media_manager, + mask_otel_spans=mask_otel_spans, + ) + tracer = provider.get_tracer("openinference.instrumentation.openai") + + with tracer.start_as_current_span("third-party-span") as span: + span.set_attribute("gen_ai.request.model", "gpt-4o") + span.set_attribute("secret", "raw") + + provider.force_flush() + + exported_span = exporter.get_finished_spans()[0] + + assert "secret" not in exported_span.attributes + assert exported_span.attributes["gen_ai.request.model"] == "gpt-4o" + assert exported_span.attributes["masking.applied"] is True + + +def test_mask_otel_spans_set_wins_when_key_is_deleted_and_set(): + exporter = InMemorySpanExporter() + media_manager, _ = _media_manager() + + def mask_otel_spans(*, params: MaskOtelSpansParams): + identifier = next(iter(params.spans)) + + return MaskOtelSpansResult( + span_patches={ + identifier: OtelSpanPatch( + delete_attributes=["secret"], + set_attributes={"secret": "masked"}, + ) + } + ) + + provider = _tracer_provider( + exporter=exporter, + media_manager=media_manager, + mask_otel_spans=mask_otel_spans, + ) + tracer = provider.get_tracer("openinference.instrumentation.openai") + + with tracer.start_as_current_span("third-party-span") as span: + span.set_attribute("gen_ai.request.model", "gpt-4o") + span.set_attribute("secret", "raw") + + provider.force_flush() + + exported_span = exporter.get_finished_spans()[0] + + assert exported_span.attributes["secret"] == "masked" + + +def test_mask_otel_spans_runs_after_should_export_span_filter(): + exporter = InMemorySpanExporter() + media_manager, media_queue = _media_manager() + seen_params: list[MaskOtelSpansParams] = [] + image_base64 = base64.b64encode(b"image-bytes").decode("utf-8") + + def mask_otel_spans(*, params: MaskOtelSpansParams): + seen_params.append(params) + + return MaskOtelSpansResult() + + provider = _tracer_provider( + exporter=exporter, + media_manager=media_manager, + mask_otel_spans=mask_otel_spans, + should_export_span=lambda span: span.name == "keep-me", + ) + tracer = provider.get_tracer("openinference.instrumentation.openai") + + with tracer.start_as_current_span("drop-me") as span: + span.set_attribute("gen_ai.prompt", f"data:image/jpeg;base64,{image_base64}") + + with tracer.start_as_current_span("keep-me") as span: + span.set_attribute("gen_ai.request.model", "gpt-4o") + + provider.force_flush() + + exported_spans = exporter.get_finished_spans() + + assert len(seen_params) == 1 + assert [span_data.name for span_data in seen_params[0].spans.values()] == [ + "keep-me" + ] + assert [span.name for span in exported_spans] == ["keep-me"] + assert media_queue.empty() + + +def _find_identifier_by_attribute( + params: MaskOtelSpansParams, attribute_key: str +) -> OtelSpanIdentifier: + return next( + identifier + for identifier, span_data in params.spans.items() + if attribute_key in span_data.attributes + ) + + +def _find_identifier_by_name( + params: MaskOtelSpansParams, name: str +) -> OtelSpanIdentifier: + return next( + identifier + for identifier, span_data in params.spans.items() + if span_data.name == name + ) diff --git a/tests/unit/test_media_manager.py b/tests/unit/test_media_manager.py index 68684fac4..01b315dac 100644 --- a/tests/unit/test_media_manager.py +++ b/tests/unit/test_media_manager.py @@ -194,3 +194,47 @@ def test_find_and_process_media_sse_in_nested_structure_passes_through(): assert result == data assert queue.empty() + + +def test_find_and_process_media_gemini_inline_data_is_processed(): + queue = Queue() + manager = MediaManager( + api_client=SimpleNamespace(media=Mock()), + httpx_client=Mock(), + media_upload_queue=queue, + ) + + data = { + "inline_data": { + "mime_type": "image/jpeg", + "data": "/9j/4AAQSkZJRgABAQAAAQABAAD/4QBARXhpZgAA", + } + } + result = manager._find_and_process_media( + data=data, trace_id="trace-id", observation_id=None, field="input" + ) + + assert isinstance(result["inline_data"]["data"], LangfuseMedia) + assert not queue.empty() + + +def test_find_and_process_media_gemini_inline_data_camel_case_is_processed(): + queue = Queue() + manager = MediaManager( + api_client=SimpleNamespace(media=Mock()), + httpx_client=Mock(), + media_upload_queue=queue, + ) + + data = { + "inlineData": { + "mimeType": "image/png", + "data": "iVBORw0KGgo=", + } + } + result = manager._find_and_process_media( + data=data, trace_id="trace-id", observation_id=None, field="input" + ) + + assert isinstance(result["inlineData"]["data"], LangfuseMedia) + assert not queue.empty() diff --git a/tests/unit/test_otel.py b/tests/unit/test_otel.py index e7eb74280..636b344d2 100644 --- a/tests/unit/test_otel.py +++ b/tests/unit/test_otel.py @@ -1967,12 +1967,15 @@ class TestMultiProjectSetup(TestOTelBase): """ @pytest.fixture(scope="function") - def multi_project_setup(self, monkeypatch): + def multi_project_setup(self, monkeypatch, request): """Create two separate Langfuse clients with different projects.""" # Reset any previous trace providers + from opentelemetry import context as otel_context from opentelemetry import trace as trace_api_reset original_provider = trace_api_reset.get_tracer_provider() + context_token = otel_context.attach(otel_context.Context()) + request.addfinalizer(lambda: otel_context.detach(context_token)) # Create exporters and tracers for two projects exporter_project1 = InMemorySpanExporter() diff --git a/tests/unit/test_resource_manager.py b/tests/unit/test_resource_manager.py index d0880dcd6..8e3b505c7 100644 --- a/tests/unit/test_resource_manager.py +++ b/tests/unit/test_resource_manager.py @@ -14,6 +14,7 @@ from langfuse._task_manager.media_manager import MediaManager from langfuse._task_manager.media_upload_consumer import MediaUploadConsumer from langfuse._task_manager.score_ingestion_consumer import ScoreIngestionConsumer +from langfuse.types import MaskOtelSpansResult class NoOpSpanExporter(SpanExporter): @@ -38,6 +39,9 @@ def test_get_client_preserves_all_settings(monkeypatch): def should_export(span): return span.name != "drop" + def mask_otel_spans(*, params): + return MaskOtelSpansResult() + span_exporter = NoOpSpanExporter() settings = { @@ -49,6 +53,7 @@ def should_export(span): "flush_at": 100, "sample_rate": 0.8, "should_export_span": should_export, + "mask_otel_spans": mask_otel_spans, "additional_headers": {"X-Custom": "value"}, "span_exporter": span_exporter, } @@ -65,6 +70,7 @@ def should_export(span): assert rm.timeout == settings["timeout"] assert rm.sample_rate == settings["sample_rate"] assert rm.should_export_span is should_export + assert rm.mask_otel_spans is mask_otel_spans assert rm.additional_headers == settings["additional_headers"] assert rm.span_exporter is span_exporter diff --git a/uv.lock b/uv.lock index 9f36a9102..11c0acdff 100644 --- a/uv.lock +++ b/uv.lock @@ -3,7 +3,7 @@ revision = 3 requires-python = ">=3.10, <4.0" [options] -exclude-newer = "2026-04-27T07:52:36.360279576Z" +exclude-newer = "2026-05-01T11:54:53.932901388Z" exclude-newer-span = "P7D" [[package]] @@ -554,7 +554,7 @@ wheels = [ [[package]] name = "langfuse" -version = "4.6.0b1" +version = "4.6.0a1" source = { editable = "." } dependencies = [ { name = "backoff" },