Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions langfuse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@
is_known_llm_instrumentor,
is_langfuse_span,
)
from .types import (
MaskOtelSpansFunction,
MaskOtelSpansParams,
MaskOtelSpansResult,
OtelSpanData,
OtelSpanIdentifier,
OtelSpanPatch,
)

Langfuse = _client_module.Langfuse

Expand Down Expand Up @@ -71,6 +79,12 @@
"is_genai_span",
"is_known_llm_instrumentor",
"KNOWN_LLM_INSTRUMENTATION_SCOPE_PREFIXES",
"MaskOtelSpansFunction",
"MaskOtelSpansParams",
"MaskOtelSpansResult",
"OtelSpanData",
"OtelSpanIdentifier",
"OtelSpanPatch",
"experiment",
"api",
]
13 changes: 11 additions & 2 deletions langfuse/_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,13 @@
PromptClient,
TextPromptClient,
)
from langfuse.types import MaskFunction, ScoreDataType, SpanLevel, TraceContext
from langfuse.types import (
MaskFunction,
MaskOtelSpansFunction,
ScoreDataType,
SpanLevel,
TraceContext,
)


class Langfuse:
Expand Down Expand Up @@ -169,7 +175,8 @@ class Langfuse:
release (Optional[str]): Release version/hash of your application. Used for grouping analytics by release.
media_upload_thread_count (Optional[int]): Number of background threads for handling media uploads. Defaults to 1. Can also be set via LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT environment variable.
sample_rate (Optional[float]): Sampling rate for traces (0.0 to 1.0). Defaults to 1.0 (100% of traces are sampled). Can also be set via LANGFUSE_SAMPLE_RATE environment variable.
mask (Optional[MaskFunction]): Function to mask sensitive data in traces before sending to the API.
mask (Optional[MaskFunction]): Function to mask sensitive data synchronously when Langfuse SDK attributes are created. This applies only to attributes set through Langfuse SDK APIs.
mask_otel_spans (Optional[MaskOtelSpansFunction]): Synchronous export-stage hook that receives a batch of OpenTelemetry span snapshots and can return sparse attribute patches before spans are sent to Langfuse. This runs after export filtering and affects only spans exported by this Langfuse client, not spans already sent to other observability backends. It usually runs on the OpenTelemetry batch span processor worker thread; during flush and shutdown, it may run on the caller thread. The batch contents are governed by the OpenTelemetry export batch settings and are not guaranteed to contain a complete trace or request. If the hook raises or returns an invalid batch result, the whole export batch is dropped.
blocked_instrumentation_scopes (Optional[List[str]]): Deprecated. Use `should_export_span` instead. Equivalent behavior:
```python
from langfuse.span_filter import is_default_export_span
Expand Down Expand Up @@ -246,6 +253,7 @@ def __init__(
media_upload_thread_count: Optional[int] = None,
sample_rate: Optional[float] = None,
mask: Optional[MaskFunction] = None,
mask_otel_spans: Optional[MaskOtelSpansFunction] = None,
blocked_instrumentation_scopes: Optional[List[str]] = None,
should_export_span: Optional[Callable[[ReadableSpan], bool]] = None,
additional_headers: Optional[Dict[str, str]] = None,
Expand Down Expand Up @@ -342,6 +350,7 @@ def __init__(
media_upload_thread_count=media_upload_thread_count,
sample_rate=sample_rate,
mask=mask,
mask_otel_spans=mask_otel_spans,
tracing_enabled=self._tracing_enabled,
blocked_instrumentation_scopes=blocked_instrumentation_scopes,
should_export_span=should_export_span,
Expand Down
1 change: 1 addition & 0 deletions langfuse/_client/get_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def _create_client_from_instance(
media_upload_thread_count=instance.media_upload_thread_count,
sample_rate=instance.sample_rate,
mask=instance.mask,
mask_otel_spans=instance.mask_otel_spans,
blocked_instrumentation_scopes=instance.blocked_instrumentation_scopes,
should_export_span=instance.should_export_span,
additional_headers=instance.additional_headers,
Expand Down
62 changes: 34 additions & 28 deletions langfuse/_client/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
from langfuse._utils.request import LangfuseClient
from langfuse.api import AsyncLangfuseAPI, LangfuseAPI
from langfuse.logger import langfuse_logger
from langfuse.types import MaskFunction
from langfuse.types import MaskFunction, MaskOtelSpansFunction

from .._version import __version__ as langfuse_version

Expand Down Expand Up @@ -94,6 +94,7 @@ def __new__(
media_upload_thread_count: Optional[int] = None,
sample_rate: Optional[float] = None,
mask: Optional[MaskFunction] = None,
mask_otel_spans: Optional[MaskOtelSpansFunction] = None,
tracing_enabled: Optional[bool] = None,
blocked_instrumentation_scopes: Optional[List[str]] = None,
should_export_span: Optional[Callable[[ReadableSpan], bool]] = None,
Expand Down Expand Up @@ -128,6 +129,7 @@ def __new__(
media_upload_thread_count=media_upload_thread_count,
sample_rate=sample_rate,
mask=mask,
mask_otel_spans=mask_otel_spans,
tracing_enabled=tracing_enabled
if tracing_enabled is not None
else True,
Expand Down Expand Up @@ -157,6 +159,7 @@ def _initialize_instance(
httpx_client: Optional[httpx.Client] = None,
sample_rate: Optional[float] = None,
mask: Optional[MaskFunction] = None,
mask_otel_spans: Optional[MaskOtelSpansFunction] = None,
tracing_enabled: bool = True,
blocked_instrumentation_scopes: Optional[List[str]] = None,
should_export_span: Optional[Callable[[ReadableSpan], bool]] = None,
Expand All @@ -169,6 +172,7 @@ def _initialize_instance(
self.tracing_enabled = tracing_enabled
self.base_url = base_url
self.mask = mask
self.mask_otel_spans = mask_otel_spans
self.environment = environment

# Store additional client settings for get_client() to use
Expand All @@ -184,33 +188,6 @@ def _initialize_instance(
self.span_exporter = span_exporter
self.tracer_provider: Optional[TracerProvider] = None

# OTEL Tracer
if tracing_enabled:
tracer_provider = tracer_provider or _init_tracer_provider(
environment=environment, release=release, sample_rate=sample_rate
)
self.tracer_provider = tracer_provider

langfuse_processor = LangfuseSpanProcessor(
public_key=self.public_key,
secret_key=secret_key,
base_url=base_url,
timeout=timeout,
flush_at=flush_at,
flush_interval=flush_interval,
blocked_instrumentation_scopes=blocked_instrumentation_scopes,
should_export_span=should_export_span,
additional_headers=additional_headers,
span_exporter=span_exporter,
)
tracer_provider.add_span_processor(langfuse_processor)

self._otel_tracer = tracer_provider.get_tracer(
LANGFUSE_TRACER_NAME,
langfuse_version,
attributes={"public_key": self.public_key},
)

# API Clients

## API clients must be singletons because the underlying HTTPX clients
Expand Down Expand Up @@ -266,6 +243,35 @@ def _initialize_instance(
)
self._media_upload_consumers = []

# OTEL Tracer
if tracing_enabled:
tracer_provider = tracer_provider or _init_tracer_provider(
environment=environment, release=release, sample_rate=sample_rate
)
self.tracer_provider = tracer_provider

langfuse_processor = LangfuseSpanProcessor(
public_key=self.public_key,
secret_key=secret_key,
base_url=base_url,
timeout=timeout,
flush_at=flush_at,
flush_interval=flush_interval,
blocked_instrumentation_scopes=blocked_instrumentation_scopes,
should_export_span=should_export_span,
additional_headers=additional_headers,
span_exporter=span_exporter,
media_manager=self._media_manager,
mask_otel_spans=mask_otel_spans,
)
tracer_provider.add_span_processor(langfuse_processor)

self._otel_tracer = tracer_provider.get_tracer(
LANGFUSE_TRACER_NAME,
langfuse_version,
attributes={"public_key": self.public_key},
)

media_upload_thread_count = media_upload_thread_count or max(
int(os.getenv(LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT, 1)), 1
)
Expand Down
Loading
Loading