From 95de36a069ddb82a7286d0d892a4b86d3b17a886 Mon Sep 17 00:00:00 2001 From: louayboukhris Date: Fri, 17 Apr 2026 14:57:56 +0100 Subject: [PATCH] feat: add Python Azure OpenAI instrumentation package Add traceai-azure-openai package for tracing Azure OpenAI API calls. This brings Python parity with the existing Java Azure OpenAI integration. - AzureOpenAIInstrumentor for auto-instrumenting AzureOpenAI/AsyncAzureOpenAI clients - Azure-specific attributes: deployment name, API version, endpoint - Chat completions, embeddings, and completions tracing - Streaming and async support - Unit tests (7 tests) and usage examples --- python/frameworks/azure_openai/CHANGELOG.md | 7 + python/frameworks/azure_openai/README.md | 84 +++ .../examples/async_chat_completions.py | 35 + .../azure_openai/examples/chat_completions.py | 30 + .../examples/chat_completions_stream.py | 35 + .../azure_openai/examples/embeddings.py | 32 + .../azure_openai/examples/requirements.txt | 3 + python/frameworks/azure_openai/pyproject.toml | 18 + .../traceai_azure_openai/__init__.py | 78 +++ .../_attributes/__init__.py | 0 .../_attributes/_responses_api.py | 651 ++++++++++++++++++ .../traceai_azure_openai/_request.py | 493 +++++++++++++ .../_request_attributes_extractor.py | 240 +++++++ .../_response_accumulator.py | 374 ++++++++++ .../_response_attributes_extractor.py | 282 ++++++++ .../traceai_azure_openai/_span_io_handler.py | 167 +++++ .../traceai_azure_openai/_stream.py | 179 +++++ .../traceai_azure_openai/_utils.py | 148 ++++ .../traceai_azure_openai/_with_span.py | 95 +++ .../traceai_azure_openai/package.py | 2 + .../traceai_azure_openai/py.typed | 0 .../traceai_azure_openai/version.py | 1 + python/tests/test_framework_azure_openai.py | 300 ++++++++ 23 files changed, 3254 insertions(+) create mode 100644 python/frameworks/azure_openai/CHANGELOG.md create mode 100644 python/frameworks/azure_openai/README.md create mode 100644 python/frameworks/azure_openai/examples/async_chat_completions.py 
create mode 100644 python/frameworks/azure_openai/examples/chat_completions.py create mode 100644 python/frameworks/azure_openai/examples/chat_completions_stream.py create mode 100644 python/frameworks/azure_openai/examples/embeddings.py create mode 100644 python/frameworks/azure_openai/examples/requirements.txt create mode 100644 python/frameworks/azure_openai/pyproject.toml create mode 100644 python/frameworks/azure_openai/traceai_azure_openai/__init__.py create mode 100644 python/frameworks/azure_openai/traceai_azure_openai/_attributes/__init__.py create mode 100644 python/frameworks/azure_openai/traceai_azure_openai/_attributes/_responses_api.py create mode 100644 python/frameworks/azure_openai/traceai_azure_openai/_request.py create mode 100644 python/frameworks/azure_openai/traceai_azure_openai/_request_attributes_extractor.py create mode 100644 python/frameworks/azure_openai/traceai_azure_openai/_response_accumulator.py create mode 100644 python/frameworks/azure_openai/traceai_azure_openai/_response_attributes_extractor.py create mode 100644 python/frameworks/azure_openai/traceai_azure_openai/_span_io_handler.py create mode 100644 python/frameworks/azure_openai/traceai_azure_openai/_stream.py create mode 100644 python/frameworks/azure_openai/traceai_azure_openai/_utils.py create mode 100644 python/frameworks/azure_openai/traceai_azure_openai/_with_span.py create mode 100644 python/frameworks/azure_openai/traceai_azure_openai/package.py create mode 100644 python/frameworks/azure_openai/traceai_azure_openai/py.typed create mode 100644 python/frameworks/azure_openai/traceai_azure_openai/version.py create mode 100644 python/tests/test_framework_azure_openai.py diff --git a/python/frameworks/azure_openai/CHANGELOG.md b/python/frameworks/azure_openai/CHANGELOG.md new file mode 100644 index 00000000..68f4eb31 --- /dev/null +++ b/python/frameworks/azure_openai/CHANGELOG.md @@ -0,0 +1,7 @@ +## [0.1.0] - 2026-04-10 +### Feature +- Initial release with Azure OpenAI 
instrumentation support +- Chat completions, embeddings, and completions tracing +- Azure-specific attribute capture (deployment, API version, endpoint) +- Streaming and non-streaming response support +- Async and sync client support diff --git a/python/frameworks/azure_openai/README.md b/python/frameworks/azure_openai/README.md new file mode 100644 index 00000000..0c1165ed --- /dev/null +++ b/python/frameworks/azure_openai/README.md @@ -0,0 +1,84 @@ +# Azure OpenAI OpenTelemetry Integration + +## Overview +This integration provides support for using OpenTelemetry with Azure OpenAI. It enables tracing and monitoring of applications built with Azure OpenAI, capturing chat completions, embeddings, and completions with Azure-specific attributes such as deployment name and API version. + +## Installation + +1. **Install traceAI Azure OpenAI** + +```bash +pip install traceAI-azure-openai +``` + +### Set Environment Variables +Set up your environment variables to authenticate with FutureAGI and Azure OpenAI + +```python +import os + +os.environ["FI_API_KEY"] = FI_API_KEY +os.environ["FI_SECRET_KEY"] = FI_SECRET_KEY +os.environ["AZURE_OPENAI_ENDPOINT"] = AZURE_OPENAI_ENDPOINT +os.environ["AZURE_OPENAI_API_KEY"] = AZURE_OPENAI_API_KEY +os.environ["AZURE_OPENAI_DEPLOYMENT"] = AZURE_OPENAI_DEPLOYMENT +os.environ["AZURE_OPENAI_API_VERSION"] = AZURE_OPENAI_API_VERSION +``` + +## Quickstart + +### Register Tracer Provider +Set up the trace provider to establish the observability pipeline. The trace provider: + +```python +from fi_instrumentation import register +from fi_instrumentation.fi_types import ProjectType + +trace_provider = register( + project_type=ProjectType.OBSERVE, + project_name="azure_openai_app" +) +``` + +### Configure Azure OpenAI Instrumentation +Set up your Azure OpenAI client with built-in observability. 
+ +```python +from traceai_azure_openai import AzureOpenAIInstrumentor + +AzureOpenAIInstrumentor().instrument(tracer_provider=trace_provider) +``` + +### Create Azure OpenAI Components +Set up your Azure OpenAI client with built-in observability. + +```python +from openai import AzureOpenAI + +client = AzureOpenAI( + azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"], + api_key=os.environ["AZURE_OPENAI_API_KEY"], + api_version=os.environ["AZURE_OPENAI_API_VERSION"], +) + +response = client.chat.completions.create( + model=os.environ["AZURE_OPENAI_DEPLOYMENT"], + messages=[ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "Can you tell me a joke?"} + ] +) + +print(response.choices[0].message.content) +``` + +## Azure-Specific Attributes + +This instrumentation captures the following Azure-specific span attributes in addition to the standard GenAI semantic conventions: + +| Attribute | Description | +|-----------|-------------| +| `gen_ai.provider.name` | Set to `azure` | +| `gen_ai.azure.deployment` | The Azure OpenAI deployment name | +| `gen_ai.azure.api_version` | The Azure OpenAI API version | +| `server.address` | The Azure OpenAI endpoint hostname | diff --git a/python/frameworks/azure_openai/examples/async_chat_completions.py b/python/frameworks/azure_openai/examples/async_chat_completions.py new file mode 100644 index 00000000..241b5d43 --- /dev/null +++ b/python/frameworks/azure_openai/examples/async_chat_completions.py @@ -0,0 +1,35 @@ +import asyncio +import os +from openai import AsyncAzureOpenAI + +from fi_instrumentation.otel import register +from fi_instrumentation.fi_types import ProjectType +from traceai_azure_openai import AzureOpenAIInstrumentor + +# Configure trace provider +trace_provider = register( + project_type=ProjectType.OBSERVE, + project_name="azure_openai_async_app", +) + +# Initialize the Azure OpenAI instrumentor +AzureOpenAIInstrumentor().instrument(tracer_provider=trace_provider) + + 
+async def main(): + client = AsyncAzureOpenAI( + azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"], + api_key=os.environ["AZURE_OPENAI_API_KEY"], + api_version=os.environ.get("AZURE_OPENAI_API_VERSION", "2024-02-01"), + ) + + response = await client.chat.completions.create( + model=os.environ["AZURE_OPENAI_DEPLOYMENT"], + messages=[{"role": "user", "content": "Write a haiku about Azure cloud."}], + max_tokens=50, + ) + print(response.choices[0].message.content) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/frameworks/azure_openai/examples/chat_completions.py b/python/frameworks/azure_openai/examples/chat_completions.py new file mode 100644 index 00000000..fefd9f9e --- /dev/null +++ b/python/frameworks/azure_openai/examples/chat_completions.py @@ -0,0 +1,30 @@ +import os +from openai import AzureOpenAI + +from fi_instrumentation.otel import register +from fi_instrumentation.fi_types import ProjectType +from traceai_azure_openai import AzureOpenAIInstrumentor + +# Configure trace provider +trace_provider = register( + project_type=ProjectType.OBSERVE, + project_name="azure_openai_app", +) + +# Initialize the Azure OpenAI instrumentor +AzureOpenAIInstrumentor().instrument(tracer_provider=trace_provider) + + +if __name__ == "__main__": + client = AzureOpenAI( + azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"], + api_key=os.environ["AZURE_OPENAI_API_KEY"], + api_version=os.environ.get("AZURE_OPENAI_API_VERSION", "2024-02-01"), + ) + + response = client.chat.completions.create( + model=os.environ["AZURE_OPENAI_DEPLOYMENT"], + messages=[{"role": "user", "content": "Write a haiku."}], + max_tokens=20, + ) + print(response.choices[0].message.content) diff --git a/python/frameworks/azure_openai/examples/chat_completions_stream.py b/python/frameworks/azure_openai/examples/chat_completions_stream.py new file mode 100644 index 00000000..c131ea86 --- /dev/null +++ b/python/frameworks/azure_openai/examples/chat_completions_stream.py @@ -0,0 +1,35 
@@ +import os +from openai import AzureOpenAI + +from fi_instrumentation.otel import register +from fi_instrumentation.fi_types import ProjectType +from traceai_azure_openai import AzureOpenAIInstrumentor + +# Configure trace provider +trace_provider = register( + project_type=ProjectType.OBSERVE, + project_name="azure_openai_stream_app", +) + +# Initialize the Azure OpenAI instrumentor +AzureOpenAIInstrumentor().instrument(tracer_provider=trace_provider) + + +if __name__ == "__main__": + client = AzureOpenAI( + azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"], + api_key=os.environ["AZURE_OPENAI_API_KEY"], + api_version=os.environ.get("AZURE_OPENAI_API_VERSION", "2024-02-01"), + ) + + stream = client.chat.completions.create( + model=os.environ["AZURE_OPENAI_DEPLOYMENT"], + messages=[{"role": "user", "content": "Tell me a short story."}], + max_tokens=100, + stream=True, + ) + + for chunk in stream: + if chunk.choices[0].delta.content is not None: + print(chunk.choices[0].delta.content, end="") + print() diff --git a/python/frameworks/azure_openai/examples/embeddings.py b/python/frameworks/azure_openai/examples/embeddings.py new file mode 100644 index 00000000..bf1d4e67 --- /dev/null +++ b/python/frameworks/azure_openai/examples/embeddings.py @@ -0,0 +1,32 @@ +import os +from openai import AzureOpenAI + +from fi_instrumentation.otel import register +from fi_instrumentation.fi_types import ProjectType +from traceai_azure_openai import AzureOpenAIInstrumentor + +# Configure trace provider +trace_provider = register( + project_type=ProjectType.OBSERVE, + project_name="azure_openai_embeddings_app", +) + +# Initialize the Azure OpenAI instrumentor +AzureOpenAIInstrumentor().instrument(tracer_provider=trace_provider) + + +if __name__ == "__main__": + client = AzureOpenAI( + azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"], + api_key=os.environ["AZURE_OPENAI_API_KEY"], + api_version=os.environ.get("AZURE_OPENAI_API_VERSION", "2024-02-01"), + ) + + response = 
client.embeddings.create( + model=os.environ.get("AZURE_OPENAI_EMBEDDING_DEPLOYMENT", "text-embedding-ada-002"), + input=["Hello world", "Azure OpenAI embeddings are great!"], + ) + + for i, embedding in enumerate(response.data): + print(f"Embedding {i}: dimension={len(embedding.embedding)}") + print(f"Total tokens: {response.usage.total_tokens}") diff --git a/python/frameworks/azure_openai/examples/requirements.txt b/python/frameworks/azure_openai/examples/requirements.txt new file mode 100644 index 00000000..073e5039 --- /dev/null +++ b/python/frameworks/azure_openai/examples/requirements.txt @@ -0,0 +1,3 @@ +traceAI-azure-openai +openai +fi-instrumentation-otel diff --git a/python/frameworks/azure_openai/pyproject.toml b/python/frameworks/azure_openai/pyproject.toml new file mode 100644 index 00000000..5bb29625 --- /dev/null +++ b/python/frameworks/azure_openai/pyproject.toml @@ -0,0 +1,18 @@ +[tool.poetry] +name = "traceAI-azure-openai" +version = "0.1.0" +description = "OpenTelemetry instrumentation for Azure OpenAI" +authors = ["Future AGI "] +readme = "README.md" +packages = [ + { include = "traceai_azure_openai" } +] + +[tool.poetry.dependencies] +python = ">3.9,<3.14" +fi-instrumentation-otel = ">=0.1.11" +openai = ">=1.69.0" + +[build-system] +requires = ["poetry-core"] +build-backend = "poetry.core.masonry.api" diff --git a/python/frameworks/azure_openai/traceai_azure_openai/__init__.py b/python/frameworks/azure_openai/traceai_azure_openai/__init__.py new file mode 100644 index 00000000..fd8a9c46 --- /dev/null +++ b/python/frameworks/azure_openai/traceai_azure_openai/__init__.py @@ -0,0 +1,78 @@ +import logging +from importlib import import_module +from typing import Any, Collection + +import logging +logger = logging.getLogger(__name__) +try: + from fi.evals import Protect +except ImportError: + logger.warning("ai-evaluation is not installed, please install it to trace protect") + Protect = None + pass +from fi_instrumentation import FITracer, 
TraceConfig +from fi_instrumentation.instrumentation._protect_wrapper import GuardrailProtectWrapper +from opentelemetry import trace as trace_api +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor # type: ignore +from traceai_azure_openai._request import _AsyncRequest, _Request +from traceai_azure_openai.package import _instruments +from traceai_azure_openai.version import __version__ +from wrapt import wrap_function_wrapper + +logger = logging.getLogger(__name__) +logger.addHandler(logging.NullHandler()) + +_MODULE = "openai" + + +class AzureOpenAIInstrumentor(BaseInstrumentor): # type: ignore + """ + An instrumentor for Azure OpenAI + """ + + __slots__ = ( + "_original_request", + "_original_async_request", + ) + + def instrumentation_dependencies(self) -> Collection[str]: + return _instruments + + def _instrument(self, **kwargs: Any) -> None: + if not (tracer_provider := kwargs.get("tracer_provider")): + tracer_provider = trace_api.get_tracer_provider() + if not (config := kwargs.get("config")): + config = TraceConfig() + else: + assert isinstance(config, TraceConfig) + tracer = FITracer( + trace_api.get_tracer(__name__, __version__, tracer_provider), + config=config, + ) + openai = import_module(_MODULE) + self._original_request = openai.OpenAI.request + self._original_async_request = openai.AsyncOpenAI.request + wrap_function_wrapper( + module=_MODULE, + name="OpenAI.request", + wrapper=_Request(tracer=tracer, openai=openai), + ) + wrap_function_wrapper( + module=_MODULE, + name="AsyncOpenAI.request", + wrapper=_AsyncRequest(tracer=tracer, openai=openai), + ) + if Protect is not None: + self._original_protect = Protect.protect + wrap_function_wrapper( + module="fi.evals", + name="Protect.protect", + wrapper=GuardrailProtectWrapper(tracer), + ) + else: + self._original_protect = None + + def _uninstrument(self, **kwargs: Any) -> None: + openai = import_module(_MODULE) + openai.OpenAI.request = self._original_request + 
openai.AsyncOpenAI.request = self._original_async_request diff --git a/python/frameworks/azure_openai/traceai_azure_openai/_attributes/__init__.py b/python/frameworks/azure_openai/traceai_azure_openai/_attributes/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/python/frameworks/azure_openai/traceai_azure_openai/_attributes/_responses_api.py b/python/frameworks/azure_openai/traceai_azure_openai/_attributes/_responses_api.py new file mode 100644 index 00000000..8afe8ab1 --- /dev/null +++ b/python/frameworks/azure_openai/traceai_azure_openai/_attributes/_responses_api.py @@ -0,0 +1,651 @@ +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, Any, Callable, Iterable, Iterator, Tuple, Union + +from opentelemetry.util.types import AttributeValue +from typing_extensions import assert_never + +from fi_instrumentation import safe_json_dumps +from fi_instrumentation.fi_types import ( + ImageAttributes, + MessageAttributes, + MessageContentAttributes, + SpanAttributes, + ToolAttributes, + ToolCallAttributes, +) + +if TYPE_CHECKING: + from openai.types import responses + +logger = logging.getLogger(__name__) +logger.addHandler(logging.NullHandler()) + + +def stop_on_exception( + wrapped: Callable[..., Iterator[Tuple[str, Any]]], +) -> Callable[..., Iterator[Tuple[str, Any]]]: + def wrapper(*args: Any, **kwargs: Any) -> Iterator[Tuple[str, Any]]: + try: + yield from wrapped(*args, **kwargs) + except Exception: + logger.exception(f"Failed to get attribute in {wrapped.__name__}.") + + return wrapper + + +class _ResponsesApiAttributes: + @classmethod + @stop_on_exception + def _get_attributes_from_message_param( + cls, + obj: Union[ + responses.easy_input_message_param.EasyInputMessageParam, + responses.response_input_param.Message, + responses.response_output_message_param.ResponseOutputMessageParam, + ], + prefix: str = "", + ) -> Iterator[Tuple[str, AttributeValue]]: + if (role := obj.get("role")) is not None: + yield 
f"{prefix}{MessageAttributes.MESSAGE_ROLE}", role + if (content := obj.get("content")) is not None: + if isinstance(content, str): + yield f"{prefix}{MessageAttributes.MESSAGE_CONTENT}", content + elif isinstance(content, Iterable): + if TYPE_CHECKING: + assert not isinstance(content, str) + yield from cls._get_attributes_from_message_param_content_list(content, prefix) + + @classmethod + @stop_on_exception + def _get_attributes_from_message_param_content_list( + cls, + obj: Union[ + Iterable[responses.response_input_message_content_list_param.ResponseInputContentParam], + Iterable[responses.response_output_message_param.Content], + ], + prefix: str = "", + ) -> Iterator[Tuple[str, AttributeValue]]: + for i, item in enumerate(obj): + if "type" not in item: + continue + inner_prefix = f"{prefix}{MessageAttributes.MESSAGE_CONTENTS}.{i}." + if item["type"] == "input_text": + yield from cls._get_attributes_from_response_input_text_param(item, inner_prefix) + elif item["type"] == "output_text": + yield from cls._get_attributes_from_response_output_text_param(item, inner_prefix) + elif item["type"] == "input_image": + yield from cls._get_attributes_from_response_input_image_param(item, inner_prefix) + elif item["type"] == "input_file": + # TODO: Handle input file + pass + elif item["type"] == "refusal": + yield from cls._get_attributes_from_response_output_refusal_param( + item, inner_prefix + ) + elif TYPE_CHECKING: + assert_never(item["type"]) + + @classmethod + @stop_on_exception + def _get_attributes_from_output_message_content( + cls, + obj: Union[ + responses.response_output_text.ResponseOutputText, + responses.response_output_refusal.ResponseOutputRefusal, + ], + prefix: str = "", + ) -> Iterator[Tuple[str, AttributeValue]]: + from openai.types import responses + + if isinstance(obj, responses.ResponseOutputText): + yield from cls._get_attributes_from_response_output_text(obj, prefix) + elif isinstance(obj, responses.ResponseOutputRefusal): + yield from 
cls._get_attributes_from_response_output_refusal(obj, prefix) + elif TYPE_CHECKING: + assert_never(obj) + + @classmethod + @stop_on_exception + def _get_attributes_from_output_message_content_list( + cls, + obj: Iterable[ + Union[ + responses.response_output_text.ResponseOutputText, + responses.response_output_refusal.ResponseOutputRefusal, + ] + ], + prefix: str = "", + ) -> Iterator[Tuple[str, AttributeValue]]: + for i, item in enumerate(obj): + yield from cls._get_attributes_from_output_message_content( + item, f"{prefix}{MessageAttributes.MESSAGE_CONTENTS}.{i}." + ) + + @classmethod + @stop_on_exception + def _get_attributes_from_response( + cls, + obj: responses.response.Response, + ) -> Iterator[Tuple[str, AttributeValue]]: + yield SpanAttributes.GEN_AI_REQUEST_MODEL, obj.model + if obj.usage: + yield from cls._get_attributes_from_response_usage(obj.usage) + if isinstance(obj.output, Iterable): + for i, item in enumerate(obj.output): + yield from cls._get_attributes_from_response_output_item( + item, + f"{SpanAttributes.GEN_AI_OUTPUT_MESSAGES}.{i}.", + ) + if isinstance(obj.tools, Iterable): + for i, tool in enumerate(obj.tools): + yield from cls._get_attributes_from_response_tool( + tool, + f"{SpanAttributes.GEN_AI_TOOL_DEFINITIONS}.{i}.", + ) + + @classmethod + @stop_on_exception + def _get_attributes_from_response_computer_tool_call( + cls, + obj: responses.response_computer_tool_call.ResponseComputerToolCall, + prefix: str = "", + ) -> Iterator[Tuple[str, AttributeValue]]: + yield f"{prefix}{MessageAttributes.MESSAGE_ROLE}", "tool" + if (call_id := obj.call_id) is not None: + yield f"{prefix}{MessageAttributes.MESSAGE_TOOL_CALL_ID}", call_id + + @classmethod + @stop_on_exception + def _get_attributes_from_response_computer_tool_call_param( + cls, + obj: responses.response_computer_tool_call_param.ResponseComputerToolCallParam, + prefix: str = "", + ) -> Iterator[Tuple[str, AttributeValue]]: + yield f"{prefix}{MessageAttributes.MESSAGE_ROLE}", "tool" + if 
(call_id := obj.get("call_id")) is not None: + yield f"{prefix}{MessageAttributes.MESSAGE_TOOL_CALL_ID}", call_id + + @classmethod + @stop_on_exception + def _get_attributes_from_response_create_param_base( + cls, + obj: responses.response_create_params.ResponseCreateParamsBase, + ) -> Iterator[Tuple[str, AttributeValue]]: + invocation_params = dict(obj) + invocation_params.pop("input", None) + invocation_params.pop("instructions", None) + if isinstance((tools := invocation_params.pop("tools", None)), Iterable): + for i, tool in enumerate(tools): + yield from cls._get_attributes_from_response_tool_param( + tool, f"{SpanAttributes.GEN_AI_TOOL_DEFINITIONS}.{i}." + ) + yield SpanAttributes.GEN_AI_REQUEST_PARAMETERS, safe_json_dumps(invocation_params) + if (model := obj.get("model")) is not None: + yield SpanAttributes.GEN_AI_REQUEST_MODEL, model + if (instructions := obj.get("instructions")) is not None: + yield ( + f"{SpanAttributes.GEN_AI_INPUT_MESSAGES}.0.{MessageAttributes.MESSAGE_ROLE}", + "system", + ) + yield ( + f"{SpanAttributes.GEN_AI_INPUT_MESSAGES}.0.{MessageAttributes.MESSAGE_CONTENT}", + instructions, + ) + if (input := obj.get("input")) is not None: + if isinstance(input, str): + yield ( + f"{SpanAttributes.GEN_AI_INPUT_MESSAGES}.1.{MessageAttributes.MESSAGE_ROLE}", + "user", + ) + yield ( + f"{SpanAttributes.GEN_AI_INPUT_MESSAGES}.1.{MessageAttributes.MESSAGE_CONTENT}", + input, + ) + elif isinstance(input, list): + yield from cls._get_attributes_from_response_input_item_params(input) + + @classmethod + @stop_on_exception + def _get_attributes_from_response_file_search_tool_call( + cls, + obj: responses.response_file_search_tool_call.ResponseFileSearchToolCall, + prefix: str = "", + ) -> Iterator[Tuple[str, AttributeValue]]: + if (id := obj.id) is not None: + yield f"{prefix}{ToolCallAttributes.TOOL_CALL_ID}", id + if (type := obj.type) is not None: + yield f"{prefix}{ToolCallAttributes.TOOL_CALL_FUNCTION_NAME}", type + + @classmethod + 
@stop_on_exception + def _get_attributes_from_response_file_search_tool_call_param( + cls, + obj: responses.response_file_search_tool_call_param.ResponseFileSearchToolCallParam, + prefix: str = "", + ) -> Iterator[Tuple[str, AttributeValue]]: + if (id := obj.get("id")) is not None: + yield f"{prefix}{ToolCallAttributes.TOOL_CALL_ID}", id + if (type := obj.get("type")) is not None: + yield f"{prefix}{ToolCallAttributes.TOOL_CALL_FUNCTION_NAME}", type + + @classmethod + @stop_on_exception + def _get_attributes_from_response_tool( + cls, + obj: responses.tool.Tool, + prefix: str = "", + ) -> Iterator[Tuple[str, AttributeValue]]: + yield (f"{prefix}{ToolAttributes.TOOL_JSON_SCHEMA}", obj.model_dump_json()) + + @classmethod + @stop_on_exception + def _get_attributes_from_response_tool_param( + cls, + obj: responses.tool_param.ToolParam, + prefix: str = "", + ) -> Iterator[Tuple[str, AttributeValue]]: + yield (f"{prefix}{ToolAttributes.TOOL_JSON_SCHEMA}", safe_json_dumps(obj)) + + @classmethod + @stop_on_exception + def _get_attributes_from_response_function_tool_call( + cls, + obj: responses.ResponseFunctionToolCall, + prefix: str = "", + ) -> Iterator[Tuple[str, AttributeValue]]: + if (call_id := obj.call_id) is not None: + yield f"{prefix}{ToolCallAttributes.TOOL_CALL_ID}", call_id + if (name := obj.name) is not None: + yield f"{prefix}{ToolCallAttributes.TOOL_CALL_FUNCTION_NAME}", name + if obj.arguments != "{}": + yield ( + f"{prefix}{ToolCallAttributes.TOOL_CALL_FUNCTION_ARGUMENTS_JSON}", + obj.arguments, + ) + + @classmethod + @stop_on_exception + def _get_attributes_from_response_function_tool_call_param( + cls, + obj: responses.response_function_tool_call_param.ResponseFunctionToolCallParam, + prefix: str = "", + ) -> Iterator[Tuple[str, AttributeValue]]: + if (call_id := obj.get("call_id")) is not None: + yield f"{prefix}{ToolCallAttributes.TOOL_CALL_ID}", call_id + if (name := obj.get("name")) is not None: + yield 
f"{prefix}{ToolCallAttributes.TOOL_CALL_FUNCTION_NAME}", name + if (arguments := obj.get("arguments")) is not None: + if arguments != "{}": + yield ( + f"{prefix}{ToolCallAttributes.TOOL_CALL_FUNCTION_ARGUMENTS_JSON}", + arguments, + ) + + @classmethod + @stop_on_exception + def _get_attributes_from_response_function_web_search( + cls, + obj: responses.response_function_web_search.ResponseFunctionWebSearch, + prefix: str = "", + ) -> Iterator[Tuple[str, AttributeValue]]: + if (id := obj.id) is not None: + yield f"{prefix}{ToolCallAttributes.TOOL_CALL_ID}", id + if (type := obj.type) is not None: + yield f"{prefix}{ToolCallAttributes.TOOL_CALL_FUNCTION_NAME}", type + + @classmethod + @stop_on_exception + def _get_attributes_from_response_function_web_search_param( + cls, + obj: responses.response_function_web_search_param.ResponseFunctionWebSearchParam, + prefix: str = "", + ) -> Iterator[Tuple[str, AttributeValue]]: + if (id := obj.get("id")) is not None: + yield f"{prefix}{ToolCallAttributes.TOOL_CALL_ID}", id + if (type := obj.get("type")) is not None: + yield f"{prefix}{ToolCallAttributes.TOOL_CALL_FUNCTION_NAME}", type + + @classmethod + @stop_on_exception + def _get_attributes_from_response_image_generation_call( + cls, + obj: responses.response_output_item.ImageGenerationCall, + prefix: str = "", + ) -> Iterator[Tuple[str, AttributeValue]]: + if (image := obj.result) is not None: + yield f"{prefix}{MessageContentAttributes.MESSAGE_CONTENT_TYPE}", "image" + yield f"{prefix}{MessageContentAttributes.MESSAGE_CONTENT_IMAGE}.status", obj.status + yield ( + f"{prefix}{MessageContentAttributes.MESSAGE_CONTENT_IMAGE}", + image, + ) + + @classmethod + @stop_on_exception + def _get_attributes_from_response_input_image_param( + cls, + obj: responses.response_input_image_param.ResponseInputImageParam, + prefix: str = "", + ) -> Iterator[Tuple[str, AttributeValue]]: + if "image_url" in obj and (image_url := obj["image_url"]): + yield 
f"{prefix}{MessageContentAttributes.MESSAGE_CONTENT_TYPE}", "image" + yield ( + f"{prefix}{MessageContentAttributes.MESSAGE_CONTENT_IMAGE}", + image_url, + ) + + @classmethod + @stop_on_exception + def _get_attributes_from_response_input_item_param( + cls, + obj: responses.response_input_param.ResponseInputItemParam, + prefix: str = "", + ) -> Iterator[Tuple[str, AttributeValue]]: + if "type" not in obj: + if "role" in obj and "content" in obj: + yield from cls._get_attributes_from_message_param( + { + "type": "message", + "role": obj["role"], # type: ignore[typeddict-item] + "content": obj["content"], # type: ignore[typeddict-item] + }, + prefix, + ) + elif obj["type"] == "message": + yield from cls._get_attributes_from_message_param(obj, prefix) + elif obj["type"] == "function_call": + yield f"{prefix}{MessageAttributes.MESSAGE_ROLE}", "assistant" + yield from cls._get_attributes_from_response_function_tool_call_param( + obj, + f"{prefix}{MessageAttributes.MESSAGE_TOOL_CALLS}.0.", + ) + elif obj["type"] == "function_call_output": + yield from cls._get_attributes_from_response_input_param_function_call_output( + obj, prefix + ) + elif obj["type"] == "reasoning": + yield from cls._get_attributes_from_response_reasoning_item_param(obj, prefix) + elif obj["type"] == "item_reference": + # TODO: Handle item reference + pass + elif obj["type"] == "file_search_call": + yield f"{prefix}{MessageAttributes.MESSAGE_ROLE}", "assistant" + yield from cls._get_attributes_from_response_file_search_tool_call_param( + obj, + f"{prefix}{MessageAttributes.MESSAGE_TOOL_CALLS}.0.", + ) + elif obj["type"] == "computer_call": + yield f"{prefix}{MessageAttributes.MESSAGE_ROLE}", "assistant" + yield from cls._get_attributes_from_response_computer_tool_call_param( + obj, + f"{prefix}{MessageAttributes.MESSAGE_TOOL_CALLS}.0.", + ) + elif obj["type"] == "computer_call_output": + yield from cls._get_attributes_from_response_input_param_computer_call_output( + obj, prefix + ) + elif obj["type"] 
== "web_search_call": + yield f"{prefix}{MessageAttributes.MESSAGE_ROLE}", "assistant" + yield from cls._get_attributes_from_response_function_web_search_param( + obj, + f"{prefix}{MessageAttributes.MESSAGE_TOOL_CALLS}.0.", + ) + elif obj["type"] == "image_generation_call": + # TODO: Handle image generation call + pass + elif obj["type"] == "code_interpreter_call": + # TODO: Handle code interpreter call + pass + elif obj["type"] == "local_shell_call": + # TODO: Handle local shell call + pass + elif obj["type"] == "local_shell_call_output": + # TODO: Handle local shell call output + pass + elif obj["type"] == "mcp_list_tools": + # TODO: Handle mcp list tools + pass + elif obj["type"] == "mcp_approval_request": + # TODO: Handle mcp approval request + pass + elif obj["type"] == "mcp_approval_response": + # TODO: Handle mcp approval response + pass + elif obj["type"] == "mcp_call": + # TODO: Handle mcp call + pass + elif TYPE_CHECKING and obj["type"] is not None: + assert_never(obj["type"]) + + @classmethod + @stop_on_exception + def _get_attributes_from_response_input_item_params( + cls, + obj: Iterable[responses.response_input_param.ResponseInputItemParam], + msg_idx: int = 1, + ) -> Iterator[Tuple[str, AttributeValue]]: + for i, item in enumerate(obj, msg_idx): + prefix = f"{SpanAttributes.GEN_AI_INPUT_MESSAGES}.{i}." 
+ yield from cls._get_attributes_from_response_input_item_param(item, prefix) + + @classmethod + @stop_on_exception + def _get_attributes_from_response_input_param_computer_call_output( + cls, + obj: responses.response_input_param.ComputerCallOutput, + prefix: str = "", + ) -> Iterator[Tuple[str, AttributeValue]]: + yield f"{prefix}{MessageAttributes.MESSAGE_ROLE}", "tool" + if (call_id := obj.get("call_id")) is not None: + yield f"{prefix}{MessageAttributes.MESSAGE_TOOL_CALL_ID}", call_id + + @classmethod + @stop_on_exception + def _get_attributes_from_response_input_param_function_call_output( + cls, + obj: responses.response_input_param.FunctionCallOutput, + prefix: str = "", + ) -> Iterator[Tuple[str, AttributeValue]]: + yield f"{prefix}{MessageAttributes.MESSAGE_ROLE}", "tool" + if (call_id := obj.get("call_id")) is not None: + yield f"{prefix}{MessageAttributes.MESSAGE_TOOL_CALL_ID}", call_id + if (output := obj.get("output")) is not None: + yield f"{prefix}{MessageAttributes.MESSAGE_CONTENT}", output + + @classmethod + @stop_on_exception + def _get_attributes_from_response_input_text_param( + cls, + obj: responses.response_input_text_param.ResponseInputTextParam, + prefix: str = "", + ) -> Iterator[Tuple[str, AttributeValue]]: + if (text := obj.get("text")) is not None: + yield f"{prefix}{MessageContentAttributes.MESSAGE_CONTENT_TEXT}", text + yield f"{prefix}{MessageContentAttributes.MESSAGE_CONTENT_TYPE}", "text" + + @classmethod + @stop_on_exception + def _get_attributes_from_response_output_item( + cls, + obj: responses.response_output_item.ResponseOutputItem, + prefix: str = "", + ) -> Iterator[Tuple[str, AttributeValue]]: + if obj.type == "message": + yield from cls._get_attributes_from_response_output_message(obj, prefix) + elif obj.type == "function_call": + yield f"{prefix}{MessageAttributes.MESSAGE_ROLE}", "assistant" + yield from cls._get_attributes_from_response_function_tool_call( + obj, + f"{prefix}{MessageAttributes.MESSAGE_TOOL_CALLS}.0.", + 
) + elif obj.type == "file_search_call": + yield f"{prefix}{MessageAttributes.MESSAGE_ROLE}", "assistant" + yield from cls._get_attributes_from_response_file_search_tool_call( + obj, + f"{prefix}{MessageAttributes.MESSAGE_TOOL_CALLS}.0.", + ) + elif obj.type == "computer_call": + yield f"{prefix}{MessageAttributes.MESSAGE_ROLE}", "assistant" + yield from cls._get_attributes_from_response_computer_tool_call( + obj, + f"{prefix}{MessageAttributes.MESSAGE_TOOL_CALLS}.0.", + ) + elif obj.type == "reasoning": + yield f"{prefix}{MessageAttributes.MESSAGE_ROLE}", "assistant" + yield from cls._get_attributes_from_response_reasoning_item(obj, prefix) + elif obj.type == "web_search_call": + yield f"{prefix}{MessageAttributes.MESSAGE_ROLE}", "assistant" + yield from cls._get_attributes_from_response_function_web_search( + obj, + f"{prefix}{MessageAttributes.MESSAGE_TOOL_CALLS}.0.", + ) + elif obj.type == "image_generation_call": + yield f"{prefix}{MessageAttributes.MESSAGE_ROLE}", "assistant" + yield from cls._get_attributes_from_response_image_generation_call( + obj, + f"{prefix}{MessageAttributes.MESSAGE_CONTENT}.0.", + ) + elif obj.type == "code_interpreter_call": + # TODO: Handle code interpreter call + pass + elif obj.type == "local_shell_call": + # TODO: Handle local shell call + pass + elif obj.type == "mcp_call": + # TODO: Handle mcp call + pass + elif obj.type == "mcp_list_tools": + # TODO: Handle mcp list tools + pass + elif obj.type == "mcp_approval_request": + # TODO: Handle mcp approval request + pass + elif TYPE_CHECKING: + assert_never(obj.type) + + @classmethod + @stop_on_exception + def _get_attributes_from_response_output_message( + cls, + obj: responses.response_output_message.ResponseOutputMessage, + prefix: str = "", + ) -> Iterator[Tuple[str, AttributeValue]]: + yield f"{prefix}{MessageAttributes.MESSAGE_ROLE}", obj.role + yield from cls._get_attributes_from_output_message_content_list(obj.content, prefix) + + @classmethod + @stop_on_exception + def 
_get_attributes_from_response_output_refusal( + cls, + obj: responses.response_output_refusal.ResponseOutputRefusal, + prefix: str = "", + ) -> Iterator[Tuple[str, AttributeValue]]: + yield f"{prefix}{MessageContentAttributes.MESSAGE_CONTENT_TYPE}", "text" + yield f"{prefix}{MessageContentAttributes.MESSAGE_CONTENT_TEXT}", obj.refusal + + @classmethod + @stop_on_exception + def _get_attributes_from_response_output_refusal_param( + cls, + obj: responses.response_output_refusal_param.ResponseOutputRefusalParam, + prefix: str = "", + ) -> Iterator[Tuple[str, AttributeValue]]: + if (refusal := obj.get("refusal")) is not None: + yield f"{prefix}{MessageContentAttributes.MESSAGE_CONTENT_TEXT}", refusal + yield f"{prefix}{MessageContentAttributes.MESSAGE_CONTENT_TYPE}", "text" + + @classmethod + @stop_on_exception + def _get_attributes_from_response_output_text( + cls, + obj: responses.response_output_text.ResponseOutputText, + prefix: str = "", + ) -> Iterator[Tuple[str, AttributeValue]]: + yield f"{prefix}{MessageContentAttributes.MESSAGE_CONTENT_TYPE}", "text" + yield f"{prefix}{MessageContentAttributes.MESSAGE_CONTENT_TEXT}", obj.text + + @classmethod + @stop_on_exception + def _get_attributes_from_response_output_text_param( + cls, + obj: responses.response_output_text_param.ResponseOutputTextParam, + prefix: str = "", + ) -> Iterator[Tuple[str, AttributeValue]]: + if (text := obj.get("text")) is not None: + yield f"{prefix}{MessageContentAttributes.MESSAGE_CONTENT_TEXT}", text + yield f"{prefix}{MessageContentAttributes.MESSAGE_CONTENT_TYPE}", "text" + + @classmethod + @stop_on_exception + def _get_attributes_from_response_reasoning_item( + cls, + obj: responses.response_reasoning_item.ResponseReasoningItem, + prefix: str = "", + ) -> Iterator[Tuple[str, AttributeValue]]: + if isinstance(obj.summary, Iterable): + for i, item in enumerate(obj.summary): + yield from cls._get_attributes_from_response_reasoning_item_summary( + item, + 
f"{prefix}{MessageAttributes.MESSAGE_CONTENTS}.{i}.", + ) + + @classmethod + @stop_on_exception + def _get_attributes_from_response_reasoning_item_param( + cls, + obj: responses.response_reasoning_item_param.ResponseReasoningItemParam, + prefix: str = "", + ) -> Iterator[Tuple[str, AttributeValue]]: + yield f"{prefix}{MessageAttributes.MESSAGE_ROLE}", "assistant" + if isinstance((summary := obj.get("summary")), Iterable): + for i, item in enumerate(summary): + if "type" not in item: + continue + if item["type"] == "summary_text": + yield from cls._get_attributes_from_response_reasoning_item_param_summary( + item, + f"{prefix}{MessageAttributes.MESSAGE_CONTENTS}.{i}.", + ) + elif TYPE_CHECKING: + assert_never(item["type"]) + + @classmethod + @stop_on_exception + def _get_attributes_from_response_reasoning_item_param_summary( + cls, + obj: responses.response_reasoning_item_param.Summary, + prefix: str = "", + ) -> Iterator[Tuple[str, AttributeValue]]: + if (text := obj.get("text")) is not None: + yield f"{prefix}{MessageContentAttributes.MESSAGE_CONTENT_TEXT}", text + yield f"{prefix}{MessageContentAttributes.MESSAGE_CONTENT_TYPE}", "text" + + @classmethod + @stop_on_exception + def _get_attributes_from_response_reasoning_item_summary( + cls, + obj: responses.response_reasoning_item.Summary, + prefix: str = "", + ) -> Iterator[Tuple[str, AttributeValue]]: + yield f"{prefix}{MessageContentAttributes.MESSAGE_CONTENT_TYPE}", "text" + yield f"{prefix}{MessageContentAttributes.MESSAGE_CONTENT_TEXT}", obj.text + + @classmethod + @stop_on_exception + def _get_attributes_from_response_usage( + cls, + obj: responses.response_usage.ResponseUsage, + ) -> Iterator[Tuple[str, AttributeValue]]: + yield SpanAttributes.GEN_AI_USAGE_TOTAL_TOKENS, obj.total_tokens + yield SpanAttributes.GEN_AI_USAGE_INPUT_TOKENS, obj.input_tokens + yield SpanAttributes.GEN_AI_USAGE_OUTPUT_TOKENS, obj.output_tokens + yield ( + SpanAttributes.GEN_AI_USAGE_OUTPUT_TOKENS_REASONING, + 
obj.output_tokens_details.reasoning_tokens, + ) + yield ( + SpanAttributes.GEN_AI_USAGE_INPUT_TOKENS_CACHE_READ, + obj.input_tokens_details.cached_tokens, + ) diff --git a/python/frameworks/azure_openai/traceai_azure_openai/_request.py b/python/frameworks/azure_openai/traceai_azure_openai/_request.py new file mode 100644 index 00000000..2aefbf6e --- /dev/null +++ b/python/frameworks/azure_openai/traceai_azure_openai/_request.py @@ -0,0 +1,493 @@ +import logging +from abc import ABC +from contextlib import contextmanager +from itertools import chain +from types import ModuleType +from typing import Any, Awaitable, Callable, Iterable, Iterator, Mapping, Tuple + +from fi_instrumentation import get_attributes_from_context +from fi_instrumentation.fi_types import ( + FiLLMProviderValues, + FiLLMSystemValues, + FiSpanKindValues, + SpanAttributes, +) +from openai.types.create_embedding_response import CreateEmbeddingResponse +from opentelemetry import context as context_api +from opentelemetry import trace as trace_api +from opentelemetry.context import _SUPPRESS_INSTRUMENTATION_KEY +from opentelemetry.trace import INVALID_SPAN +from opentelemetry.util.types import AttributeValue +from traceai_azure_openai._request_attributes_extractor import _RequestAttributesExtractor +from traceai_azure_openai._response_accumulator import ( + _ChatCompletionAccumulator, + _CompletionAccumulator, + _ResponsesAccumulator, +) +from traceai_azure_openai._response_attributes_extractor import _ResponseAttributesExtractor +from traceai_azure_openai._span_io_handler import add_io_to_span_attributes +from traceai_azure_openai._stream import _AsyncStream, _ResponseAccumulator, _Stream +from traceai_azure_openai._utils import ( + _as_input_attributes, + _finish_tracing, + _io_value_and_type, + safe_json_dumps, +) +from traceai_azure_openai._with_span import _WithSpan +from typing_extensions import TypeAlias + +__all__ = ( + "_Request", + "_AsyncRequest", +) + +logger = logging.getLogger(__name__) 
logger.addHandler(logging.NullHandler())


class _WithTracer(ABC):
    """Mixin that owns the OTel tracer and opens spans wrapped in `_WithSpan`."""

    def __init__(self, tracer: trace_api.Tracer, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self._tracer = tracer

    @contextmanager
    def _start_as_current_span(
        self,
        span_name: str,
        attributes: Iterable[Tuple[str, AttributeValue]],
        context_attributes: Iterable[Tuple[str, AttributeValue]],
        extra_attributes: Iterable[Tuple[str, AttributeValue]],
    ) -> Iterator[_WithSpan]:
        # Because OTEL has a default limit of 128 attributes, we split our attributes into
        # two tiers, where the addition of "extra_attributes" is deferred until the end
        # and only after the "attributes" are added.
        try:
            span = self._tracer.start_span(name=span_name, attributes=dict(attributes))
        except Exception:
            # Tracing must never break the instrumented call: fall back to a
            # no-op span and keep going.
            logger.exception("Failed to start span")
            span = INVALID_SPAN
        # end_on_exit / record_exception / set_status_on_exception are all
        # disabled because span lifecycle is driven explicitly by the caller
        # through _WithSpan.finish_tracing (streams end the span much later).
        with trace_api.use_span(
            span,
            end_on_exit=False,
            record_exception=False,
            set_status_on_exception=False,
        ) as span:
            yield _WithSpan(
                span=span,
                context_attributes=dict(context_attributes),
                extra_attributes=dict(extra_attributes),
            )


# The JSON body of an OpenAI API request, keyed by parameter name.
_RequestParameters: TypeAlias = Mapping[str, Any]


def _is_azure_openai_instance(instance: Any) -> bool:
    """Check if the client instance is an Azure OpenAI client.

    Returns False (instead of raising) when the installed ``openai`` package
    does not expose the Azure client classes.
    """
    try:
        from openai.lib.azure import AzureOpenAI, AsyncAzureOpenAI
        return isinstance(instance, (AzureOpenAI, AsyncAzureOpenAI))
    except ImportError:
        return False


class _WithOpenAI(ABC):
    # Shared plumbing for the sync/async request wrappers: the openai module,
    # its stream types, the request/response attribute extractors, and the
    # per-response-type accumulator factories for streaming responses.
    __slots__ = (
        "_openai",
        "_stream_types",
        "_request_attributes_extractor",
        "_response_attributes_extractor",
        "_response_accumulator_factories",
    )

    def __init__(self, openai: ModuleType, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self._openai = openai
        self._stream_types = (openai.Stream, openai.AsyncStream)
        self._request_attributes_extractor = _RequestAttributesExtractor(openai=openai)
        self._response_attributes_extractor = 
_ResponseAttributesExtractor( + openai=openai + ) + + def responses_accumulator(request_parameters: _RequestParameters) -> Any: + return _ResponsesAccumulator( + request_parameters=request_parameters, + chat_completion_type=openai.types.responses.response.Response, + response_attributes_extractor=self._response_attributes_extractor, + ) + + self._response_accumulator_factories: Mapping[ + type, Callable[[_RequestParameters], _ResponseAccumulator] + ] = { + openai.types.Completion: lambda request_parameters: _CompletionAccumulator( + request_parameters=request_parameters, + completion_type=openai.types.Completion, + response_attributes_extractor=self._response_attributes_extractor, + ), + openai.types.chat.ChatCompletion: lambda request_parameters: _ChatCompletionAccumulator( + request_parameters=request_parameters, + chat_completion_type=openai.types.chat.ChatCompletion, + response_attributes_extractor=self._response_attributes_extractor, + ), + openai.types.responses.response.Response: responses_accumulator, + } + + def _get_span_kind(self, cast_to: type) -> str: + return ( + FiSpanKindValues.EMBEDDING.value + if cast_to is self._openai.types.CreateEmbeddingResponse + else FiSpanKindValues.LLM.value + ) + + def _get_attributes_from_instance( + self, instance: Any + ) -> Iterator[Tuple[str, AttributeValue]]: + # Always set Azure as the provider for Azure OpenAI clients + yield SpanAttributes.GEN_AI_PROVIDER_NAME, FiLLMProviderValues.AZURE.value + # Extract Azure-specific attributes from the client instance + if azure_deployment := getattr(instance, "_azure_deployment", None): + yield "gen_ai.azure.deployment", azure_deployment + if api_version := getattr(instance, "_api_version", None): + yield "gen_ai.azure.api_version", api_version + if ( + (base_url := getattr(instance, "base_url", None)) + and (host := getattr(base_url, "host", None)) + and isinstance(host, str) + ): + yield "server.address", host + + def _get_attributes_from_request( + self, + cast_to: type, + 
request_parameters: Mapping[str, Any], + ) -> Iterator[Tuple[str, AttributeValue]]: + yield SpanAttributes.GEN_AI_SPAN_KIND, self._get_span_kind(cast_to=cast_to) + yield SpanAttributes.GEN_AI_PROVIDER_NAME, FiLLMProviderValues.AZURE.value + try: + yield from _as_input_attributes( + _io_value_and_type(request_parameters), + ) + except Exception: + logger.exception( + f"Failed to get input attributes from request parameters of " + f"type {type(request_parameters)}" + ) + + def _get_extra_attributes_from_request( + self, + cast_to: type, + request_parameters: Mapping[str, Any], + ) -> Iterator[Tuple[str, AttributeValue]]: + # Secondary attributes should be added after input and output to ensure + # that input and output are not dropped if there are too many attributes. + try: + yield from self._request_attributes_extractor.get_attributes_from_request( + cast_to=cast_to, + request_parameters=request_parameters, + ) + except Exception: + logger.exception( + f"Failed to get extra attributes from request options of " + f"type {type(request_parameters)}" + ) + + def _is_streaming(self, response: Any) -> bool: + return isinstance(response, self._stream_types) + + def _is_async_stream(self, response: Any) -> bool: + """Check if the response is an asynchronous stream.""" + return hasattr(response, "__aiter__") and callable(response.__aiter__) + + def _finalize_response( + self, + response: Any, + with_span: _WithSpan, + cast_to: type, + request_parameters: Mapping[str, Any], + ) -> Any: + """ + Monkey-patch the response object to trace the stream, or finish tracing if the response is + not a stream. 
+ """ + if hasattr(response, "parse") and callable(response.parse): + try: + response.parse() + except Exception: + logger.exception(f"Failed to parse response of type {type(response)}") + + if self._is_streaming(response): + # Determine if the response is asynchronous + if self._is_async_stream(response): + stream_wrapper = _AsyncStream + else: + stream_wrapper = _Stream + + # Get the response accumulator if available + response_accumulator_factory = self._response_accumulator_factories.get( + cast_to + ) + response_accumulator = ( + response_accumulator_factory(request_parameters) + if response_accumulator_factory + else None + ) + + # Wrap the response with the appropriate stream wrapper + wrapped_stream = stream_wrapper( + stream=response, + with_span=with_span, + response_accumulator=response_accumulator, + ) + + return wrapped_stream + + else: + _finish_tracing( + status=trace_api.Status(status_code=trace_api.StatusCode.OK), + with_span=with_span, + has_attributes=_ResponseAttributes( + request_parameters=request_parameters, + response=response, + response_attributes_extractor=self._response_attributes_extractor, + ), + ) + return response + + def response_to_dict(self, response: Any) -> Any: + if hasattr(response, "to_dict") and callable(response.to_dict): + return response.to_dict() + elif isinstance(response, dict): + return response + elif hasattr(response, "__dict__"): + return response.__dict__ + else: + return str(response) + + +class _Request(_WithTracer, _WithOpenAI): + def __call__( + self, + wrapped: Callable[..., Any], + instance: Any, + args: Tuple[type, Any], + kwargs: Mapping[str, Any], + ) -> Any: + if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return wrapped(*args, **kwargs) + # Only instrument Azure OpenAI client instances + if not _is_azure_openai_instance(instance): + return wrapped(*args, **kwargs) + try: + cast_to, request_parameters = _parse_request_args(args) + # Extract input data more robustly + input_data = ( + 
request_parameters.get("messages") + or request_parameters.get("prompt") + or request_parameters.get("input") + ) + llm_tools = request_parameters.get("tools") + span_name: str = cast_to.__name__.split(".")[-1] + except Exception: + logger.exception("Failed to parse request args") + return wrapped(*args, **kwargs) + + with self._start_as_current_span( + span_name=span_name, + attributes=chain( + self._get_attributes_from_instance(instance), + self._get_attributes_from_request( + cast_to=cast_to, + request_parameters=request_parameters, + ), + ), + context_attributes=get_attributes_from_context(), + extra_attributes=self._get_extra_attributes_from_request( + cast_to=cast_to, + request_parameters=request_parameters, + ), + ) as with_span: + # Add input data to span attributes before the request + with_span.set_attribute( + SpanAttributes.GEN_AI_TOOL_DEFINITIONS, safe_json_dumps(llm_tools) + ) + add_io_to_span_attributes(with_span, input_data, None) + try: + response = wrapped(*args, **kwargs) + + if isinstance(response, CreateEmbeddingResponse): + embedding = response.to_dict() + with_span.set_attribute( + SpanAttributes.EMBEDDING_EMBEDDINGS, safe_json_dumps(embedding) + ) + + # Add output data to span attributes after getting response + if not self._is_streaming(response): + add_io_to_span_attributes( + with_span, + None, + response, + is_streaming=self._is_streaming(response), + ) + return self._finalize_response( + response=response, + with_span=with_span, + cast_to=cast_to, + request_parameters=request_parameters, + ) + except Exception as e: + status = trace_api.Status( + status_code=trace_api.StatusCode.ERROR, + description=f"{type(e).__name__}: {e}", + ) + with_span.record_exception(e) + with_span.finish_tracing(status=status) + raise + + +class _AsyncRequest(_WithTracer, _WithOpenAI): + async def __call__( + self, + wrapped: Callable[..., Awaitable[Any]], + instance: Any, + args: Tuple[type, Any], + kwargs: Mapping[str, Any], + ) -> Any: + if 
context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return await wrapped(*args, **kwargs) + # Only instrument Azure OpenAI client instances + if not _is_azure_openai_instance(instance): + return await wrapped(*args, **kwargs) + try: + cast_to, request_parameters = _parse_request_args(args) + # Extract input data more robustly + input_data = request_parameters.get( + "messages", request_parameters.get("prompt") + ) + llm_tools = request_parameters.get("tools") + + span_name: str = cast_to.__name__.split(".")[-1] + except Exception: + logger.exception("Failed to parse request args") + return await wrapped(*args, **kwargs) + with self._start_as_current_span( + span_name=span_name, + attributes=chain( + self._get_attributes_from_instance(instance), + self._get_attributes_from_request( + cast_to=cast_to, + request_parameters=request_parameters, + ), + ), + context_attributes=get_attributes_from_context(), + extra_attributes=self._get_extra_attributes_from_request( + cast_to=cast_to, + request_parameters=request_parameters, + ), + ) as with_span: + # Add input data to span attributes before the request + with_span.set_attribute( + SpanAttributes.GEN_AI_TOOL_DEFINITIONS, safe_json_dumps(llm_tools) + ) + add_io_to_span_attributes(with_span, input_data, None) + try: + response = await wrapped(*args, **kwargs) + + if isinstance(response, CreateEmbeddingResponse): + embedding = response.to_dict() + with_span.set_attribute( + SpanAttributes.EMBEDDING_EMBEDDINGS, safe_json_dumps(embedding) + ) + + add_io_to_span_attributes( + with_span, None, response, is_streaming=self._is_streaming(response) + ) + except Exception as exception: + with_span.record_exception(exception) + status = trace_api.Status( + status_code=trace_api.StatusCode.ERROR, + # Follow the format in OTEL SDK for description, see: + # https://github.com/open-telemetry/opentelemetry-python/blob/2b9dcfc5d853d1c10176937a6bcaade54cda1a31/opentelemetry-api/src/opentelemetry/trace/__init__.py#L588 # noqa E501 + 
                    description=f"{type(exception).__name__}: {exception}",
                )
                with_span.finish_tracing(status=status)
                raise
            try:
                response = self._finalize_response(
                    response=response,
                    with_span=with_span,
                    cast_to=cast_to,
                    request_parameters=request_parameters,
                )
            except Exception:
                # Finalization is best-effort: log, end the span, and still
                # hand the (unwrapped) response back to the caller.
                logger.exception(
                    f"Failed to finalize response of type {type(response)}"
                )
                with_span.finish_tracing()
            return response


def _parse_request_args(args: Tuple[type, Any]) -> Tuple[type, Mapping[str, Any]]:
    """Extract ``(cast_to, request_parameters)`` from the positional args of
    ``OpenAI.request`` / ``AsyncOpenAI.request``.

    Falls back to an empty mapping when the second positional argument has no
    Mapping-typed ``json_data`` attribute (e.g. non-JSON request bodies).
    """
    # We don't use `signature(request).bind()` because `request` could have been monkey-patched
    # (incorrectly) by others and the signature at runtime may not match the original.
    # The targeted signature of `request` is here:
    # https://github.com/openai/openai-python/blob/f1c7d714914e3321ca2e72839fe2d132a8646e7f/src/openai/_base_client.py#L846-L847  # noqa: E501
    cast_to: type = args[0]
    request_parameters: Mapping[str, Any] = (
        json_data
        # See https://github.com/openai/openai-python/blob/f1c7d714914e3321ca2e72839fe2d132a8646e7f/src/openai/_models.py#L427  # noqa: E501
        if hasattr(args[1], "json_data")
        and isinstance(json_data := args[1].json_data, Mapping)
        else {}
    )
    return cast_to, request_parameters


class _ResponseAttributes:
    """Lazily yields output attributes extracted from a (parsed) response."""

    __slots__ = ("_response", "_request_parameters", "_response_attributes_extractor")

    def __init__(
        self,
        response: Any,
        request_parameters: Mapping[str, Any],
        response_attributes_extractor: _ResponseAttributesExtractor,
    ) -> None:
        if hasattr(response, "parse") and callable(response.parse):
            # E.g. 
see https://github.com/openai/openai-python/blob/f1c7d714914e3321ca2e72839fe2d132a8646e7f/src/openai/_base_client.py#L518 # noqa: E501 + try: + response = response.parse() + except Exception: + logger.exception(f"Failed to parse response of type {type(response)}") + self._request_parameters = request_parameters + self._response = response + self._response_attributes_extractor = response_attributes_extractor + + def get_attributes(self) -> Iterator[Tuple[str, AttributeValue]]: + # Extract just the assistant's response content, not the full API response + output_content = None + response = self._response + + if hasattr(response, "choices") and response.choices: + first_choice = response.choices[0] + if hasattr(first_choice, "message"): + message = first_choice.message + if hasattr(message, "tool_calls") and message.tool_calls: + # Handle tool calls + tool_calls = [ + f"Function: {tc.function.name}({tc.function.arguments})" + for tc in message.tool_calls + if tc.type == "function" + ] + output_content = " \n ".join(tool_calls) + elif getattr(message, "content", None): + output_content = message.content + elif hasattr(first_choice, "text"): + output_content = first_choice.text + + if output_content: + yield SpanAttributes.OUTPUT_VALUE, output_content + + def get_extra_attributes(self) -> Iterator[Tuple[str, AttributeValue]]: + yield from self._response_attributes_extractor.get_attributes_from_response( + response=self._response, + request_parameters=self._request_parameters, + ) diff --git a/python/frameworks/azure_openai/traceai_azure_openai/_request_attributes_extractor.py b/python/frameworks/azure_openai/traceai_azure_openai/_request_attributes_extractor.py new file mode 100644 index 00000000..c27c0a22 --- /dev/null +++ b/python/frameworks/azure_openai/traceai_azure_openai/_request_attributes_extractor.py @@ -0,0 +1,240 @@ +import logging +from enum import Enum +from types import ModuleType +from typing import ( + TYPE_CHECKING, + Any, + Iterable, + Iterator, + 
List, + Mapping, + Tuple, + Type, + TypeVar, + cast, +) + +from fi_instrumentation import safe_json_dumps +from traceai_azure_openai._attributes._responses_api import _ResponsesApiAttributes +from fi_instrumentation.fi_types import ( + ImageAttributes, + MessageAttributes, + MessageContentAttributes, + SpanAttributes, + ToolCallAttributes, +) +from opentelemetry.util.types import AttributeValue +from traceai_azure_openai._utils import _get_openai_version + +if TYPE_CHECKING: + from openai.types import Completion, CreateEmbeddingResponse + from openai.types.chat import ChatCompletion + from openai.types.responses.response import Response + from openai.types.responses.response_create_params import ResponseCreateParamsBase + +__all__ = ("_RequestAttributesExtractor",) + +logger = logging.getLogger(__name__) +logger.addHandler(logging.NullHandler()) + + +class _RequestAttributesExtractor: + __slots__ = ( + "_openai", + "_chat_completion_type", + "_completion_type", + "_responses_type", + "_create_embedding_response_type", + ) + + def __init__(self, openai: ModuleType) -> None: + self._openai = openai + self._chat_completion_type: Type["ChatCompletion"] = ( + openai.types.chat.ChatCompletion + ) + self._completion_type: Type["Completion"] = openai.types.Completion + self._responses_type: Type["Response"] = openai.types.responses.response.Response + self._create_embedding_response_type: Type["CreateEmbeddingResponse"] = ( + openai.types.CreateEmbeddingResponse + ) + + def get_attributes_from_request( + self, + cast_to: type, + request_parameters: Mapping[str, Any], + ) -> Iterator[Tuple[str, AttributeValue]]: + if not isinstance(request_parameters, Mapping): + return + if cast_to is self._chat_completion_type: + yield from self._get_attributes_from_chat_completion_create_param( + request_parameters + ) + elif cast_to is self._responses_type: + yield from _ResponsesApiAttributes._get_attributes_from_response_create_param_base( + cast("ResponseCreateParamsBase", 
request_parameters) + ) + elif cast_to is self._create_embedding_response_type: + yield from _get_attributes_from_embedding_create_param(request_parameters) + elif cast_to is self._completion_type: + yield from _get_attributes_from_completion_create_param(request_parameters) + else: + try: + yield SpanAttributes.GEN_AI_REQUEST_PARAMETERS, safe_json_dumps( + request_parameters + ) + except Exception: + logger.exception("Failed to serialize request options") + + def _get_attributes_from_chat_completion_create_param( + self, + params: Mapping[str, Any], + ) -> Iterator[Tuple[str, AttributeValue]]: + if not isinstance(params, Mapping): + return + invocation_params = dict(params) + invocation_params.pop("messages", None) + invocation_params.pop("functions", None) + if isinstance((tools := invocation_params.pop("tools", None)), Iterable): + for i, tool in enumerate(tools): + yield f"{SpanAttributes.GEN_AI_TOOL_DEFINITIONS}.{i}.tool.json_schema", safe_json_dumps(tool) + yield SpanAttributes.GEN_AI_REQUEST_PARAMETERS, safe_json_dumps( + invocation_params + ) + + if (input_messages := params.get("messages")) and isinstance( + input_messages, Iterable + ): + for index, input_message in list(enumerate(input_messages)): + for key, value in self._get_attributes_from_message_param( + input_message + ): + yield f"{SpanAttributes.GEN_AI_INPUT_MESSAGES}.{index}.{key}", value + + def _get_attributes_from_message_param( + self, + message: Mapping[str, Any], + ) -> Iterator[Tuple[str, AttributeValue]]: + if not hasattr(message, "get"): + return + if role := message.get("role"): + yield ( + MessageAttributes.MESSAGE_ROLE, + role.value if isinstance(role, Enum) else role, + ) + if tool_call_id := message.get("tool_call_id"): + yield MessageAttributes.MESSAGE_TOOL_CALL_ID, tool_call_id + if content := message.get("content"): + if isinstance(content, str): + yield MessageAttributes.MESSAGE_CONTENT, content + elif is_iterable_of(content, dict): + for index, c in list(enumerate(content)): + 
for key, value in self._get_attributes_from_message_content(c): + yield f"{MessageAttributes.MESSAGE_CONTENTS}.{index}.{key}", value + elif isinstance(content, List): + try: + value = safe_json_dumps(content) + except Exception: + logger.exception("Failed to serialize message content") + yield MessageAttributes.MESSAGE_CONTENT, value + + if name := message.get("name"): + yield MessageAttributes.MESSAGE_NAME, name + if (function_call := message.get("function_call")) and hasattr( + function_call, "get" + ): + if function_name := function_call.get("name"): + yield MessageAttributes.MESSAGE_FUNCTION_CALL_NAME, function_name + if function_arguments := function_call.get("arguments"): + yield ( + MessageAttributes.MESSAGE_FUNCTION_CALL_ARGUMENTS_JSON, + function_arguments, + ) + if ( + _get_openai_version() >= (1, 1, 0) + and (tool_calls := message.get("tool_calls"),) + and isinstance(tool_calls, Iterable) + ): + for index, tool_call in enumerate(tool_calls): + if not hasattr(tool_call, "get"): + continue + if (tool_call_id := tool_call.get("id")) is not None: + yield ( + f"{MessageAttributes.MESSAGE_TOOL_CALLS}.{index}." + f"{ToolCallAttributes.TOOL_CALL_ID}", + tool_call_id, + ) + if (function := tool_call.get("function")) and hasattr(function, "get"): + if name := function.get("name"): + yield ( + f"{MessageAttributes.MESSAGE_TOOL_CALLS}.{index}." + f"{ToolCallAttributes.TOOL_CALL_FUNCTION_NAME}", + name, + ) + if arguments := function.get("arguments"): + yield ( + f"{MessageAttributes.MESSAGE_TOOL_CALLS}.{index}." 
+ f"{ToolCallAttributes.TOOL_CALL_FUNCTION_ARGUMENTS_JSON}", + arguments, + ) + + def _get_attributes_from_message_content( + self, + content: Mapping[str, Any], + ) -> Iterator[Tuple[str, AttributeValue]]: + content = dict(content) + type_ = content.pop("type") + if type_ == "text": + yield f"{MessageContentAttributes.MESSAGE_CONTENT_TYPE}", "text" + if text := content.pop("text"): + yield f"{MessageContentAttributes.MESSAGE_CONTENT_TEXT}", text + elif type_ == "image_url": + if image := content.pop("image_url"): + yield f"{MessageContentAttributes.MESSAGE_CONTENT_TYPE}", "image" + yield f"{MessageContentAttributes.MESSAGE_CONTENT_IMAGE}", image.get( + "url", "" + ) + elif type_ == "input_audio": + if audio := content.pop("input_audio"): + yield f"{MessageContentAttributes.MESSAGE_CONTENT_TYPE}", "audio" + yield f"{MessageContentAttributes.MESSAGE_CONTENT_AUDIO}", audio.get( + "data", "" + ) + + def _get_attributes_from_image( + self, + image: Mapping[str, Any], + ) -> Iterator[Tuple[str, AttributeValue]]: + image = dict(image) + if url := image.pop("url"): + yield f"{ImageAttributes.IMAGE_URL}", url + + +def _get_attributes_from_completion_create_param( + params: Mapping[str, Any], +) -> Iterator[Tuple[str, AttributeValue]]: + if not isinstance(params, Mapping): + return + invocation_params = dict(params) + invocation_params.pop("prompt", None) + yield SpanAttributes.GEN_AI_REQUEST_PARAMETERS, safe_json_dumps(invocation_params) + + +def _get_attributes_from_embedding_create_param( + params: Mapping[str, Any], +) -> Iterator[Tuple[str, AttributeValue]]: + if not isinstance(params, Mapping): + return + invocation_params = dict(params) + invocation_params.pop("input", None) + yield SpanAttributes.GEN_AI_REQUEST_PARAMETERS, safe_json_dumps(invocation_params) + + +T = TypeVar("T", bound=type) + + +def is_iterable_of(lst: Iterable[object], tp: T) -> bool: + return isinstance(lst, Iterable) and all(isinstance(x, tp) for x in lst) + + +def is_base64_url(url: str) -> 
bool: + return url.startswith("data:image/") and "base64" in url diff --git a/python/frameworks/azure_openai/traceai_azure_openai/_response_accumulator.py b/python/frameworks/azure_openai/traceai_azure_openai/_response_accumulator.py new file mode 100644 index 00000000..47488e66 --- /dev/null +++ b/python/frameworks/azure_openai/traceai_azure_openai/_response_accumulator.py @@ -0,0 +1,374 @@ +import warnings +from collections import defaultdict +from copy import deepcopy +from typing import ( + TYPE_CHECKING, + Any, + Callable, + DefaultDict, + Dict, + Iterable, + Iterator, + List, + Mapping, + Optional, + Protocol, + Tuple, + Type, +) + +from fi_instrumentation import safe_json_dumps +from fi_instrumentation.fi_types import FiMimeTypeValues, SpanAttributes +from opentelemetry.util.types import AttributeValue +from traceai_azure_openai._utils import _as_output_attributes, _to_dict, _ValueAndType, _io_value_and_type + +if TYPE_CHECKING: + from openai.types import Completion + from openai.types.chat import ChatCompletion + from openai.types.responses.response import Response + from openai.types.responses.response_completed_event import ResponseCompletedEvent + +__all__ = ( + "_CompletionAccumulator", + "_ChatCompletionAccumulator", + "_ResponsesAccumulator", +) + + +class _CanGetAttributesFromResponse(Protocol): + def get_attributes_from_response( + self, + response: Any, + request_parameters: Mapping[str, Any], + ) -> Iterator[Tuple[str, AttributeValue]]: ... 
+ + +class _ResponsesAccumulator: + __slots__ = ( + "_is_null", + "_values", + "_cached_result", + "_request_parameters", + "_response_attributes_extractor", + "_chat_completion_type", + ) + + def __init__( + self, + request_parameters: Mapping[str, Any], + chat_completion_type: Type["Response"], + response_attributes_extractor: Optional[_CanGetAttributesFromResponse] = None, + ) -> None: + self._chat_completion_type = chat_completion_type + self._request_parameters = request_parameters + self._response_attributes_extractor = response_attributes_extractor + self._is_null = True + self._cached_result: Optional[Dict[str, Any]] = None + self._values: Optional["ResponseCompletedEvent"] = None + + def process_chunk(self, chunk: Any) -> None: + if type(chunk).__name__ == "ResponseCompletedEvent": + self._is_null = False + self._cached_result = None + self._values = chunk + + def _result(self) -> Any: + if self._is_null: + return None + if self._values: + return self._values.response + + def get_attributes(self) -> Iterator[Tuple[str, AttributeValue]]: + if not (result := self._result()): + return + # Extract just the assistant's response content, not the full API response + output_content = None + if hasattr(result, "output") and result.output: + # For Responses API, extract output items + output_items = [] + for item in result.output: + if hasattr(item, "content") and item.content: + for content in item.content: + if hasattr(content, "text"): + output_items.append(content.text) + if output_items: + output_content = " ".join(output_items) + + if output_content: + yield SpanAttributes.OUTPUT_VALUE, output_content + else: + # Fallback to full response if we can't extract content + yield from _as_output_attributes( + _io_value_and_type(result), + ) + + def get_extra_attributes(self) -> Iterator[Tuple[str, AttributeValue]]: + if not (result := self._result()): + return + if self._response_attributes_extractor: + yield from 
self._response_attributes_extractor.get_attributes_from_response( + response=result, + request_parameters=self._request_parameters, + ) + + +class _ChatCompletionAccumulator: + __slots__ = ( + "_is_null", + "_values", + "_cached_result", + "_request_parameters", + "_response_attributes_extractor", + "_chat_completion_type", + "_content", + "_raw_data", + ) + + def __init__( + self, + request_parameters: Mapping[str, Any], + chat_completion_type: Type["ChatCompletion"], + response_attributes_extractor: Optional[_CanGetAttributesFromResponse] = None, + ) -> None: + self._chat_completion_type = chat_completion_type + self._request_parameters = request_parameters + self._response_attributes_extractor = response_attributes_extractor + self._is_null = True + self._content: List[str] = [] + self._raw_data: List[Dict[str, Any]] = [] + self._cached_result: Optional[Dict[str, Any]] = None + self._values = _ValuesAccumulator( + choices=_IndexedAccumulator( + lambda: _ValuesAccumulator( + message=_ValuesAccumulator( + content=_StringAccumulator(), + function_call=_ValuesAccumulator( + arguments=_StringAccumulator() + ), + tool_calls=_IndexedAccumulator( + lambda: _ValuesAccumulator( + function=_ValuesAccumulator( + arguments=_StringAccumulator() + ), + ) + ), + ), + ), + ), + ) + + def process_chunk(self, chunk: Any) -> None: + raw_chunk = _to_dict(chunk) + self._raw_data.append(raw_chunk) + self._is_null = False + + if "choices" in raw_chunk: + choices = raw_chunk["choices"] + if choices and isinstance(choices, list): + delta = choices[0].get("delta", {}) + content = delta.get("content") + if content is not None: + self._content.append(content) + + def _result(self) -> Optional[Dict[str, Any]]: + if self._is_null: + return None + if not self._cached_result: + self._cached_result = dict(self._values) + return self._cached_result + + def get_attributes(self) -> Iterator[Tuple[str, AttributeValue]]: + # Combine the accumulated content + output_value = "".join(self._content) + + 
if self._raw_data: + last_chunk = self._raw_data[-1] + if "usage" in last_chunk: + usage = last_chunk["usage"] + total_tokens = usage.get("total_tokens") + prompt_tokens = usage.get("prompt_tokens") + completion_tokens = usage.get("completion_tokens") + + if total_tokens is not None: + yield SpanAttributes.GEN_AI_USAGE_TOTAL_TOKENS, total_tokens + if prompt_tokens is not None: + yield SpanAttributes.GEN_AI_USAGE_INPUT_TOKENS, prompt_tokens + if completion_tokens is not None: + yield SpanAttributes.GEN_AI_USAGE_OUTPUT_TOKENS, completion_tokens + + # Only yield the assistant's response content, not the raw API response + yield SpanAttributes.OUTPUT_VALUE, output_value + + def get_extra_attributes(self) -> Iterator[Tuple[str, AttributeValue]]: + if not (result := self._result()): + return + if self._response_attributes_extractor: + yield from self._response_attributes_extractor.get_attributes_from_response( + self._chat_completion_type.construct(**result), + self._request_parameters, + ) + + +class _CompletionAccumulator: + __slots__ = ( + "_is_null", + "_values", + "_cached_result", + "_request_parameters", + "_response_attributes_extractor", + "_completion_type", + ) + + def __init__( + self, + request_parameters: Mapping[str, Any], + completion_type: Type["Completion"], + response_attributes_extractor: Optional[_CanGetAttributesFromResponse] = None, + ) -> None: + self._completion_type = completion_type + self._request_parameters = request_parameters + self._response_attributes_extractor = response_attributes_extractor + self._is_null = True + self._cached_result: Optional[Dict[str, Any]] = None + self._values = _ValuesAccumulator( + choices=_IndexedAccumulator( + lambda: _ValuesAccumulator(text=_StringAccumulator()) + ), + ) + + def process_chunk(self, chunk: "Completion") -> None: + self._is_null = False + self._cached_result = None + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + # `warnings=False` in `model_dump()` is only supported in 
Pydantic v2 + values = chunk.model_dump(exclude_unset=True) + self._values += values + + def _result(self) -> Optional[Dict[str, Any]]: + if self._is_null: + return None + if not self._cached_result: + self._cached_result = dict(self._values) + return self._cached_result + + def get_attributes(self) -> Iterator[Tuple[str, AttributeValue]]: + if not (result := self._result()): + return + # Extract just the text content for legacy completions API + output_content = None + choices = result.get("choices", []) + if choices: + texts = [c.get("text", "") for c in choices if c.get("text")] + if texts: + output_content = "".join(texts) + + if output_content: + yield SpanAttributes.OUTPUT_VALUE, output_content + else: + # Fallback to full response + json_string = safe_json_dumps(result) + yield from _as_output_attributes( + _ValueAndType(json_string, FiMimeTypeValues.JSON), + ) + + def get_extra_attributes(self) -> Iterator[Tuple[str, AttributeValue]]: + if not (result := self._result()): + return + if self._response_attributes_extractor: + yield from self._response_attributes_extractor.get_attributes_from_response( + self._completion_type.construct(**result), + self._request_parameters, + ) + + +class _ValuesAccumulator: + __slots__ = ("_values",) + + def __init__(self, **values: Any) -> None: + self._values: Dict[str, Any] = values + + def __iter__(self) -> Iterator[Tuple[str, Any]]: + for key, value in self._values.items(): + if value is None: + continue + if isinstance(value, _ValuesAccumulator): + if dict_value := dict(value): + yield key, dict_value + elif isinstance(value, _IndexedAccumulator): + if list_value := list(value): + yield key, list_value + elif isinstance(value, _StringAccumulator): + if str_value := str(value): + yield key, str_value + else: + yield key, value + + def __iadd__(self, values: Optional[Mapping[str, Any]]) -> "_ValuesAccumulator": + if not values: + return self + for key in self._values.keys(): + if (value := values.get(key)) is None: + 
continue + self_value = self._values[key] + if isinstance(self_value, _ValuesAccumulator): + if isinstance(value, Mapping): + self_value += value + elif isinstance(self_value, _StringAccumulator): + if isinstance(value, str): + self_value += value + elif isinstance(self_value, _IndexedAccumulator): + if isinstance(value, Iterable): + for v in value: + self_value += v + else: + self_value += value + elif isinstance(self_value, List) and isinstance(value, Iterable): + self_value.extend(value) + else: + self._values[key] = value # replacement + for key in values.keys(): + if key in self._values or (value := values[key]) is None: + continue + value = deepcopy(value) + if isinstance(value, Mapping): + value = _ValuesAccumulator(**value) + self._values[key] = value # new entry + return self + + +class _StringAccumulator: + __slots__ = ("_fragments",) + + def __init__(self) -> None: + self._fragments: List[str] = [] + + def __str__(self) -> str: + return "".join(self._fragments) + + def __iadd__(self, value: Optional[str]) -> "_StringAccumulator": + if not value: + return self + self._fragments.append(value) + return self + + +class _IndexedAccumulator: + __slots__ = ("_indexed",) + + def __init__(self, factory: Callable[[], _ValuesAccumulator]) -> None: + self._indexed: DefaultDict[int, _ValuesAccumulator] = defaultdict(factory) + + def __iter__(self) -> Iterator[Dict[str, Any]]: + for _, values in sorted(self._indexed.items()): + yield dict(values) + + def __iadd__(self, values: Optional[Mapping[str, Any]]) -> "_IndexedAccumulator": + if ( + not values + or not hasattr(values, "get") + or (index := values.get("index")) is None + ): + return self + self._indexed[index] += values + return self diff --git a/python/frameworks/azure_openai/traceai_azure_openai/_response_attributes_extractor.py b/python/frameworks/azure_openai/traceai_azure_openai/_response_attributes_extractor.py new file mode 100644 index 00000000..eebf93b1 --- /dev/null +++ 
b/python/frameworks/azure_openai/traceai_azure_openai/_response_attributes_extractor.py @@ -0,0 +1,282 @@ +from __future__ import annotations + +import base64 +import logging +from importlib import import_module + +from types import ModuleType +from typing import ( + TYPE_CHECKING, + Any, + Iterable, + Iterator, + Mapping, + Optional, + Sequence, + Tuple, + Type, +) + +from fi_instrumentation.fi_types import ( + EmbeddingAttributes, + MessageAttributes, + MessageContentAttributes, + SpanAttributes, + ToolCallAttributes, +) +from opentelemetry.util.types import AttributeValue +from traceai_azure_openai._attributes._responses_api import _ResponsesApiAttributes +from traceai_azure_openai._utils import _get_openai_version, _get_texts + +if TYPE_CHECKING: + from openai.types import Completion, CreateEmbeddingResponse + from openai.types.chat import ChatCompletion + from openai.types.images_response import ImagesResponse + from openai.types.responses.response import Response + +__all__ = ("_ResponseAttributesExtractor",) + +logger = logging.getLogger(__name__) +logger.addHandler(logging.NullHandler()) + + +try: + _NUMPY: Optional[ModuleType] = import_module("numpy") +except ImportError: + _NUMPY = None + + +class _ResponseAttributesExtractor: + __slots__ = ( + "_openai", + "_chat_completion_type", + "_completion_type", + "_create_embedding_response_type", + "_images_response_type", + "_responses_type", + ) + + def __init__(self, openai: ModuleType) -> None: + self._openai = openai + self._chat_completion_type: Type["ChatCompletion"] = ( + openai.types.chat.ChatCompletion + ) + self._completion_type: Type["Completion"] = openai.types.Completion + self._responses_type: Type["Response"] = openai.types.responses.response.Response + self._create_embedding_response_type: Type["CreateEmbeddingResponse"] = ( + openai.types.CreateEmbeddingResponse + ) + self._images_response_type: Type["ImagesResponse"] = ( + openai.types.images_response.ImagesResponse + ) + + def 
get_attributes_from_response( + self, + response: Any, + request_parameters: Mapping[str, Any], + ) -> Iterator[Tuple[str, AttributeValue]]: + if isinstance(response, self._chat_completion_type): + yield from self._get_attributes_from_chat_completion( + completion=response, + request_parameters=request_parameters, + ) + elif isinstance(response, self._responses_type): + yield from self._get_attributes_from_responses_response( + response=response, + request_parameters=request_parameters, + ) + elif isinstance(response, self._create_embedding_response_type): + yield from self._get_attributes_from_create_embedding_response( + response=response, + request_parameters=request_parameters, + ) + elif isinstance(response, self._completion_type): + yield from self._get_attributes_from_completion( + completion=response, + request_parameters=request_parameters, + ) + elif isinstance(response, self._images_response_type): + yield from self._get_attributes_from_image_generation( + data=response.data, + request_parameters=request_parameters, + ) + else: + yield from () + + def _get_attributes_from_responses_response( + self, + response: Response, + request_parameters: Mapping[str, Any], + ) -> Iterator[Tuple[str, AttributeValue]]: + yield from _ResponsesApiAttributes._get_attributes_from_response(response) + + def _get_attributes_from_chat_completion( + self, + completion: "ChatCompletion", + request_parameters: Mapping[str, Any], + ) -> Iterator[Tuple[str, AttributeValue]]: + if model := getattr(completion, "model", None): + yield SpanAttributes.GEN_AI_REQUEST_MODEL, model + if usage := getattr(completion, "usage", None): + yield from self._get_attributes_from_completion_usage(usage) + + if (choices := getattr(completion, "choices", None)) and isinstance( + choices, Iterable + ): + for choice in choices: + if (index := getattr(choice, "index", None)) is None: + continue + if message := getattr(choice, "message", None): + for key, value in 
self._get_attributes_from_chat_completion_message( + message + ): + yield f"{SpanAttributes.GEN_AI_OUTPUT_MESSAGES}.{index}.{key}", value + + def _get_attributes_from_completion( + self, + completion: "Completion", + request_parameters: Mapping[str, Any], + ) -> Iterator[Tuple[str, AttributeValue]]: + if model := getattr(completion, "model", None): + yield SpanAttributes.GEN_AI_REQUEST_MODEL, model + if usage := getattr(completion, "usage", None): + yield from self._get_attributes_from_completion_usage(usage) + if model_prompt := request_parameters.get("prompt"): + if prompts := list(_get_texts(model_prompt, model)): + yield SpanAttributes.GEN_AI_PROMPTS, prompts + + def _get_attributes_from_create_embedding_response( + self, + response: "CreateEmbeddingResponse", + request_parameters: Mapping[str, Any], + ) -> Iterator[Tuple[str, AttributeValue]]: + if usage := getattr(response, "usage", None): + yield from self._get_attributes_from_embedding_usage(usage) + if model := getattr(response, "model"): + yield f"{SpanAttributes.EMBEDDING_MODEL_NAME}", model + if (data := getattr(response, "data", None)) and isinstance(data, Iterable): + for embedding in data: + if (index := getattr(embedding, "index", None)) is None: + continue + for key, value in self._get_attributes_from_embedding(embedding): + yield f"{SpanAttributes.EMBEDDING_EMBEDDINGS}.{index}.{key}", value + + embedding_input = request_parameters.get("input") + for index, text in enumerate(_get_texts(embedding_input, model)): + yield ( + ( + f"{SpanAttributes.EMBEDDING_EMBEDDINGS}.{index}." 
+ f"{EmbeddingAttributes.EMBEDDING_TEXT}" + ), + text, + ) + + def _get_attributes_from_embedding( + self, + embedding: object, + ) -> Iterator[Tuple[str, AttributeValue]]: + if not (_vector := getattr(embedding, "embedding", None)): + return + if ( + isinstance(_vector, Sequence) + and len(_vector) + and isinstance(_vector[0], float) + ): + vector = list(_vector) + yield f"{EmbeddingAttributes.EMBEDDING_VECTOR}", vector + elif isinstance(_vector, str) and _vector and _NUMPY: + try: + vector = _NUMPY.frombuffer( + base64.b64decode(_vector), dtype="float32" + ).tolist() + except Exception: + logger.exception("Failed to decode embedding") + pass + else: + yield f"{EmbeddingAttributes.EMBEDDING_VECTOR}", vector + + def _get_attributes_from_chat_completion_message( + self, + message: object, + ) -> Iterator[Tuple[str, AttributeValue]]: + if role := getattr(message, "role", None): + yield MessageAttributes.MESSAGE_ROLE, role + if content := getattr(message, "content", None): + yield MessageAttributes.MESSAGE_CONTENT, content + if audio := getattr(message, "audio", None): + # Handle audio attributes + yield f"{MessageAttributes.MESSAGE_CONTENT}.0.{MessageContentAttributes.MESSAGE_CONTENT_TYPE}", "audio" + if audio_data := getattr(audio, "data", None): + yield f"{MessageAttributes.MESSAGE_CONTENT}.0.{MessageContentAttributes.MESSAGE_CONTENT_AUDIO}", audio_data + if transcript := getattr(audio, "transcript", None): + yield f"{MessageAttributes.MESSAGE_CONTENT}.0.{MessageContentAttributes.MESSAGE_AUDIO_TRANSCRIPT}", transcript + if function_call := getattr(message, "function_call", None): + if name := getattr(function_call, "name", None): + yield MessageAttributes.MESSAGE_FUNCTION_CALL_NAME, name + if arguments := getattr(function_call, "arguments", None): + yield MessageAttributes.MESSAGE_FUNCTION_CALL_ARGUMENTS_JSON, arguments + if ( + _get_openai_version() >= (1, 1, 0) + and (tool_calls := getattr(message, "tool_calls", None)) + and isinstance(tool_calls, Iterable) + ): 
+ for index, tool_call in enumerate(tool_calls): + if (tool_call_id := getattr(tool_call, "id", None)) is not None: + yield ( + f"{MessageAttributes.MESSAGE_TOOL_CALLS}.{index}." + f"{ToolCallAttributes.TOOL_CALL_ID}", + tool_call_id, + ) + if function := getattr(tool_call, "function", None): + if name := getattr(function, "name", None): + yield ( + ( + f"{MessageAttributes.MESSAGE_TOOL_CALLS}.{index}." + f"{ToolCallAttributes.TOOL_CALL_FUNCTION_NAME}" + ), + name, + ) + if arguments := getattr(function, "arguments", None): + yield ( + f"{MessageAttributes.MESSAGE_TOOL_CALLS}.{index}." + f"{ToolCallAttributes.TOOL_CALL_FUNCTION_ARGUMENTS_JSON}", + arguments, + ) + + def _get_attributes_from_completion_usage( + self, + usage: object, + ) -> Iterator[Tuple[str, AttributeValue]]: + if (total_tokens := getattr(usage, "total_tokens", None)) is not None: + yield SpanAttributes.GEN_AI_USAGE_TOTAL_TOKENS, total_tokens + if (prompt_tokens := getattr(usage, "prompt_tokens", None)) is not None: + yield SpanAttributes.GEN_AI_USAGE_INPUT_TOKENS, prompt_tokens + if (completion_tokens := getattr(usage, "completion_tokens", None)) is not None: + yield SpanAttributes.GEN_AI_USAGE_OUTPUT_TOKENS, completion_tokens + + def _get_attributes_from_embedding_usage( + self, + usage: object, + ) -> Iterator[Tuple[str, AttributeValue]]: + if (total_tokens := getattr(usage, "total_tokens", None)) is not None: + yield SpanAttributes.GEN_AI_USAGE_TOTAL_TOKENS, total_tokens + if (prompt_tokens := getattr(usage, "prompt_tokens", None)) is not None: + yield SpanAttributes.GEN_AI_USAGE_INPUT_TOKENS, prompt_tokens + + def _get_attributes_from_image_generation( + self, + data: Iterable[object], + request_parameters: Mapping[str, Any], + ) -> Iterator[Tuple[str, AttributeValue]]: + + if model := request_parameters.get("model"): + yield SpanAttributes.GEN_AI_REQUEST_MODEL, model + + for index, obj in enumerate(data): + yield 
f"{SpanAttributes.GEN_AI_OUTPUT_MESSAGES}.{index}.{MessageAttributes.MESSAGE_ROLE}", "assistant" + if image := getattr(obj, "url", None): + yield f"{SpanAttributes.GEN_AI_OUTPUT_MESSAGES}.{index}.{MessageAttributes.MESSAGE_CONTENT}.0.{MessageContentAttributes.MESSAGE_CONTENT_TYPE}", "image" + yield f"{SpanAttributes.GEN_AI_OUTPUT_MESSAGES}.{index}.{MessageAttributes.MESSAGE_CONTENT}.0.{MessageContentAttributes.MESSAGE_CONTENT_IMAGE}", image + elif b64_json := getattr(obj, "b64_json", None): + yield f"{SpanAttributes.GEN_AI_OUTPUT_MESSAGES}.{index}.{MessageAttributes.MESSAGE_CONTENT}.0.{MessageContentAttributes.MESSAGE_CONTENT_TYPE}", "image" + yield f"{SpanAttributes.GEN_AI_OUTPUT_MESSAGES}.{index}.{MessageAttributes.MESSAGE_CONTENT}.0.{MessageContentAttributes.MESSAGE_CONTENT_IMAGE}", b64_json diff --git a/python/frameworks/azure_openai/traceai_azure_openai/_span_io_handler.py b/python/frameworks/azure_openai/traceai_azure_openai/_span_io_handler.py new file mode 100644 index 00000000..8d2cbcd3 --- /dev/null +++ b/python/frameworks/azure_openai/traceai_azure_openai/_span_io_handler.py @@ -0,0 +1,167 @@ +import json +import logging +from typing import Any + +from fi_instrumentation.fi_types import SpanAttributes +from traceai_azure_openai._with_span import _WithSpan + +logger = logging.getLogger(__name__) +logger.addHandler(logging.NullHandler()) + + +def _process_input_data(input_data: Any, span: _WithSpan) -> None: + """Process the input data and add relevant attributes to the span. 
def _process_input_data(input_data: Any, span: "_WithSpan") -> None:
    """Derive input.* span attributes from chat messages or a raw prompt.

    Args:
        input_data: Input data (a list of chat messages, or a raw prompt)
        span: The span to add attributes to
    """
    if isinstance(input_data, list):
        input_content = []
        input_images = []
        eval_input = []
        for msg in input_data:
            if not isinstance(msg, dict):
                continue  # non-dict entries carry nothing extractable
            msg_content = msg.get("content", "")
            if isinstance(msg_content, list):
                # Multimodal message: keep text parts; lift images out into
                # input.images and reference them by index placeholders.
                filtered_content = []
                for item in msg_content:
                    if not isinstance(item, dict):
                        continue
                    if item.get("type") == "text":
                        filtered_content.append(item)
                        eval_input.append(item.get("text", ""))
                    elif item.get("type") == "image_url":
                        url = (item.get("image_url") or {}).get("url")
                        if url:
                            input_images.append(url)
                            image_index = len(input_images) - 1
                            eval_input.append(
                                "{{"
                                + f"{SpanAttributes.INPUT_IMAGES}.{image_index}"
                                + "}}"
                            )
                if filtered_content:
                    msg_dict = msg.copy()
                    msg_dict["content"] = filtered_content
                    input_content.append(msg_dict)
            else:
                input_content.append(msg)
                eval_input.append(msg_content)
        if input_images:
            span.set_attribute(
                SpanAttributes.INPUT_IMAGES,
                json.dumps(input_images, ensure_ascii=False),
            )
        # BUG FIX: INPUT_VALUE was written three times and the final (winning)
        # write truncated it to eval_input[0], discarding every message after
        # the first.  Write it exactly once with the full joined text, falling
        # back to the JSON form when no text was extracted.
        if eval_input:
            span.set_attribute(
                SpanAttributes.INPUT_VALUE, " \n ".join(map(str, eval_input))
            )
        elif input_content:
            span.set_attribute(
                SpanAttributes.INPUT_VALUE,
                json.dumps(input_content, ensure_ascii=False),
            )
    else:
        try:
            input_str = json.dumps(input_data, ensure_ascii=False).strip()
        except (TypeError, ValueError):
            input_str = str(input_data).strip()
        span.set_attribute(SpanAttributes.INPUT_VALUE, input_str)


def add_io_to_span_attributes(
    span: "_WithSpan", input_data: Any, output_data: Any, is_streaming: bool = False
) -> None:
    """Add input/output data to span attributes.

    Args:
        span: The span to add attributes to
        input_data: Input data (messages or prompt)
        output_data: Output data from the response
        is_streaming: Whether this is a streaming response (streamed output
            is handled chunk-by-chunk via add_stream_output_to_span)
    """
    try:
        # Process input data
        if input_data is not None:
            _process_input_data(input_data, span)

        # Process output data if not streaming
        if output_data is not None and not is_streaming:
            # Handle APIResponse-style wrappers first.
            if hasattr(output_data, "model_dump"):
                output_data = output_data.model_dump()
            elif hasattr(output_data, "parse"):
                output_data = output_data.parse()

        # Extract output content based on response type
            output_content = None
            if hasattr(output_data, "choices") and output_data.choices:
                first_choice = output_data.choices[0]
                if hasattr(first_choice, "message"):
                    message = first_choice.message
                    if hasattr(message, "tool_calls") and message.tool_calls:
                        # Render tool calls as readable "Function: name(args)" lines.
                        tool_calls = [
                            f"Function: {tc.function.name}({tc.function.arguments})"
                            for tc in message.tool_calls
                            if tc.type == "function"
                        ]
                        output_content = " \n ".join(tool_calls)
                    elif getattr(message, "content", None):
                        output_content = message.content
                elif hasattr(first_choice, "text"):
                    output_content = first_choice.text
                elif hasattr(first_choice, "delta"):
                    output_content = getattr(first_choice.delta, "content", "")
                else:
                    output_content = str(first_choice)
            elif isinstance(output_data, dict) and output_data.get("choices"):
                # BUG FIX: guard against an empty "choices" list, which
                # previously raised IndexError (swallowed by the broad except).
                first_choice = output_data["choices"][0]
                message = first_choice.get("message", {})
                if "tool_calls" in message and message["tool_calls"]:
                    tool_calls = [
                        f"Function: {tc['function']['name']}({tc['function']['arguments']})"
                        for tc in message["tool_calls"]
                        if tc.get("type") == "function"
                    ]
                    output_content = " \n ".join(tool_calls)
                elif "content" in message:
                    output_content = message["content"]
            else:
                output_content = str(output_data)

            # Set output.value to just the assistant's response content
            if output_content:
                span.set_attribute(SpanAttributes.OUTPUT_VALUE, output_content)

    except Exception as e:
        logger.exception(f"Error adding I/O to span attributes: {e}")


def add_stream_output_to_span(span: "_WithSpan", chunk_data: Any) -> None:
    """Append one streaming chunk's text to the span's output.value.

    Args:
        span: The span to add attributes to
        chunk_data: The streaming chunk data to process
    """
    try:
        output_content = None

        # Extract content from chunk
        if hasattr(chunk_data, "choices") and chunk_data.choices:
            first_choice = chunk_data.choices[0]
            if hasattr(first_choice, "delta"):
                output_content = getattr(first_choice.delta, "content", "")
            elif hasattr(first_choice, "text"):
                output_content = first_choice.text
            else:
                output_content = str(first_choice)

        # Only append non-empty content
        if output_content and output_content.strip():
            existing_output = span.get_attribute(SpanAttributes.OUTPUT_VALUE) or ""
            updated_output = (existing_output + output_content).strip()
            span.set_attribute(SpanAttributes.OUTPUT_VALUE, updated_output)
    except Exception as e:
        logger.exception(f"Error adding streaming output to span attributes: {e}")


# ---------------------------------------------------------------------------
# traceai_azure_openai/_stream.py (module preamble)
# ---------------------------------------------------------------------------
import logging
from typing import Any, AsyncIterator, Generator, Iterator, Optional, Protocol, Tuple

from openai.types.chat import ChatCompletionChunk
from opentelemetry import trace as trace_api
from opentelemetry.util.types import AttributeValue
from traceai_azure_openai._utils import _finish_tracing
from traceai_azure_openai._with_span import _WithSpan
from wrapt import ObjectProxy

# _AsyncStream added: it was defined and used but missing from __all__.
__all__ = (
    "_Stream",
    "_AsyncStream",
    "_ResponseAccumulator",
)

logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
class _ResponseAccumulator(Protocol):
    """Structural interface for objects that fold stream chunks into attributes."""

    def process_chunk(self, chunk: Any) -> None: ...

    def get_attributes(self) -> Iterator[Tuple[str, AttributeValue]]: ...

    def get_extra_attributes(self) -> Iterator[Tuple[str, AttributeValue]]: ...


class _Stream(ObjectProxy):  # type: ignore
    """Transparent proxy over a sync stream that finishes the span on exhaustion.

    Chunks are forwarded to an optional accumulator; the span is closed with
    OK on StopIteration or with ERROR (plus the recorded exception) on failure.
    wrapt requires proxy-local state to use the ``_self_`` prefix.
    """

    __slots__ = (
        "_self_with_span",
        "_self_iteration_count",
        "_self_is_finished",
        "_self_response_accumulator",
    )

    def __init__(
        self,
        stream: Optional[Generator[ChatCompletionChunk, None, None]],
        with_span: _WithSpan,
        response_accumulator: Optional[_ResponseAccumulator] = None,
    ) -> None:
        super().__init__(stream)
        self._self_with_span = with_span
        self._self_iteration_count = 0
        self._self_is_finished = with_span.is_finished
        self._self_response_accumulator = response_accumulator

    def __iter__(self) -> Iterator[Any]:
        return self

    def __next__(self) -> Any:
        try:
            chunk = self.__wrapped__.__next__()
        except StopIteration:
            # Normal end of stream: close the span exactly once with OK.
            if not self._self_is_finished:
                status = trace_api.Status(status_code=trace_api.StatusCode.OK)
                self._finish_tracing(status=status)
            raise
        except Exception as exception:
            if not self._self_is_finished:
                status = trace_api.Status(
                    status_code=trace_api.StatusCode.ERROR,
                    description=f"{type(exception).__name__}: {exception}",
                )
                self._self_with_span.record_exception(exception)
                self._finish_tracing(status=status)
            raise
        else:
            self._process_chunk(chunk)
            return chunk

    def _process_chunk(self, chunk: Any) -> None:
        if not self._self_iteration_count:
            # Mark time-to-first-token on the span.
            try:
                self._self_with_span.add_event("First Token Stream Event")
            except Exception:
                logger.exception("Failed to add event to span")
        self._self_iteration_count += 1
        if self._self_response_accumulator is not None:
            try:
                self._self_response_accumulator.process_chunk(chunk)
            except Exception:
                # Unused exception binding removed; logger.exception captures it.
                logger.exception("Failed to accumulate response")

    def _finish_tracing(
        self,
        status: Optional[trace_api.Status] = None,
    ) -> None:
        _finish_tracing(
            status=status,
            with_span=self._self_with_span,
            has_attributes=self,
        )
        self._self_is_finished = True

    def get_attributes(self) -> Iterator[Tuple[str, AttributeValue]]:
        if self._self_response_accumulator is not None:
            yield from self._self_response_accumulator.get_attributes()

    def get_extra_attributes(self) -> Iterator[Tuple[str, AttributeValue]]:
        if self._self_response_accumulator is not None:
            yield from self._self_response_accumulator.get_extra_attributes()


class _AsyncStream(ObjectProxy):  # type: ignore
    """Async twin of ``_Stream``: proxies an async iterator and closes the span."""

    __slots__ = (
        "_self_with_span",
        "_self_iteration_count",
        "_self_is_finished",
        "_self_response_accumulator",
    )

    def __init__(
        self,
        stream: AsyncIterator[Any],
        with_span: _WithSpan,
        response_accumulator: Optional[_ResponseAccumulator] = None,
    ) -> None:
        super().__init__(stream)
        self._self_with_span = with_span
        self._self_iteration_count = 0
        self._self_is_finished = with_span.is_finished
        self._self_response_accumulator = response_accumulator

    def __aiter__(self) -> "_AsyncStream":
        return self

    async def __anext__(self) -> Any:
        try:
            chunk = await self.__wrapped__.__anext__()
        except StopAsyncIteration:
            if not self._self_is_finished:
                status = trace_api.Status(status_code=trace_api.StatusCode.OK)
                self._finish_tracing(status=status)
            raise
        except Exception as exception:
            if not self._self_is_finished:
                status = trace_api.Status(
                    status_code=trace_api.StatusCode.ERROR,
                    description=f"{type(exception).__name__}: {exception}",
                )
                self._self_with_span.record_exception(exception)
                self._finish_tracing(status=status)
            raise
        else:
            self._process_chunk(chunk)
            return chunk

    def _process_chunk(self, chunk: Any) -> None:
        if not self._self_iteration_count:
            try:
                self._self_with_span.add_event("First Token Stream Event")
            except Exception:
                logger.exception("Failed to add event to span")
        self._self_iteration_count += 1
        if self._self_response_accumulator is not None:
            try:
                self._self_response_accumulator.process_chunk(chunk)
            except Exception:
                logger.exception("Failed to accumulate response")

    def _finish_tracing(
        self,
        status: Optional[trace_api.Status] = None,
    ) -> None:
        _finish_tracing(
            status=status,
            with_span=self._self_with_span,
            has_attributes=self,
        )
        self._self_is_finished = True

    def get_attributes(self) -> Iterator[Tuple[str, AttributeValue]]:
        if self._self_response_accumulator is not None:
            yield from self._self_response_accumulator.get_attributes()

    def get_extra_attributes(self) -> Iterator[Tuple[str, AttributeValue]]:
        if self._self_response_accumulator is not None:
            yield from self._self_response_accumulator.get_extra_attributes()


# ---------------------------------------------------------------------------
# traceai_azure_openai/_utils.py (module preamble)
# ---------------------------------------------------------------------------
import logging
import warnings
from functools import lru_cache
from importlib.metadata import version
from typing import (
    Any,
    Iterator,
    List,
    Mapping,
    NamedTuple,
    Optional,
    Protocol,
    Sequence,
    Tuple,
    Union,
    cast,
)

from fi_instrumentation import safe_json_dumps
from fi_instrumentation.fi_types import FiMimeTypeValues, SpanAttributes
from opentelemetry import trace as trace_api
from opentelemetry.util.types import Attributes, AttributeValue
from traceai_azure_openai._with_span import _WithSpan

logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())


@lru_cache
def _get_openai_version() -> Tuple[int, int, int]:
    """Return the installed openai version as (major, minor, patch).

    BUG FIX: the previous `int(part)` crashed with ValueError on pre-release
    versions such as "1.0.0rc1"; only the leading digits of each component
    are parsed now, and missing components default to 0.
    """
    parts: List[int] = []
    for part in version("openai").split(".")[:3]:
        digits = ""
        for ch in part:
            if not ch.isdigit():
                break
            digits += ch
        parts.append(int(digits) if digits else 0)
    while len(parts) < 3:
        parts.append(0)
    return cast(Tuple[int, int, int], tuple(parts))


class _ValueAndType(NamedTuple):
    # Serialized attribute value paired with its mime type.
    value: str
    type: FiMimeTypeValues
warnings.catch_warnings(): + warnings.simplefilter("ignore") + # `warnings=False` in `model_dump_json()` is only supported in Pydantic v2 + value = obj.model_dump_json(exclude_unset=True) + assert isinstance(value, str) + except Exception: + logger.exception("Failed to get model dump json") + else: + return _ValueAndType(value, FiMimeTypeValues.JSON) + if not isinstance(obj, str) and isinstance(obj, (Sequence, Mapping)): + try: + value = safe_json_dumps(obj) + except Exception: + logger.exception("Failed to dump json") + else: + return _ValueAndType(value, FiMimeTypeValues.JSON) + return _ValueAndType(str(obj), FiMimeTypeValues.TEXT) + + +def _as_input_attributes( + value_and_type: Optional[_ValueAndType], +) -> Iterator[Tuple[str, AttributeValue]]: + if not value_and_type: + return + yield SpanAttributes.INPUT_VALUE, value_and_type.value + # It's assumed to be TEXT by default, so we can skip to save one attribute. + if value_and_type.type is not FiMimeTypeValues.TEXT: + yield SpanAttributes.INPUT_MIME_TYPE, value_and_type.type.value + + +def _as_output_attributes( + value_and_type: Optional[_ValueAndType], +) -> Iterator[Tuple[str, AttributeValue]]: + if not value_and_type: + return + yield SpanAttributes.OUTPUT_VALUE, value_and_type.value + # It's assumed to be TEXT by default, so we can skip to save one attribute. + if value_and_type.type is not FiMimeTypeValues.TEXT: + yield SpanAttributes.OUTPUT_MIME_TYPE, value_and_type.type.value + + +class _HasAttributes(Protocol): + def get_attributes(self) -> Iterator[Tuple[str, AttributeValue]]: ... + + def get_extra_attributes(self) -> Iterator[Tuple[str, AttributeValue]]: ... 
# Exported names are underscore-private; list them explicitly so wildcard
# imports (used by some tooling/test harnesses) still resolve them.
__all__ = ["_finish_tracing", "_get_texts", "_to_dict", "_WithSpan"]


def _finish_tracing(
    with_span: "_WithSpan",
    has_attributes: "_HasAttributes",
    status: "Optional[trace_api.Status]" = None,
) -> None:
    """Flush attributes from ``has_attributes`` onto ``with_span`` and end it.

    Deliberately best-effort: tracing must never break the instrumented call,
    so every step is wrapped and failures are logged instead of raised.
    Annotations are string forward references so this module section does not
    require the OpenTelemetry names at definition time.
    """
    try:
        attributes: Attributes = dict(has_attributes.get_attributes())
    except Exception:
        logger.exception("Failed to get attributes")
        attributes = None
    try:
        extra_attributes: Attributes = dict(has_attributes.get_extra_attributes())
    except Exception:
        logger.exception("Failed to get extra attributes")
        extra_attributes = None
    try:
        with_span.finish_tracing(
            status=status,
            attributes=attributes,
            extra_attributes=extra_attributes,
        )
    except Exception:
        logger.exception("Failed to finish tracing")


def _get_texts(
    model_input: Optional[Union[str, List[str], List[int], List[List[int]]]],
    model: Optional[str],
) -> Iterator[str]:
    """Yield the raw text inputs of an embeddings request, if any.

    A single string yields itself; a list of strings yields each element.
    Token-array inputs (list of ints / list of lists of ints) cannot be
    decoded without the model's tokenizer, so they yield nothing.
    """
    if not model_input:
        return
    if isinstance(model_input, str):
        yield model_input
        return
    if not isinstance(model_input, Sequence):
        return
    # Yield only when the input is homogeneously strings (not token arrays).
    if all(isinstance(item, str) for item in model_input):
        yield from cast(List[str], model_input)


def _to_dict(value: Any) -> Any:
    """Recursively convert objects to JSON-friendly primitives.

    Handles None and scalars as-is; lists, tuples, and sets become lists
    (so the result is JSON-serializable); dicts recurse on values; objects
    exposing ``to_dict()`` are converted via that method; plain objects are
    converted via ``vars()``; anything else falls back to ``str()``.
    """
    if value is None:
        return None
    if isinstance(value, (str, int, float, bool)):
        return value
    if isinstance(value, (list, tuple, set)):
        # Tuples/sets are normalized to lists rather than stringified, so
        # structured API payloads keep their structure.
        return [_to_dict(item) for item in value]
    if isinstance(value, dict):
        return {k: _to_dict(v) for k, v in value.items()}
    if hasattr(value, "to_dict") and callable(value.to_dict):
        return _to_dict(value.to_dict())
    if hasattr(value, "__dict__"):
        return _to_dict(vars(value))
    return str(value)


# --- file: traceai_azure_openai/_with_span.py (new file in this patch) ---
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())


class _WithSpan:
    """Thin wrapper around an OTel span that guarantees idempotent finishing.

    Carries two attribute mappings captured at span start (context attributes
    and extra attributes) that are merged onto the span when tracing finishes.
    All span operations are best-effort and never raise.
    """

    __slots__ = (
        "_span",
        "_context_attributes",
        "_extra_attributes",
        "_is_finished",
    )

    def __init__(
        self,
        span: "trace_api.Span",
        context_attributes: "Attributes" = None,
        extra_attributes: "Attributes" = None,
    ) -> None:
        self._span = span
        self._context_attributes = context_attributes
        self._extra_attributes = extra_attributes
        try:
            # A span that is no longer recording cannot accept attributes or
            # events, so treat it as already finished.
            self._is_finished = not self._span.is_recording()
        except Exception:
            logger.exception("Failed to check if span is recording")
            self._is_finished = True

    @property
    def is_finished(self) -> bool:
        return self._is_finished

    def record_exception(self, exception: Exception) -> None:
        if self._is_finished:
            return
        try:
            self._span.record_exception(exception)
        except Exception:
            logger.exception("Failed to record exception on span")

    def add_event(self, name: str) -> None:
        if self._is_finished:
            return
        try:
            self._span.add_event(name)
        except Exception:
            logger.exception("Failed to add event to span")

    def finish_tracing(
        self,
        status: "Optional[trace_api.Status]" = None,
        attributes: "Attributes" = None,
        extra_attributes: "Attributes" = None,
    ) -> None:
        """Apply all pending attribute mappings, set status, and end the span."""
        if self._is_finished:
            return
        # Later mappings overwrite earlier keys on collision.
        for mapping in (
            attributes,
            self._context_attributes,
            self._extra_attributes,
            extra_attributes,
        ):
            if not mapping:
                continue
            for key, value in mapping.items():
                if value is None:
                    # OTel attribute values must not be None.
                    continue
                try:
                    self._span.set_attribute(key, value)
                except Exception:
                    logger.exception("Failed to set attribute on span")
        if status is not None:
            try:
                self._span.set_status(status=status)
            except Exception:
                logger.exception("Failed to set status code on span")
        try:
            self._span.end()
        except Exception:
            logger.exception("Failed to end span")
        self._is_finished = True

    def set_attribute(self, key: str, value: Any) -> None:
        """Set a single attribute on the span (no-op once finished)."""
        if self._is_finished:
            return
        try:
            self._span.set_attribute(key, value)
        except Exception:
            # Lazy %-formatting: the message is only built if logging happens,
            # consistent with the other log calls in this package.
            logger.exception("Failed to set attribute %s on span", key)


# --- file: traceai_azure_openai/package.py (new file in this patch) ---
# Version constraint for the instrumented package; presumably consumed by the
# instrumentor's dependency check — confirm against AzureOpenAIInstrumentor.
_instruments = ("openai >= 1.69.0",)
_supports_metrics = False

# --- file: traceai_azure_openai/version.py (new file in this patch) ---
__version__ = "0.1.0"
"""
Azure OpenAI Framework Instrumentation Tests

Tests for the Azure OpenAI framework instrumentation to verify it correctly instruments
Azure OpenAI client calls and generates appropriate spans and attributes.
"""

import pytest
import os
from unittest.mock import patch, MagicMock

# Import the instrumentation
from traceai_azure_openai import AzureOpenAIInstrumentor
import openai
from openai import AzureOpenAI, AsyncAzureOpenAI

from fi_instrumentation.otel import register
from fi_instrumentation.fi_types import (
    EvalName,
    EvalSpanKind,
    EvalTag,
    EvalTagType,
    ProjectType,
    ModelChoices,
)
from fi_instrumentation.instrumentation.context_attributes import using_attributes


class TestAzureOpenAIFramework:
    """Test Azure OpenAI framework instrumentation."""

    @pytest.fixture(autouse=True)
    def setup_trace_provider(self):
        """Set up trace provider for Azure OpenAI testing."""
        eval_tags = [
            EvalTag(
                eval_name=EvalName.TOXICITY,
                value=EvalSpanKind.LLM,
                type=EvalTagType.OBSERVATION_SPAN,
                model=ModelChoices.TURING_LARGE,
                mapping={"input": "prompt"},
                custom_eval_name="azure_openai_test_eval",
            )
        ]

        # Stub the remote custom-eval config lookup so the fixture never
        # performs network I/O during registration.
        with patch('fi_instrumentation.otel.check_custom_eval_config_exists') as mock_check:
            mock_check.return_value = False
            self.trace_provider = register(
                project_type=ProjectType.EXPERIMENT,
                eval_tags=eval_tags,
                project_name="azure_openai_framework_test",
                project_version_name="v1.0",
                verbose=False,
            )
            yield

    @pytest.fixture
    def mock_azure_openai_requests(self):
        """Mock Azure OpenAI HTTP requests to avoid real API calls."""
        with patch('httpx.Client.send') as mock_request:
            # Set up mock response for chat completions
            mock_response = MagicMock()
            mock_response.status_code = 200
            mock_response.headers = {"content-type": "application/json"}
            mock_response.text = '''{"id": "chatcmpl-azure123", "object": "chat.completion", "created": 1699999999, "model": "gpt-4o", "choices": [{"index": 0, "message": {"role": "assistant", "content": "This is a test response from Azure OpenAI."}, "finish_reason": "stop"}], "usage": {"prompt_tokens": 15, "completion_tokens": 12, "total_tokens": 27}}'''
            mock_response.json.return_value = {
                "id": "chatcmpl-azure123",
                "object": "chat.completion",
                "created": 1699999999,
                "model": "gpt-4o",
                "choices": [{
                    "index": 0,
                    "message": {
                        "role": "assistant",
                        "content": "This is a test response from Azure OpenAI."
                    },
                    "finish_reason": "stop"
                }],
                "usage": {
                    "prompt_tokens": 15,
                    "completion_tokens": 12,
                    "total_tokens": 27
                }
            }
            mock_request.return_value = mock_response
            yield mock_request

    def test_azure_openai_import(self):
        """Test that we can import the Azure OpenAI instrumentor."""
        assert AzureOpenAIInstrumentor is not None

        # Test basic instantiation
        instrumentor = AzureOpenAIInstrumentor()
        assert instrumentor is not None

    def test_azure_openai_basic_instrumentation(self, mock_azure_openai_requests):
        """Test basic Azure OpenAI chat completion instrumentation."""
        instrumentor = AzureOpenAIInstrumentor()
        instrumentor.instrument(tracer_provider=self.trace_provider)

        try:
            # Create Azure client
            client = AzureOpenAI(
                azure_endpoint="https://test-resource.openai.azure.com",
                api_key="test-key",
                api_version="2024-02-01",
            )

            with using_attributes(
                session_id="azure-test-session-123",
                user_id="azure-test-user-456",
                metadata={"test_type": "instrumentation", "framework": "azure_openai"},
                tags=["azure-openai", "test", "gpt-4o"],
            ):
                response = client.chat.completions.create(
                    model="gpt-4o",
                    messages=[
                        {"role": "system", "content": "You are a helpful assistant."},
                        {"role": "user", "content": "Write a short poem about cloud computing."}
                    ],
                    max_tokens=50,
                    temperature=0.7,
                )

            # Verify the HTTP request was made
            assert mock_azure_openai_requests.called

            # Parse the response to get the actual ChatCompletion object
            parsed_response = response.parse()

            # Verify response structure (parsed from mock JSON)
            assert parsed_response.choices[0].message.content == "This is a test response from Azure OpenAI."
            assert parsed_response.model == "gpt-4o"
            assert parsed_response.usage.total_tokens == 27

        finally:
            instrumentor.uninstrument()

    def test_azure_openai_function_calling(self, mock_azure_openai_requests):
        """Test Azure OpenAI instrumentation with function calling."""
        mock_azure_openai_requests.return_value.text = '''{"id": "chatcmpl-azurefunc123", "object": "chat.completion", "created": 1699999999, "model": "gpt-4o", "choices": [{"index": 0, "message": {"role": "assistant", "content": null, "tool_calls": [{"id": "call_azure_123", "type": "function", "function": {"name": "get_weather", "arguments": "{\\"location\\": \\"Seattle\\"}"}}]}, "finish_reason": "tool_calls"}], "usage": {"prompt_tokens": 20, "completion_tokens": 15, "total_tokens": 35}}'''
        mock_azure_openai_requests.return_value.json.return_value = {
            "id": "chatcmpl-azurefunc123",
            "object": "chat.completion",
            "created": 1699999999,
            "model": "gpt-4o",
            "choices": [{
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": None,
                    "tool_calls": [{
                        "id": "call_azure_123",
                        "type": "function",
                        "function": {
                            "name": "get_weather",
                            "arguments": '{"location": "Seattle"}'
                        }
                    }]
                },
                "finish_reason": "tool_calls"
            }],
            "usage": {
                "prompt_tokens": 20,
                "completion_tokens": 15,
                "total_tokens": 35
            }
        }

        instrumentor = AzureOpenAIInstrumentor()
        instrumentor.instrument(tracer_provider=self.trace_provider)

        try:
            client = AzureOpenAI(
                azure_endpoint="https://test-resource.openai.azure.com",
                api_key="test-key",
                api_version="2024-02-01",
            )

            tools = [{
                "type": "function",
                "function": {
                    "name": "get_weather",
                    "description": "Get current weather for a location",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "location": {"type": "string", "description": "City name"}
                        },
                        "required": ["location"]
                    }
                }
            }]

            with using_attributes(session_id="azure-function-test-session"):
                response = client.chat.completions.create(
                    model="gpt-4o",
                    messages=[{"role": "user", "content": "What's the weather in Seattle?"}],
                    tools=tools,
                    max_tokens=50,
                )

            parsed_response = response.parse()

            # Verify function calling was captured
            assert parsed_response.choices[0].message.tool_calls[0].function.name == "get_weather"
            assert mock_azure_openai_requests.called

        finally:
            instrumentor.uninstrument()

    @pytest.mark.asyncio
    async def test_azure_openai_async_instrumentation(self, mock_azure_openai_requests):
        """Test Azure OpenAI async instrumentation."""
        instrumentor = AzureOpenAIInstrumentor()
        instrumentor.instrument(tracer_provider=self.trace_provider)

        try:
            client = AsyncAzureOpenAI(
                azure_endpoint="https://test-resource.openai.azure.com",
                api_key="test-key",
                api_version="2024-02-01",
            )

            # Test that the async client is properly instrumented
            assert hasattr(client, 'request')
            assert type(client.request).__name__ == 'BoundFunctionWrapper'

            # Test context attributes work
            with using_attributes(session_id="azure-async-test-session"):
                pass

        finally:
            instrumentor.uninstrument()

    def test_azure_openai_streaming(self, mock_azure_openai_requests):
        """Test Azure OpenAI streaming instrumentation setup."""
        instrumentor = AzureOpenAIInstrumentor()
        instrumentor.instrument(tracer_provider=self.trace_provider)

        try:
            client = AzureOpenAI(
                azure_endpoint="https://test-resource.openai.azure.com",
                api_key="test-key",
                api_version="2024-02-01",
            )

            # Verify the client is properly instrumented for streaming
            assert hasattr(client, 'request')
            assert type(client.request).__name__ == 'BoundFunctionWrapper'

            with using_attributes(session_id="azure-streaming-test-session"):
                pass

        finally:
            instrumentor.uninstrument()

    def test_instrumentor_uninstrumentation(self):
        """Test that uninstrumentation properly restores original behavior."""
        instrumentor = AzureOpenAIInstrumentor()

        # Store original methods
        original_request = openai.OpenAI.request
        original_async_request = openai.AsyncOpenAI.request

        # Instrument
        instrumentor.instrument(tracer_provider=self.trace_provider)

        # Methods should be wrapped (different types)
        assert type(openai.OpenAI.request).__name__ == 'BoundFunctionWrapper'
        assert type(openai.AsyncOpenAI.request).__name__ == 'BoundFunctionWrapper'

        # Uninstrument
        instrumentor.uninstrument()

        # Methods should be restored to the exact original function objects —
        # identity (`is`), not just equality, proves the wrapper was removed.
        assert openai.OpenAI.request is original_request
        assert openai.AsyncOpenAI.request is original_async_request

    def test_regular_openai_not_instrumented(self, mock_azure_openai_requests):
        """Test that regular OpenAI clients are NOT instrumented by AzureOpenAIInstrumentor."""
        instrumentor = AzureOpenAIInstrumentor()
        instrumentor.instrument(tracer_provider=self.trace_provider)

        try:
            # A regular OpenAI client should pass through without tracing
            regular_client = openai.OpenAI(api_key="test-key")

            with using_attributes(session_id="regular-openai-session"):
                response = regular_client.chat.completions.create(
                    model="gpt-4o",
                    messages=[{"role": "user", "content": "Hello"}],
                    max_tokens=10,
                )

            # The call should still work (passed through) and return a value
            assert response is not None
            assert mock_azure_openai_requests.called

        finally:
            instrumentor.uninstrument()