diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/channel_service_adapter.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/channel_service_adapter.py index 20539ee6..f1b1c98a 100644 --- a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/channel_service_adapter.py +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/channel_service_adapter.py @@ -386,7 +386,7 @@ async def process_activity( if claims_identity.is_agent_claim(): outgoing_audience = claims_identity.get_token_audience() - activity.caller_id = f"{CallerIdConstants.agent_to_agent_prefix}{claims_identity.get_outgoing_app_id()}" + activity.caller_id = f"{CallerIdConstants.agent_to_agent_prefix.value}{claims_identity.get_outgoing_app_id()}" else: outgoing_audience = AuthenticationConstants.AGENTS_SDK_SCOPE diff --git a/test_samples/echo-a365-telemetry/.env.example b/test_samples/echo-a365-telemetry/.env.example new file mode 100644 index 00000000..4a05d679 --- /dev/null +++ b/test_samples/echo-a365-telemetry/.env.example @@ -0,0 +1,54 @@ +# ============================================================================= +# M365 Agents SDK — service principal credentials +# ============================================================================= +CONNECTIONS__SERVICE_CONNECTION__SETTINGS__CLIENTID=your-client-id +CONNECTIONS__SERVICE_CONNECTION__SETTINGS__CLIENTSECRET=your-client-secret +CONNECTIONS__SERVICE_CONNECTION__SETTINGS__TENANTID=your-tenant-id + +# ============================================================================= +# Azure OpenAI +# Endpoint format: https://.openai.azure.com +# ============================================================================= +AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com +AZURE_OPENAI_API_KEY=your-api-key +AZURE_OPENAI_DEPLOYMENT=gpt-4o-mini +AZURE_OPENAI_API_VERSION=2024-12-01-preview + +# Optional: override the system prompt sent to the model +# AGENT_SYSTEM_PROMPT=You are a helpful assistant. Respond concisely. + +# ============================================================================= +# Agent settings +# ============================================================================= +AGENT_NAME=EchoAgent +ENVIRONMENT=development +HOST=localhost +PORT=3978 + +# ============================================================================= +# OpenTelemetry — exporters +# ============================================================================= + +# OTLP (Aspire dashboard, Jaeger, OpenTelemetry Collector) +OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 +ENABLE_OTLP_EXPORTER=True +OTEL_EXPORTER_OTLP_INSECURE=true + +# Capture all request/response headers in spans +OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST=".*" +OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE=".*" +OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_REQUEST=".*" +OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_RESPONSE=".*" + +# Azure Monitor / Application Insights (optional — leave blank to skip) +# APPLICATIONINSIGHTS_CONNECTION_STRING=InstrumentationKey=your-key-here + +# ============================================================================= +# A365 observability SDK settings +# ============================================================================= +ENABLE_OBSERVABILITY=true +ENABLE_OPENTELEMETRY_SWITCH=true +ENABLE_OTEL=true +ENABLE_SENSITIVE_DATA=true +OBSERVABILITY_SERVICE_NAME=echo-a365-telemetry +OBSERVABILITY_SERVICE_NAMESPACE=agents-framework.samples diff --git a/test_samples/echo-a365-telemetry/app.py b/test_samples/echo-a365-telemetry/app.py new file mode 100644 index 00000000..2168eb9c --- /dev/null +++ b/test_samples/echo-a365-telemetry/app.py @@ -0,0 +1,139 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Echo Agent — Microsoft 365 Agents SDK with full A365 telemetry. + +Startup order (mirrors the C# sample): + 1. configure_otel_providers() — A365 observability framework bootstrap + 2. configure_opentelemetry() — local TracerProvider / MeterProvider / log bridge + 3. M365 SDK wiring — storage, adapter, auth, AgentApplication + 4. Route registration — /api/messages, /health, /alive +""" + +import logging +from os import environ, path + +from aiohttp import web +from aiohttp.web import Application, Request, Response +from dotenv import load_dotenv + +from agent_framework.observability import configure_otel_providers + +# Phase 1 — must run before any OTel proxy object is accessed +configure_otel_providers() + +logger = logging.getLogger(__name__) + +load_dotenv(path.join(path.dirname(__file__), ".env")) + +# --------------------------------------------------------------------------- +# Phase 2 — set up TracerProvider / MeterProvider / log bridge. +# OTel proxy objects in telemetry/agent_metrics.py resolve to real +# implementations once configure_opentelemetry() has run. +# --------------------------------------------------------------------------- +from telemetry import ( + configure_opentelemetry, + create_aiohttp_tracing_middleware, + setup_health_routes, +) + +configure_opentelemetry( + app_name=environ.get("AGENT_NAME", "EchoAgent"), + environment=environ.get("ENVIRONMENT", "development"), + otlp_endpoint=environ.get("OTEL_EXPORTER_OTLP_ENDPOINT"), + azure_monitor_connection_string=environ.get("APPLICATIONINSIGHTS_CONNECTION_STRING"), +) + +# --------------------------------------------------------------------------- +# Phase 3 — M365 Agents SDK +# --------------------------------------------------------------------------- +from microsoft_agents.hosting.core import ( + AgentApplication, + MemoryStorage, + TurnContext, + TurnState, +) +from microsoft_agents.hosting.aiohttp import ( + CloudAdapter, + jwt_authorization_middleware, + start_agent_process, +) +from microsoft_agents.hosting.core.app.oauth.authorization import Authorization +from microsoft_agents.activity import ActivityTypes, load_configuration_from_env +from microsoft_agents.authentication.msal import MsalConnectionManager + +from echo_agent import EchoAgent + + +async def messages_endpoint(request: Request) -> Response: + agent_app: AgentApplication = request.app["agent_app"] + adapter: CloudAdapter = request.app["adapter"] + return await start_agent_process(request, agent_app, adapter) + + +def create_app() -> Application: + agents_sdk_config = load_configuration_from_env(environ) + + storage = MemoryStorage() + connection_manager = MsalConnectionManager(**agents_sdk_config) + adapter = CloudAdapter(connection_manager=connection_manager) + authorization = Authorization(storage, connection_manager, **agents_sdk_config) + + agent_app = AgentApplication[TurnState]( + storage=storage, + adapter=adapter, + authorization=authorization, + **agents_sdk_config, + ) + + # The echo agent receives user_authorization so the A365 wrapper can cache + # the observability token — equivalent to injecting IExporterTokenCache in C#. + echo = EchoAgent(user_authorization=agent_app.auth) + + @agent_app.activity(ActivityTypes.message) + async def on_message(context: TurnContext, state: TurnState): + # Pre-fetch the agentic token so it is warm in the cache before the + # A365 wrapper tries to use it (mirrors the weather agent pattern). + # await agent_app.auth.get_token(context, "AGENTIC") + await echo.handle_message(context, state) + + @agent_app.conversation_update("membersAdded") + async def on_members_added(context: TurnContext, state: TurnState): + await echo.send_welcome(context, state) + + app = Application( + middlewares=[create_aiohttp_tracing_middleware(), jwt_authorization_middleware] + ) + + app.router.add_post("/api/messages", messages_endpoint) + app.router.add_get("/", lambda _: Response(text="Echo Agent is running", status=200)) + + # Register /health and /alive endpoints (development only). + is_development = environ.get("ENVIRONMENT", "development").lower() == "development" + setup_health_routes(app, development=is_development) + + app["agent_app"] = agent_app + app["adapter"] = adapter + app["agent_configuration"] = connection_manager.get_default_connection_configuration() + + return app + + +def main() -> None: + agent_name = environ.get("AGENT_NAME", "EchoAgent") + host = environ.get("HOST", "localhost") + port = int(environ.get("PORT", 3978)) + + print(f"\n{'='*60}") + print(f"Starting {agent_name} (M365 SDK + A365 Telemetry)") + print(f"{'='*60}") + print(f"Endpoint: http://{host}:{port}/api/messages") + print(f"Health: http://{host}:{port}/health") + print(f"Alive: http://{host}:{port}/alive") + print(f"{'='*60}\n") + + web.run_app(create_app(), host=host, port=port) + + +if __name__ == "__main__": + main() diff --git a/test_samples/echo-a365-telemetry/echo_agent.py b/test_samples/echo-a365-telemetry/echo_agent.py new file mode 100644 index 00000000..0ab2fc36 --- /dev/null +++ b/test_samples/echo-a365-telemetry/echo_agent.py @@ -0,0 +1,104 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Echo Agent — relays user text to an Azure OpenAI endpoint. + +The entire agent logic is intentionally minimal so that the telemetry +infrastructure (spans, metrics, baggage, token caching) stays in the +foreground. The AI call is a single-turn, stateless chat completion: + + User text → Azure OpenAI (openai) → reply to user + +No conversation history, no tools, no state persistence. +""" + +import logging +import traceback +from os import environ + +from openai import AzureOpenAI + +from microsoft_agents.hosting.core import Authorization, TurnContext, TurnState + +from telemetry import invoke_observed_agent_operation_with_context + +logger = logging.getLogger(__name__) + +_SYSTEM_PROMPT = environ.get( + "AGENT_SYSTEM_PROMPT", + "You are a helpful assistant. Respond concisely to the user's message.", +) +_WELCOME_MESSAGE = environ.get( + "AGENT_WELCOME_MESSAGE", + "Hello! I'm the Echo Agent. Send me any message and I'll relay it to Azure OpenAI.", +) + + +def _build_client() -> AzureOpenAI: + """Build an AzureOpenAI client. + + Requires AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_API_KEY to be set. + """ + return AzureOpenAI( + azure_endpoint=environ["AZURE_OPENAI_ENDPOINT"], + api_key=environ["AZURE_OPENAI_API_KEY"], + api_version=environ.get("AZURE_OPENAI_API_VERSION", "2024-12-01-preview"), + ) + + +class EchoAgent: + """Stateless agent that echoes user text through Azure OpenAI.""" + + def __init__(self, user_authorization: Authorization = None): + self._user_authorization = user_authorization + self._client = _build_client() + self._model = environ.get("AZURE_OPENAI_DEPLOYMENT", "gpt-4o-mini") + logger.info("EchoAgent initialised (model=%s)", self._model) + + async def send_welcome(self, context: TurnContext, state: TurnState) -> None: + """Greet each new participant when they join the conversation.""" + for member in context.activity.members_added or []: + if member.id != context.activity.recipient.id: + await context.send_activity(_WELCOME_MESSAGE) + + async def handle_message(self, context: TurnContext, state: TurnState) -> None: + """Process an incoming message and relay the OpenAI reply to the user. + + The core logic is wrapped inside + ``invoke_observed_agent_operation_with_context`` so that every + invocation produces: + - a distributed trace span with activity attributes + - W3C baggage carrying tenant.id / agent.id + - per-turn observability token caching + - message counters and duration histograms + """ + user_text = (context.activity.text or "").strip() + if not user_text: + return + + logger.info("Received: %s", user_text) + + async def _process() -> None: + response = self._client.chat.completions.create( + model=self._model, + messages=[ + {"role": "system", "content": _SYSTEM_PROMPT}, + {"role": "user", "content": user_text}, + ], + ) + reply = response.choices[0].message.content + logger.info("OpenAI reply: %s", reply) + await context.send_activity(reply) + + try: + await invoke_observed_agent_operation_with_context( + "OnMessageActivity", + context, + state, + _process, + user_authorization=self._user_authorization, + ) + except Exception as exc: + logger.error("Error processing message: %s", exc) + traceback.print_exc() + await context.send_activity(f"Sorry, I encountered an error: {exc}") diff --git a/test_samples/echo-a365-telemetry/requirements.txt b/test_samples/echo-a365-telemetry/requirements.txt new file mode 100644 index 00000000..34ebf696 --- /dev/null +++ b/test_samples/echo-a365-telemetry/requirements.txt @@ -0,0 +1,39 @@ +# M365 Agents SDK +microsoft-agents-hosting-aiohttp +microsoft-agents-authentication-msal + +# Azure OpenAI +openai + +# Web framework +aiohttp + +# Configuration +python-dotenv + +# JWT decoding (for A365 observability wrapper) +PyJWT + +# OpenTelemetry — core +opentelemetry-api +opentelemetry-sdk + +# OpenTelemetry — exporters +# OTLP (traces, metrics, logs) — for Aspire dashboard, Jaeger, etc. +opentelemetry-exporter-otlp +# Azure Monitor / Application Insights +azure-monitor-opentelemetry-exporter + +# OpenTelemetry — library auto-instrumentation +opentelemetry-instrumentation-aiohttp-server +opentelemetry-instrumentation-aiohttp-client +opentelemetry-instrumentation-requests +opentelemetry-instrumentation-logging + +# A365 observability — AgentFramework automatic instrumentation +microsoft-agents-a365-runtime==0.2.1.dev43 +microsoft-agents-a365-observability-core==0.2.1.dev43 +microsoft-agents-a365-observability-extensions-agent-framework>=0.2.1.dev43 + +# Agent Framework (required for configure_otel_providers) +agent-framework-azure-ai diff --git a/test_samples/echo-a365-telemetry/telemetry/__init__.py b/test_samples/echo-a365-telemetry/telemetry/__init__.py new file mode 100644 index 00000000..caec5c4a --- /dev/null +++ b/test_samples/echo-a365-telemetry/telemetry/__init__.py @@ -0,0 +1,73 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Telemetry package for the Python Echo Agent. + +Python port of sample-agent/telemetry (AgentMetrics.cs, A365OtelWrapper.cs, +AgentOTELExtensions.cs). + +Typical usage in app.py:: + + from telemetry import configure_opentelemetry, setup_health_routes + from telemetry import create_aiohttp_tracing_middleware + from telemetry import invoke_observed_agent_operation_with_context + + # 1. Configure providers once at startup (before any span/metric activity) + configure_opentelemetry(app_name="EchoAgent", environment="development") + + # 2. Add tracing middleware to the aiohttp app + app = web.Application(middlewares=[create_aiohttp_tracing_middleware()]) + + # 3. Register /health and /alive endpoints (development only) + setup_health_routes(app, development=True) + + # 4. Wrap message handlers with observed operations + await invoke_observed_agent_operation_with_context( + "OnMessageActivity", turn_context, turn_state, handler_func + ) +""" + +from .a365_otel_wrapper import invoke_observed_agent_operation_with_context +from .agent_metrics import ( + SOURCE_NAME, + active_conversations, + finalize_message_handling_span, + initialize_message_handling_span, + invoke_observed_agent_operation, + invoke_observed_http_operation, + message_processed_counter, + message_processing_duration, + meter, + route_executed_counter, + route_execution_duration, + tracer, +) +from .agent_otel_extensions import ( + configure_opentelemetry, + create_aiohttp_tracing_middleware, + instrument_libraries, + setup_health_routes, +) + +__all__ = [ + # agent_metrics + "SOURCE_NAME", + "tracer", + "meter", + "message_processed_counter", + "route_executed_counter", + "message_processing_duration", + "route_execution_duration", + "active_conversations", + "initialize_message_handling_span", + "finalize_message_handling_span", + "invoke_observed_http_operation", + "invoke_observed_agent_operation", + # a365_otel_wrapper + "invoke_observed_agent_operation_with_context", + # agent_otel_extensions + "configure_opentelemetry", + "instrument_libraries", + "setup_health_routes", + "create_aiohttp_tracing_middleware", +] diff --git a/test_samples/echo-a365-telemetry/telemetry/a365_otel_wrapper.py b/test_samples/echo-a365-telemetry/telemetry/a365_otel_wrapper.py new file mode 100644 index 00000000..e3ba8d18 --- /dev/null +++ b/test_samples/echo-a365-telemetry/telemetry/a365_otel_wrapper.py @@ -0,0 +1,147 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Azure 365 observability wrapper for agent operations. + +Python port of A365OtelWrapper.cs from sample-agent/telemetry. + +The C# original depends on the internal +``Microsoft.Agents.A365.Observability`` packages which are .NET-only. +This Python port preserves the observable contract: + +1. Resolve the tenant ID and agent ID from the current turn activity. +2. Propagate them as `OpenTelemetry baggage + `_ so that + downstream services can consume the context. +3. Delegate to :func:`~telemetry.agent_metrics.invoke_observed_agent_operation` + for span creation and metric recording. +4. Cache the observability token per-turn using the activity-derived IDs, + equivalent to ``agentTokenCache.RegisterObservability()`` in C#. +""" + +import logging +import uuid +import jwt # PyJWT library +from typing import Awaitable, Callable, Optional + +from microsoft_agents.hosting.core import Authorization, TurnContext +from opentelemetry import baggage +from opentelemetry import context as otel_context + +from microsoft_agents.hosting.core import AccessTokenProviderBase + +from .agent_metrics import invoke_observed_agent_operation +from .token_cache import cache_agentic_token + +logger = logging.getLogger(__name__) + +_EMPTY_GUID = str(uuid.UUID(int=0)) + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + + +async def invoke_observed_agent_operation_with_context( + operation_name: str, + turn_context: TurnContext, + turn_state, + func: Callable[[], Awaitable[None]], + user_authorization: Optional[Authorization] = None, + auth_handler: Optional[str] = None, +) -> None: + """Wrap an agent operation with A365 observability baggage context. + + Equivalent to ``A365OtelWrapper.InvokeObservedAgentOperation()`` in C#. + + Resolves the tenant ID and agent ID from the activity, sets them as + OpenTelemetry baggage (equivalent to the C# ``BaggageBuilder``), caches + the observability token per-turn when ``user_authorization`` is provided (equivalent + to ``agentTokenCache.RegisterObservability()``), then delegates to + :func:`~telemetry.agent_metrics.invoke_observed_agent_operation` for span + management and metric recording. + + Args: + operation_name: Human-readable name of the operation / handler. + turn_context: The current :class:`TurnContext`. + turn_state: The current :class:`TurnState`. + func: Async function containing the agent logic to execute. + user_authorization: Optional MSAL authentication provider used to fetch and + cache the observability token for the activity-derived IDs. + auth_handler: Optional string representing the authentication handler. + """ + agent_id, tenant_id = await _resolve_tenant_and_agent_id(turn_context, user_authorization, auth_handler) + + if user_authorization is not None: + await _cache_observability_token(tenant_id, agent_id, user_authorization) + + async def _with_baggage(): + # Set tenant.id and agent.id as baggage — equivalent to BaggageBuilder + # in C#, which adds these values to the W3C baggage header so that + # downstream services can read them. + ctx = baggage.set_baggage("tenant.id", tenant_id) + ctx = baggage.set_baggage("agent.id", agent_id, context=ctx) + token = otel_context.attach(ctx) + try: + await func() + finally: + otel_context.detach(token) + + await invoke_observed_agent_operation(operation_name, turn_context, _with_baggage) + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + + +async def _cache_observability_token(tenant_id: str, agent_id: str, authorization: Authorization) -> None: + """Fetch and cache the observability token using activity-derived IDs. + + Equivalent to ``agentTokenCache.RegisterObservability()`` in C#. + MSAL caches tokens internally, so repeated calls within the token lifetime + do not incur additional network requests. + """ + try: + token = await authorization.get_token(tenant_id, agent_id) + cache_agentic_token(tenant_id, agent_id, token) + logger.debug("Observability token cached (tenant=%s, agent=%s)", tenant_id, agent_id) + except Exception as exc: + logger.warning("Failed to cache observability token: %s", exc) + + +async def _resolve_tenant_and_agent_id(turn_context: TurnContext, user_authorization: Optional[Authorization] = None, auth_handler: Optional[str] = None) -> tuple[str, str]: + """Extract tenant and agent IDs from the turn activity. + + Args: + turn_context: The current :class:`TurnContext`. + user_authorization: Optional MSAL authentication provider used to fetch the token. + auth_handler: Optional string representing the authentication handler. + + Returns: + ``(agent_id, tenant_id)`` as strings. + """ + activity = turn_context.activity + if activity is None: + return _EMPTY_GUID, _EMPTY_GUID + + # agentic_token = await user_authorization.get_token(turn_context, auth_handler or "AGENTIC") if user_authorization else None + + agent_id = activity.get_agentic_instance_id() + agent_id = agent_id or _EMPTY_GUID + + tenant_id = activity.conversation.tenant_id if activity.conversation and activity.conversation.tenant_id else _EMPTY_GUID + + return agent_id, tenant_id + + +def _get_app_id_from_token(token: str) -> str: + """Extract the app ID from the JWT token.""" + try: + decoded = jwt.decode(token, options={"verify_signature": False}) + app_id = decoded.get("appid") or decoded.get("azp") or _EMPTY_GUID + return str(app_id) + except Exception as exc: + logger.warning("Failed to decode token for app ID extraction: %s", exc) + return _EMPTY_GUID diff --git a/test_samples/echo-a365-telemetry/telemetry/agent_metrics.py b/test_samples/echo-a365-telemetry/telemetry/agent_metrics.py new file mode 100644 index 00000000..e1fe4230 --- /dev/null +++ b/test_samples/echo-a365-telemetry/telemetry/agent_metrics.py @@ -0,0 +1,236 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Agent metrics and distributed tracing for OpenTelemetry instrumentation. + +Python port of AgentMetrics.cs from sample-agent/telemetry. + +Provides: +- An ActivitySource-equivalent tracer named "A365.AgentFramework" +- A Meter with the same name for counters, histograms, and up-down counters +- Helper functions to start/finalize message-handling spans +- Observed-operation wrappers (sync and async) +""" + +import logging +import time +from typing import Awaitable, Callable + +from opentelemetry import context as otel_context +from opentelemetry import metrics, trace +from opentelemetry.trace import StatusCode + +logger = logging.getLogger(__name__) + +# Equivalent to ActivitySource name in C# +SOURCE_NAME = "A365.AgentFramework" + +# Tracer — equivalent to `new ActivitySource(SourceName)` in C# +# Uses a ProxyTracer until configure_opentelemetry() sets a real TracerProvider. +tracer = trace.get_tracer(SOURCE_NAME, "1.0.0") + +# Meter — equivalent to `new Meter("A365.AgentFramework", "1.0.0")` in C# +meter = metrics.get_meter(SOURCE_NAME, "1.0.0") + +# --------------------------------------------------------------------------- +# Metrics — mirrors the static Counter/Histogram/UpDownCounter fields in C# +# --------------------------------------------------------------------------- + +message_processed_counter = meter.create_counter( + "agent.messages.processed", + unit="messages", + description="Number of messages processed by the agent", +) + +route_executed_counter = meter.create_counter( + "agent.routes.executed", + unit="routes", + description="Number of routes executed by the agent", +) + +message_processing_duration = meter.create_histogram( + "agent.message.processing.duration", + unit="ms", + description="Duration of message processing in milliseconds", +) + +route_execution_duration = meter.create_histogram( + "agent.route.execution.duration", + unit="ms", + description="Duration of route execution in milliseconds", +) + +active_conversations = meter.create_up_down_counter( + "agent.conversations.active", + unit="conversations", + description="Number of active conversations", +) + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + +def _get_activity_attrs(context) -> dict: + """Extract activity attributes from a TurnContext.""" + attrs: dict = {} + activity = getattr(context, "activity", None) + if activity is None: + return attrs + + attrs["Activity.Type"] = str(getattr(activity, "type", "") or "") + + # 'from' is a Python keyword; the SDK stores it as from_property or _from. + from_obj = ( + getattr(activity, "from_property", None) + or getattr(activity, "_from", None) + # getattr with string "from" works at runtime despite being a keyword + or getattr(activity, "from", None) + ) + attrs["Caller.Id"] = str(getattr(from_obj, "id", "") or "") + + conversation = getattr(activity, "conversation", None) + attrs["Conversation.Id"] = str(getattr(conversation, "id", "") or "") + attrs["Channel.Id"] = str(getattr(activity, "channel_id", "") or "") + + text = getattr(activity, "text", "") or "" + attrs["Message.Text.Length"] = len(text) + attrs["Message.Id"] = str(getattr(activity, "id", "") or "") + attrs["Message.Text"] = text[:200] # truncate to avoid oversized attributes + + return attrs + + +# --------------------------------------------------------------------------- +# Public API — mirrors AgentMetrics static methods in C# +# --------------------------------------------------------------------------- + +def initialize_message_handling_span(handler_name: str, context) -> trace.Span: + """Start a tracing span with contextual tags from the turn activity. + + Equivalent to ``InitializeMessageHandlingActivity()`` in C#. + + The caller is responsible for ending the span (use + ``finalize_message_handling_span`` or call ``span.end()`` directly). + + Args: + handler_name: Name used as the span name (e.g. ``"OnMessageActivity"``). + context: TurnContext whose activity fields are attached as span attributes. + + Returns: + A started (but not yet current) :class:`opentelemetry.trace.Span`. + """ + span = tracer.start_span(handler_name) + attrs = _get_activity_attrs(context) + + # Set individual attributes on the span (mirrors activity?.SetTag calls in C#) + for key in ("Activity.Type", "Caller.Id", "Conversation.Id", "Channel.Id"): + span.set_attribute(key, attrs.get(key, "")) + span.set_attribute("Message.Text.Length", attrs.get("Message.Text.Length", 0)) + # Tag whether the request came from an agentic caller + span.set_attribute("Agent.IsAgentic", bool(getattr(getattr(context, "activity", None), "is_agentic", False))) + + # Equivalent to activity?.AddEvent(new ActivityEvent("Message.Processed", ...)) + span.add_event( + "Message.Processed", + attributes={ + "Caller.Id": attrs.get("Caller.Id", ""), + "Channel.Id": attrs.get("Channel.Id", ""), + "Message.Id": attrs.get("Message.Id", ""), + "Message.Text": attrs.get("Message.Text", ""), + }, + ) + return span + + +def finalize_message_handling_span( + span: trace.Span, + context, + duration_ms: float, + success: bool, +) -> None: + """Record duration metrics and end the span. + + Equivalent to ``FinalizeMessageHandlingActivity()`` in C#. + + Args: + span: The span returned by :func:`initialize_message_handling_span`. + context: TurnContext used to label the metric dimensions. + duration_ms: Elapsed time in milliseconds. + success: ``True`` → span status OK; ``False`` → span status ERROR. + """ + attrs = _get_activity_attrs(context) + conversation_id = attrs.get("Conversation.Id") or "unknown" + channel_id = attrs.get("Channel.Id") or "unknown" + + message_processing_duration.record( + duration_ms, + {"Conversation.Id": conversation_id, "Channel.Id": channel_id}, + ) + + route_executed_counter.add( + 1, + {"Route.Type": "message_handler", "Conversation.Id": conversation_id}, + ) + + span.set_status(StatusCode.OK if success else StatusCode.ERROR) + span.end() + + +def invoke_observed_http_operation(operation_name: str, func: Callable) -> None: + """Wrap a synchronous callable with a tracing span. + + Equivalent to ``InvokeObservedHttpOperation()`` in C#. + + Args: + operation_name: Span name. + func: Synchronous callable to execute. + """ + with tracer.start_as_current_span(operation_name) as span: + try: + func() + span.set_status(StatusCode.OK) + except Exception as ex: + span.set_status(StatusCode.ERROR, str(ex)) + span.record_exception(ex) + raise + + +async def invoke_observed_agent_operation( + operation_name: str, + context, + func: Callable[[], Awaitable[None]], +) -> None: + """Async wrapper for an agent operation with full metrics and tracing. + + Equivalent to ``AgentMetrics.InvokeObservedAgentOperation()`` in C#. + + Increments the message-processed counter, opens a span, sets it as the + current context, awaits *func*, then records the duration and finalises + the span. + + Args: + operation_name: Span / operation name. + context: TurnContext used for span attributes and metric labels. + func: Async function containing the agent logic. + """ + message_processed_counter.add(1) + + span = initialize_message_handling_span(operation_name, context) + # Make the span the active span for the duration of the call + ctx = trace.set_span_in_context(span) + token = otel_context.attach(ctx) + + start_time = time.monotonic() + success = True + try: + await func() + except Exception as ex: + success = False + span.set_status(StatusCode.ERROR, str(ex)) + span.record_exception(ex) + raise + finally: + otel_context.detach(token) + duration_ms = (time.monotonic() - start_time) * 1000 + finalize_message_handling_span(span, context, duration_ms, success) diff --git a/test_samples/echo-a365-telemetry/telemetry/agent_otel_extensions.py b/test_samples/echo-a365-telemetry/telemetry/agent_otel_extensions.py new file mode 100644 index 00000000..0c4cf9c9 --- /dev/null +++ b/test_samples/echo-a365-telemetry/telemetry/agent_otel_extensions.py @@ -0,0 +1,505 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""OpenTelemetry configuration helpers for the Python Echo Agent. + +Python port of AgentOTELExtensions.cs from sample-agent/telemetry. + +Adds common observability services: + - TracerProvider with resource metadata + - MeterProvider with custom agent meters + - aiohttp tracing middleware (excludes health-check paths) + - /health and /alive endpoint registration + +Exporter support (install the matching package to activate): + - OTLP (Aspire, Jaeger, etc.): + pip install opentelemetry-exporter-otlp-proto-grpc + - Azure Monitor / Application Insights: + pip install azure-monitor-opentelemetry-exporter + - Console (default when no other exporter is configured) + +To learn more about the local Aspire dashboard, see: + https://learn.microsoft.com/dotnet/aspire/fundamentals/dashboard/standalone +""" + +import logging +import os +import socket +from typing import Optional + +from opentelemetry import trace +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader +from opentelemetry.sdk.resources import ( + SERVICE_INSTANCE_ID, + SERVICE_NAME, + SERVICE_VERSION, + Resource, +) +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.trace import StatusCode + +logger = logging.getLogger(__name__) + +try: + from microsoft_agents_a365.observability.extensions.agentframework import AgentFrameworkInstrumentor as _AgentFrameworkInstrumentor + from microsoft_agents_a365.observability.core.config import configure as _configure_agent_framework_observability + _HAS_AGENT_FRAMEWORK_INSTRUMENTOR = True +except ImportError as exc: + logger.debug( + "AgentFrameworkInstrumentor not available — Agent Framework-specific telemetry will be disabled. " + f"ImportError: {exc}" + ) + _HAS_AGENT_FRAMEWORK_INSTRUMENTOR = False + +HEALTH_ENDPOINT_PATH = "/health" +ALIVENESS_ENDPOINT_PATH = "/alive" + +_SERVICE_NAMESPACE = "Microsoft.Agents" +_SOURCE_NAME = "A365.AgentFramework" + + +# --------------------------------------------------------------------------- +# Primary public API +# --------------------------------------------------------------------------- + + +def instrument_libraries() -> None: + """Instrument common HTTP libraries for automatic OpenTelemetry tracing. + + Equivalent to the ``Use*Instrumentation()`` calls in the C# host builder. + Instruments: + - aiohttp server (inbound requests) + - aiohttp client (outbound requests) with URL enrichment hooks + - requests library (outbound requests) with URL enrichment hooks + + Call this once at startup, before the server starts handling requests. + Each instrumentor is guarded with a try/except so missing optional packages + do not prevent the server from starting. + """ + # aiohttp server + try: + from opentelemetry.instrumentation.aiohttp_server import AioHttpServerInstrumentor + AioHttpServerInstrumentor().instrument() + logger.debug("aiohttp server instrumentation enabled") + except ImportError: + logger.debug( + "opentelemetry-instrumentation-aiohttp-server not installed. " + "Install: pip install opentelemetry-instrumentation-aiohttp-server" + ) + + # aiohttp client + try: + import aiohttp + from opentelemetry.instrumentation.aiohttp_client import AioHttpClientInstrumentor + from opentelemetry.trace import Span + + def _aiohttp_client_request_hook(span: Span, params: aiohttp.TraceRequestStartParams): + if span and span.is_recording(): + span.set_attribute("http.url", str(params.url)) + + def _aiohttp_client_response_hook(span: Span, params): + if span and span.is_recording(): + span.set_attribute("http.url", str(params.url)) + + AioHttpClientInstrumentor().instrument( + request_hook=_aiohttp_client_request_hook, + response_hook=_aiohttp_client_response_hook, + ) + logger.debug("aiohttp client instrumentation enabled") + except ImportError: + logger.debug( + "opentelemetry-instrumentation-aiohttp-client not installed. " + "Install: pip install opentelemetry-instrumentation-aiohttp-client" + ) + + # requests library + try: + import requests as _requests + from opentelemetry.instrumentation.requests import RequestsInstrumentor + from opentelemetry.trace import Span + + def _requests_request_hook(span: Span, request: _requests.Request): + if span and span.is_recording(): + span.set_attribute("http.url", request.url) + + def _requests_response_hook(span: Span, request: _requests.Request, response: _requests.Response): + if span and span.is_recording(): + span.set_attribute("http.url", response.url) + + RequestsInstrumentor().instrument( + request_hook=_requests_request_hook, + response_hook=_requests_response_hook, + ) + logger.debug("requests instrumentation enabled") + except ImportError: + logger.debug( + "opentelemetry-instrumentation-requests not installed. " + "Install: pip install opentelemetry-instrumentation-requests" + ) + + # AgentFramework instrumentor (optional A365 package) + if _HAS_AGENT_FRAMEWORK_INSTRUMENTOR: + try: + from .token_cache import get_cached_agentic_token + + def _token_resolver(agent_id: str, tenant_id: str) -> str | None: + return get_cached_agentic_token(tenant_id, agent_id) + + _configure_agent_framework_observability( + service_name="AgentFrameworkTracingWithAzureAIFoundry", + service_namespace="AgentFrameworkTesting", + token_resolver=_token_resolver, + ) + try: + _AgentFrameworkInstrumentor().instrument( + skip_dep_check=True, + ) + except TypeError: + _AgentFrameworkInstrumentor().instrument(skip_dep_check=True) + logger.debug("AgentFramework instrumentation enabled") + except Exception as exc: + logger.warning("AgentFramework instrumentation failed: %s", exc) + + +def configure_opentelemetry( + app_name: str = "A365.AgentFramework", + environment: str = "development", + otlp_endpoint: Optional[str] = None, + azure_monitor_connection_string: Optional[str] = None, +) -> None: + """Configure global TracerProvider, MeterProvider, and logging bridge. + + Equivalent to ``ConfigureOpenTelemetry()`` in C#. + + Call this **once at startup**, before any code creates spans or records + metrics. The OTel proxy objects in *agent_metrics.py* will automatically + route to the real providers once this function has run. + + Args: + app_name: Service name shown in traces/dashboards. + environment: Deployment environment tag (e.g. ``"development"``, + ``"production"``). + otlp_endpoint: OTLP collector URL. Falls back to the + ``OTEL_EXPORTER_OTLP_ENDPOINT`` environment variable when *None*. + azure_monitor_connection_string: Application Insights connection + string. Falls back to ``APPLICATIONINSIGHTS_CONNECTION_STRING`` + when *None*. + """ + resource = Resource.create( + { + SERVICE_NAME: _SOURCE_NAME, + SERVICE_VERSION: "1.0.0", + SERVICE_INSTANCE_ID: socket.gethostname(), + "deployment.environment": environment, + "service.namespace": _SERVICE_NAMESPACE, + } + ) + + _configure_tracing(resource, app_name, otlp_endpoint, azure_monitor_connection_string) + _configure_metrics(resource, otlp_endpoint, azure_monitor_connection_string) + _configure_logging(resource, otlp_endpoint) + instrument_libraries() + + logger.info( + "OpenTelemetry configured — service: %s, environment: %s", + app_name, + environment, + ) + + +def setup_health_routes(app, development: bool = True) -> None: + """Register ``/health`` and ``/alive`` endpoints on an aiohttp Application. + + Equivalent to ``MapDefaultEndpoints()`` in C#. Mirrors the C# behaviour + of only registering the endpoints in non-production environments. + + Args: + app: :class:`aiohttp.web.Application` instance. + development: When ``False`` the endpoints are **not** registered + (matches the C# guard on ``IsDevelopment()``). + """ + if not development: + return + + from aiohttp import web + + async def health_handler(_request): + return web.Response( + text='{"status":"Healthy"}', + content_type="application/json", + ) + + async def alive_handler(_request): + return web.Response( + text='{"status":"Alive"}', + content_type="application/json", + ) + + app.router.add_get(HEALTH_ENDPOINT_PATH, health_handler) + app.router.add_get(ALIVENESS_ENDPOINT_PATH, alive_handler) + logger.info( + "Health check endpoints registered: %s, %s", + HEALTH_ENDPOINT_PATH, + ALIVENESS_ENDPOINT_PATH, + ) + + +def create_aiohttp_tracing_middleware(): + """Return an aiohttp middleware that traces every non-health-check request. + + Equivalent to the ``AddAspNetCoreInstrumentation()`` configuration in C#: + - Filters out ``/health`` and ``/alive`` paths. + - Enriches spans with ``http.request.body.size`` and ``user_agent`` on + request, and ``http.status_code`` / ``http.response.body.size`` on + response. + - Records exceptions and sets the span status accordingly. + """ + from aiohttp import web + + _tracer = trace.get_tracer(_SOURCE_NAME, "1.0.0") + + @web.middleware + async def tracing_middleware(request, handler): + # Exclude health check requests from tracing (mirrors C# filter lambda) + if request.path.startswith(HEALTH_ENDPOINT_PATH) or request.path.startswith( + ALIVENESS_ENDPOINT_PATH + ): + return await handler(request) + + span_name = f"{request.method} {request.path}" + with _tracer.start_as_current_span(span_name) as span: + # Enrich with request details — equivalent to EnrichWithHttpRequest + span.set_attribute("http.method", request.method) + span.set_attribute("http.url", str(request.url)) + span.set_attribute("http.request.body.size", request.content_length or 0) + user_agent = request.headers.get("User-Agent", "") + if user_agent: + span.set_attribute("user_agent", user_agent) + + try: + response = await handler(request) + # Enrich with response details — equivalent to EnrichWithHttpResponse + span.set_attribute("http.status_code", response.status) + span.set_attribute( + "http.response.body.size", response.content_length or 0 + ) + span.set_status(StatusCode.OK) + return response + except Exception as ex: + span.set_status(StatusCode.ERROR, str(ex)) + span.record_exception(ex) + raise + + return tracing_middleware + + +# --------------------------------------------------------------------------- +# Internal helpers — mirrors private methods in AgentOTELExtensions.cs +# --------------------------------------------------------------------------- + + +def _configure_tracing( + resource: Resource, + app_name: str, + otlp_endpoint: Optional[str], + azure_monitor_connection_string: Optional[str], +) -> None: + """Build and register the global TracerProvider.""" + tracer_provider = TracerProvider(resource=resource) + + has_real_exporter = False + + # OTLP exporter — equivalent to UseOtlpExporter() in C# + resolved_otlp = otlp_endpoint or os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT") + if resolved_otlp: + _add_otlp_trace_exporter(tracer_provider, resolved_otlp) + has_real_exporter = True + + # Azure Monitor exporter — equivalent to UseAzureMonitor() in C# + resolved_az = azure_monitor_connection_string or os.environ.get( + "APPLICATIONINSIGHTS_CONNECTION_STRING" + ) + if resolved_az: + _add_azure_monitor_trace_exporter(tracer_provider, resolved_az) + has_real_exporter = True + + # Console exporter for local development (no production exporter configured) + if not has_real_exporter: + _add_console_trace_exporter(tracer_provider) + + trace.set_tracer_provider(tracer_provider) + + +def _configure_metrics( + resource: Resource, + otlp_endpoint: Optional[str], + azure_monitor_connection_string: Optional[str], +) -> None: + """Build and register the global MeterProvider.""" + from opentelemetry import metrics + + readers = [] + + resolved_otlp = otlp_endpoint or os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT") + if resolved_otlp: + reader = _create_otlp_metric_reader(resolved_otlp) + if reader: + readers.append(reader) + + resolved_az = azure_monitor_connection_string or os.environ.get( + "APPLICATIONINSIGHTS_CONNECTION_STRING" + ) + if resolved_az: + reader = _create_azure_monitor_metric_reader(resolved_az) + if reader: + readers.append(reader) + + if not readers: + readers.append(_create_console_metric_reader()) + + meter_provider = MeterProvider(resource=resource, metric_readers=readers) + metrics.set_meter_provider(meter_provider) + + +def _configure_logging( + resource: Resource, + otlp_endpoint: Optional[str], +) -> None: + """Configure OTEL LoggerProvider and bridge Python logging. + + Equivalent to ``builder.Logging.AddOpenTelemetry(...)`` in C#. + + When an OTLP endpoint is available, a full LoggerProvider is created and + log records are exported via OTLP (matching the otel sample behaviour). + In all cases, Python log records are correlated with the active trace + context when ``opentelemetry-instrumentation-logging`` is installed. + + Args: + resource: The shared OTel Resource created in configure_opentelemetry. + otlp_endpoint: OTLP collector URL, or None to skip OTLP log export. + """ + resolved_otlp = otlp_endpoint or os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT") + if resolved_otlp: + try: + from opentelemetry._logs import set_logger_provider + from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler + from opentelemetry.sdk._logs.export import BatchLogRecordProcessor + from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter + + logger_provider = LoggerProvider(resource=resource) + logger_provider.add_log_record_processor( + BatchLogRecordProcessor(OTLPLogExporter(endpoint=resolved_otlp)) + ) + set_logger_provider(logger_provider) + + handler = LoggingHandler(level=logging.NOTSET, logger_provider=logger_provider) + logging.getLogger().addHandler(handler) + logger.info("OTLP log exporter → %s", resolved_otlp) + except ImportError: + logger.warning( + "OTLP log exporter requested but package not found. " + "Install: pip install opentelemetry-exporter-otlp-proto-grpc" + ) + + # Bridge Python logging to trace context (adds trace_id/span_id to log records) + try: + from opentelemetry.instrumentation.logging import LoggingInstrumentor + + LoggingInstrumentor().instrument(set_logging_format=True) + logger.debug("OTEL logging instrumentation enabled") + except ImportError: + logger.debug( + "opentelemetry-instrumentation-logging not installed — " + "Python log records will not be correlated with traces. " + "Install with: pip install opentelemetry-instrumentation-logging" + ) + + +# -- Trace exporter helpers -------------------------------------------------- + + +def _add_otlp_trace_exporter( + tracer_provider: TracerProvider, endpoint: str +) -> None: + try: + from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( + OTLPSpanExporter, + ) + + exporter = OTLPSpanExporter(endpoint=endpoint) + tracer_provider.add_span_processor(BatchSpanProcessor(exporter)) + logger.info("OTLP trace exporter → %s", endpoint) + except ImportError: + logger.warning( + "OTLP trace exporter requested but package not found. " + "Install: pip install opentelemetry-exporter-otlp-proto-grpc" + ) + + +def _add_azure_monitor_trace_exporter( + tracer_provider: TracerProvider, connection_string: str +) -> None: + try: + from azure.monitor.opentelemetry.exporter import AzureMonitorTraceExporter + + exporter = AzureMonitorTraceExporter(connection_string=connection_string) + tracer_provider.add_span_processor(BatchSpanProcessor(exporter)) + logger.info("Azure Monitor trace exporter configured") + except ImportError: + logger.warning( + "Azure Monitor trace exporter requested but package not found. " + "Install: pip install azure-monitor-opentelemetry-exporter" + ) + + +def _add_console_trace_exporter(tracer_provider: TracerProvider) -> None: + from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor + + tracer_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter())) + logger.debug("Console span exporter active (development mode)") + + +# -- Metric reader helpers ---------------------------------------------------- + + +def _create_otlp_metric_reader(endpoint: str) -> Optional[PeriodicExportingMetricReader]: + try: + from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import ( + OTLPMetricExporter, + ) + + exporter = OTLPMetricExporter(endpoint=endpoint) + logger.info("OTLP metric exporter → %s", endpoint) + return PeriodicExportingMetricReader(exporter) + except ImportError: + logger.warning( + "OTLP metric exporter requested but package not found. " + "Install: pip install opentelemetry-exporter-otlp-proto-grpc" + ) + return None + + +def _create_azure_monitor_metric_reader( + connection_string: str, +) -> Optional[PeriodicExportingMetricReader]: + try: + from azure.monitor.opentelemetry.exporter import AzureMonitorMetricsExporter + + exporter = AzureMonitorMetricsExporter(connection_string=connection_string) + logger.info("Azure Monitor metric exporter configured") + return PeriodicExportingMetricReader(exporter) + except ImportError: + logger.warning( + "Azure Monitor metric exporter requested but package not found. " + "Install: pip install azure-monitor-opentelemetry-exporter" + ) + return None + + +def _create_console_metric_reader() -> PeriodicExportingMetricReader: + from opentelemetry.sdk.metrics.export import ConsoleMetricExporter + + logger.debug("Console metric exporter active (development mode)") + return PeriodicExportingMetricReader(ConsoleMetricExporter()) diff --git a/test_samples/echo-a365-telemetry/telemetry/token_cache.py b/test_samples/echo-a365-telemetry/telemetry/token_cache.py new file mode 100644 index 00000000..0fb9872e --- /dev/null +++ b/test_samples/echo-a365-telemetry/telemetry/token_cache.py @@ -0,0 +1,31 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +""" +Token caching utilities for Agent 365 Observability exporter authentication. +""" + +import logging + +logger = logging.getLogger(__name__) + +# Global token cache for Agent 365 Observability exporter +_agentic_token_cache = {} + + +def cache_agentic_token(tenant_id: str, agent_id: str, token: str) -> None: + """Cache the agentic token for use by Agent 365 Observability exporter.""" + key = f"{tenant_id}:{agent_id}" + _agentic_token_cache[key] = token + logger.debug(f"Cached agentic token for {key}") + + +def get_cached_agentic_token(tenant_id: str, agent_id: str) -> str | None: + """Retrieve cached agentic token for Agent 365 Observability exporter.""" + key = f"{tenant_id}:{agent_id}" + token = _agentic_token_cache.get(key) + if token: + logger.debug(f"Retrieved cached agentic token for {key}") + else: + logger.debug(f"No cached token found for {key}") + return token diff --git a/test_samples/weather-agent-framework/.env.example b/test_samples/weather-agent-framework/.env.example new file mode 100644 index 00000000..12294cd8 --- /dev/null +++ b/test_samples/weather-agent-framework/.env.example @@ -0,0 +1,41 @@ +# M365 Agents SDK configuration +CONNECTIONS__SERVICE_CONNECTION__SETTINGS__CLIENTID=client-id-here +CONNECTIONS__SERVICE_CONNECTION__SETTINGS__CLIENTSECRET=client-secret-here +CONNECTIONS__SERVICE_CONNECTION__SETTINGS__TENANTID=tenant-id-here + +# Azure OpenAI Configuration +AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com/ +AZURE_OPENAI_API_KEY=your-api-key-here +AZURE_OPENAI_DEPLOYMENT=gpt-4o-mini +AZURE_OPENAI_API_VERSION=2024-12-01-preview + +# OpenWeatherMap API Key +# Get your free API key from: https://openweathermap.org/price +OPENWEATHER_API_KEY=your-openweather-api-key + + +# Observability +OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 +ENABLE_OTLP_EXPORTER=True +OTEL_EXPORTER_OTLP_INSECURE=true + +OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST=".*" +OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE=".*" + +OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_REQUEST=".*" +OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_CLIENT_RESPONSE=".*" +# APPLICATIONINSIGHTS_CONNECTION_STRING=InstrumentationKey=your-key-here + +# Required for observability SDK +ENABLE_OBSERVABILITY=true +#ENABLE_A365_OBSERVABILITY_EXPORTER=false +PYTHON_ENVIRONMENT=development +ENABLE_OPENTELEMETRY_SWITCH=true + +# Observability Configuration +OBSERVABILITY_SERVICE_NAME=agent-framework-sample +OBSERVABILITY_SERVICE_NAMESPACE=agents-framework.samples + +# Enable otel logs on AgentFramework SDK. Required for auto instrumentation +ENABLE_OTEL=true +ENABLE_SENSITIVE_DATA=true diff --git a/test_samples/weather-agent-framework/README.md b/test_samples/weather-agent-framework/README.md new file mode 100644 index 00000000..919a01f4 --- /dev/null +++ b/test_samples/weather-agent-framework/README.md @@ -0,0 +1,145 @@ +# Weather Agent Sample + +A sample agent built with the Microsoft 365 Agents SDK for Python. It uses Azure OpenAI for natural language understanding, OpenWeatherMap for weather lookups, and exports telemetry to an Aspire dashboard via OpenTelemetry. + +## Prerequisites + +- **Python 3.11+** (3.10+ supported) +- **Docker** (for the Aspire observability dashboard) +- **Azure OpenAI** deployment +- **OpenWeatherMap API key** — get a free key at +- **Azure AD app registration** (for M365/Teams auth) — or use the default dev values + +## 1. Start the Aspire Dashboard + +The sample exports traces, metrics, and logs via OTLP. Run the Aspire dashboard container so there is a collector listening: + +```bash +docker run --rm -it -p 18888:18888 -p 4317:18889 \ + --name aspire-dashboard \ + mcr.microsoft.com/dotnet/aspire-dashboard:latest +``` + +Once running, open **http://localhost:18888** in your browser to view telemetry. + +> The agent sends OTLP data to `localhost:4317`, which the container maps to its internal port `18889`. + +## 2. Create a Virtual Environment & Install Dependencies + +From the repository root: + +```bash +python -m venv .venv +# Linux / macOS +source .venv/bin/activate +# Windows +.venv\Scripts\activate +``` + +Then install the SDK libraries in editable mode (if not already): + +```bash +pip install -e ./libraries/microsoft-agents-activity/ --config-settings editable_mode=compat +pip install -e ./libraries/microsoft-agents-hosting-core/ --config-settings editable_mode=compat +pip install -e ./libraries/microsoft-agents-authentication-msal/ --config-settings editable_mode=compat +pip install -e ./libraries/microsoft-agents-hosting-aiohttp/ --config-settings editable_mode=compat +``` + +Install the sample's own dependencies: + +```bash +pip install -r test_samples/weather-agent-framework/requirements.txt +``` + +## 3. Configure the `.env` File + +Copy the example and fill in your values: + +```bash +cp test_samples/weather-agent-framework/.env.example test_samples/weather-agent-framework/.env +``` + +Open the `.env` file and set the following variables: + +### M365 Agents SDK / Azure AD + +| Variable | Where to get it | +|---|---| +| `CONNECTIONS__SERVICE_CONNECTION__SETTINGS__CLIENTID` | Azure Portal → **App registrations** → your app → **Application (client) ID** | +| `CONNECTIONS__SERVICE_CONNECTION__SETTINGS__CLIENTSECRET` | Azure Portal → **App registrations** → your app → **Certificates & secrets** → create a new client secret | +| `CONNECTIONS__SERVICE_CONNECTION__SETTINGS__TENANTID` | Azure Portal → **App registrations** → your app → **Directory (tenant) ID** | + +> For local development without Teams/M365 auth you can leave these as the placeholder values. + +### Azure OpenAI + +| Variable | Where to get it | +|---|---| +| `AZURE_OPENAI_ENDPOINT` | Azure Portal → **Azure OpenAI** resource → **Keys and Endpoint** → copy the endpoint URL | +| `AZURE_OPENAI_API_KEY` | Same page → copy **Key 1** or **Key 2** | +| `AZURE_OPENAI_DEPLOYMENT` | Azure Portal → **Azure OpenAI** → **Model deployments** → the deployment name (e.g. `gpt-4o-mini`) | +| `AZURE_OPENAI_API_VERSION` | Use `2024-12-01-preview` or a later supported version | + +### OpenWeatherMap + +| Variable | Where to get it | +|---|---| +| `OPENWEATHER_API_KEY` | → sign up for a free account, then copy the API key from your account page | + +### Observability (pre-configured defaults) + +The remaining variables in `.env.example` point to the local Aspire dashboard and enable OpenTelemetry export. The defaults work out of the box when the Docker container from Step 1 is running. No changes needed unless you want to send telemetry elsewhere (e.g. Azure Monitor via `APPLICATIONINSIGHTS_CONNECTION_STRING`). + +## 4. Run the Agent + +```bash +cd test_samples/weather-agent-framework +python app.py +``` + +The agent starts at **http://localhost:3978/api/messages**. + +## 5. Test the Agent + +Use [M365 Agents Playground](https://github.com/OfficeDev/microsoft-365-agents-toolkit) or a Teams channel pointed at a dev tunnel: + +```bash +# Dev tunnel setup (optional, for Teams testing) +devtunnel user login +devtunnel create my-tunnel -a +devtunnel port create -p 3978 my-tunnel +devtunnel host -a my-tunnel +``` + +Example queries: +- *"What's the weather in Seattle, Washington?"* +- *"What's the forecast for New York, New York?"* +- *"What's the current date and time?"* + +## Project Structure + +``` +weather-agent-framework/ +├── app.py # aiohttp entry point +├── .env.example # Environment variable template +├── requirements.txt # Python dependencies +├── agents/ +│ ├── __init__.py +│ └── weather_agent.py # Agent logic (Azure OpenAI + tool calls) +├── tools/ +│ ├── __init__.py +│ ├── weather_tools.py # Current weather & 5-day forecast +│ └── datetime_tools.py # Current date/time helper +└── telemetry/ + ├── __init__.py + ├── agent_otel_extensions.py # OTel setup helpers + ├── agent_metrics.py # Custom metrics + └── a365_otel_wrapper.py # A365 observability wrapper +``` + +## Further Reading + +- [Microsoft 365 Agents SDK Documentation](https://learn.microsoft.com/en-us/microsoft-365/agents-sdk/) +- [Azure OpenAI Service](https://learn.microsoft.com/en-us/azure/ai-services/openai/) +- [OpenWeatherMap API](https://openweathermap.org/api) +- [.NET Aspire Dashboard](https://learn.microsoft.com/en-us/dotnet/aspire/fundamentals/dashboard/standalone) diff --git a/test_samples/weather-agent-framework/agents/__init__.py b/test_samples/weather-agent-framework/agents/__init__.py new file mode 100644 index 00000000..942727eb --- /dev/null +++ b/test_samples/weather-agent-framework/agents/__init__.py @@ -0,0 +1,3 @@ +from .weather_agent import WeatherAgent + +__all__ = ["WeatherAgent"] diff --git a/test_samples/weather-agent-framework/agents/weather_agent.py b/test_samples/weather-agent-framework/agents/weather_agent.py new file mode 100644 index 00000000..86a45d90 --- /dev/null +++ b/test_samples/weather-agent-framework/agents/weather_agent.py @@ -0,0 +1,253 @@ +""" +Weather Agent implementation. + +Handles incoming messages and uses Azure OpenAI to process +user requests with weather lookup tools. +""" +import json +import traceback +from os import environ +from openai import AzureOpenAI +from azure.identity import DefaultAzureCredential + +from microsoft_agents.hosting.core import Authorization, TurnContext, TurnState, StoreItem + +from tools.weather_tools import get_current_weather_for_location, get_weather_forecast_for_location +from tools.datetime_tools import get_date_time +from telemetry import invoke_observed_agent_operation_with_context + +_WELCOME_MESSAGE = "Hello! I'm your Weather Agent. Ask me about the current weather or forecast for any city." +_INSTRUCTIONS = ( + "You are a helpful weather assistant. Use the provided tools to look up " + "current weather conditions and forecasts for locations requested by the user." +) + + +_TOOLS = [ + { + "type": "function", + "function": { + "name": "get_current_weather_for_location", + "description": "Retrieves the current weather for a location", + "parameters": { + "type": "object", + "properties": { + "location": { + "type": "string", + "description": "The city name", + }, + "state": { + "type": "string", + "description": "The US state name or empty string for international cities", + }, + }, + "required": ["location"], + }, + }, + }, + { + "type": "function", + "function": { + "name": "get_weather_forecast_for_location", + "description": "Retrieves the 5-day weather forecast for a location", + "parameters": { + "type": "object", + "properties": { + "location": { + "type": "string", + "description": "The city name", + }, + "state": { + "type": "string", + "description": "The US state name or empty string for international cities", + }, + }, + "required": ["location"], + }, + }, + }, + { + "type": "function", + "function": { + "name": "get_date_time", + "description": "Get the current date and time", + "parameters": { + "type": "object", + "properties": { + "input_text": { + "type": "string", + "description": "User input (not used)", + } + }, + }, + }, + }, +] + + +class ConversationHistoryStoreItem(StoreItem): + """Wraps the OpenAI message list so it can be persisted via AgentState.""" + + def __init__(self, messages: list = None): + self.messages = messages or [] + + def store_item_to_json(self) -> dict: + return {"messages": self.messages} + + @staticmethod + def from_json_to_store_item(json_data: dict) -> "ConversationHistoryStoreItem": + return ConversationHistoryStoreItem(messages=json_data.get("messages", [])) + + +class WeatherAgent: + """Weather Agent that processes user messages with Azure OpenAI and weather tools.""" + + def __init__(self, user_authorization: Authorization=None): + self._user_authorization = user_authorization + endpoint = environ["AZURE_OPENAI_ENDPOINT"] + api_version = environ.get("AZURE_OPENAI_API_VERSION", "2024-12-01-preview") + api_key = environ.get("AZURE_OPENAI_API_KEY") + + if api_key: + self.openai_client = AzureOpenAI( + azure_endpoint=endpoint, + api_key=api_key, + api_version=api_version, + ) + else: + self.openai_client = AzureOpenAI( + azure_endpoint=endpoint, + azure_ad_token_provider=DefaultAzureCredential(), + api_version=api_version, + ) + + self._deployment = environ["AZURE_OPENAI_DEPLOYMENT"] + self._instructions = environ.get("AGENT_INSTRUCTIONS", _INSTRUCTIONS) + self._welcome_message = environ.get("AGENT_WELCOME_MESSAGE", _WELCOME_MESSAGE) + + print(f"✅ {environ.get('AGENT_NAME', 'WeatherAgent')} initialized") + + async def send_welcome(self, context: TurnContext, state: TurnState): + """Send a welcome message to new conversation members.""" + for member in context.activity.members_added or []: + if member.id != context.activity.recipient.id: + await context.send_activity(self._welcome_message) + + async def handle_message(self, context: TurnContext, state: TurnState): + """ + Process an incoming user message. + + Wraps the message logic with A365 observability — equivalent to + ``A365OtelWrapper.InvokeObservedAgentOperation`` in WeatherAgent.cs. + """ + user_text = (context.activity.text or "").strip() + + if not user_text: + return + + print(f"Received: {user_text}") + + async def _process(): + history_item = state.get_value( + "ConversationState.history", + lambda: ConversationHistoryStoreItem(), + target_cls=ConversationHistoryStoreItem, + ) + conversation_history = history_item.messages + conversation_history.append({"role": "user", "content": user_text}) + + messages = [ + {"role": "system", "content": self._instructions}, + *conversation_history, + ] + + response = self.openai_client.chat.completions.create( + model=self._deployment, + messages=messages, + tools=_TOOLS, + tool_choice="auto", + temperature=0.2, + ) + + response_message = response.choices[0].message + tool_calls = response_message.tool_calls + + if tool_calls: + conversation_history.append({ + "role": "assistant", + "content": response_message.content, + "tool_calls": [ + { + "id": tc.id, + "type": "function", + "function": { + "name": tc.function.name, + "arguments": tc.function.arguments, + }, + } + for tc in tool_calls + ], + }) + + for tool_call in tool_calls: + function_name = tool_call.function.name + function_args = json.loads(tool_call.function.arguments) + + print(f"Calling tool: {function_name}({function_args})") + + if function_name == "get_current_weather_for_location": + result = get_current_weather_for_location( + location=function_args.get("location", ""), + state=function_args.get("state", ""), + ) + elif function_name == "get_weather_forecast_for_location": + result = get_weather_forecast_for_location( + location=function_args.get("location", ""), + state=function_args.get("state", ""), + ) + elif function_name == "get_date_time": + result = get_date_time() + else: + result = f"Unknown function: {function_name}" + + conversation_history.append({ + "role": "tool", + "content": result, + "tool_call_id": tool_call.id, + }) + + second_response = self.openai_client.chat.completions.create( + model=self._deployment, + messages=[ + {"role": "system", "content": self._instructions}, + *conversation_history, + ], + temperature=0.2, + ) + final_message = second_response.choices[0].message.content + else: + final_message = response_message.content + + conversation_history.append({"role": "assistant", "content": final_message}) + + if len(conversation_history) > 10: + conversation_history = conversation_history[-10:] + + history_item.messages = conversation_history + state.set_value("ConversationState.history", history_item) + + await context.send_activity(final_message) + print("Sent response") + + try: + await invoke_observed_agent_operation_with_context( + "OnMessageActivity", + context, + state, + _process, + user_authorization=self._user_authorization, + ) + except Exception as e: + print(f"Error: {e}") + traceback.print_exc() + await context.send_activity(f"Sorry, I encountered an error: {str(e)}") diff --git a/test_samples/weather-agent-framework/app.py b/test_samples/weather-agent-framework/app.py new file mode 100644 index 00000000..7e03cb56 --- /dev/null +++ b/test_samples/weather-agent-framework/app.py @@ -0,0 +1,151 @@ +""" +Weather Agent using Microsoft 365 Agents SDK with aiohttp. +""" +import logging +from os import environ, path +from aiohttp import web +from aiohttp.web import Request, Response, Application +from dotenv import load_dotenv + +from agent_framework.observability import configure_otel_providers + +configure_otel_providers() + +logger = logging.getLogger(__name__) + +load_dotenv(path.join(path.dirname(__file__), ".env")) + +# --------------------------------------------------------------------------- +# OpenTelemetry — configure providers FIRST so that the proxy tracers/meters +# in telemetry/agent_metrics.py resolve to real implementations. +# Equivalent to ConfigureOpenTelemetry() called in Program.cs before building +# the host in the C# sample. +# --------------------------------------------------------------------------- +from telemetry import ( + configure_opentelemetry, + create_aiohttp_tracing_middleware, + setup_health_routes, +) + +configure_opentelemetry( + app_name=environ.get("AGENT_NAME", "WeatherAgent"), + environment=environ.get("ENVIRONMENT", "development"), + otlp_endpoint=environ.get("OTEL_EXPORTER_OTLP_ENDPOINT"), + azure_monitor_connection_string=environ.get("APPLICATIONINSIGHTS_CONNECTION_STRING"), +) + +# M365 Agents SDK imports +from microsoft_agents.hosting.core import ( + AgentApplication, + TurnContext, + TurnState, + MemoryStorage, +) +from microsoft_agents.hosting.aiohttp import ( + CloudAdapter, + jwt_authorization_middleware, + start_agent_process, +) +from microsoft_agents.hosting.core.app.oauth.authorization import Authorization +from microsoft_agents.activity import ActivityTypes, load_configuration_from_env +from microsoft_agents.authentication.msal import MsalConnectionManager +from agents import WeatherAgent + + +async def messages_endpoint(request: Request) -> Response: + """ + Handle POST requests to /api/messages endpoint. + + Args: + request: The incoming HTTP request. + + Returns: + HTTP response. + """ + agent: AgentApplication = request.app["agent_app"] + adapter: CloudAdapter = request.app["adapter"] + + return await start_agent_process(request, agent, adapter) + + +def create_app() -> Application: + """ + Create and configure the aiohttp application. + + Returns: + Configured aiohttp Application. + """ + agents_sdk_config = load_configuration_from_env(environ) + # Create storage + storage = MemoryStorage() + + # Create connection manager for MSAL-based authentication + connection_manager = MsalConnectionManager(**agents_sdk_config) + + # Create adapter + adapter = CloudAdapter(connection_manager=connection_manager) + + #Create authorization + authorization = Authorization(storage, connection_manager, **agents_sdk_config) + + # Create agent application + agent_app = AgentApplication[TurnState]( + storage=storage, + adapter=adapter, + authorization=authorization, + **agents_sdk_config + ) + + # Instantiate our weather agent, passing msal_auth for per-turn observability + # token caching — mirrors C# injecting IExporterTokenCache + # into WeatherAgent and passing UserAuthorization to InvokeObservedAgentOperation. + weather_agent = WeatherAgent(user_authorization=agent_app.auth) + + # Register event handlers + @agent_app.activity(ActivityTypes.message, auth_handlers=["AGENTIC"]) + async def on_message(context: TurnContext, state: TurnState): + aau_token = await agent_app.auth.get_token(context, "AGENTIC") + await weather_agent.handle_message(context, state) + + @agent_app.conversation_update("membersAdded") + async def on_members_added(context: TurnContext, state: TurnState): + await weather_agent.send_welcome(context, state) + + # Create aiohttp app with tracing middleware. + # Equivalent to AddAspNetCoreInstrumentation() + health-check filter in C#. + app = Application(middlewares=[create_aiohttp_tracing_middleware(), jwt_authorization_middleware]) + + # Add routes + app.router.add_post("/api/messages", messages_endpoint) + app.router.add_get("/", lambda _: Response(text="Weather Agent is running", status=200)) + + # Register /health and /alive endpoints (development only). + is_development = environ.get("ENVIRONMENT", "development").lower() == "development" + setup_health_routes(app, development=is_development) + + # Store agent components + app["agent_app"] = agent_app + app["adapter"] = adapter + app["agent_configuration"] = connection_manager.get_default_connection_configuration() + + return app + + +def main(): + """Main application entry point.""" + agent_name = environ.get("AGENT_NAME", "WeatherAgent") + host = environ.get("HOST", "localhost") + port = int(environ.get("PORT", 3978)) + + print(f"\n{'='*60}") + print(f"Starting {agent_name} (M365 SDK)") + print(f"{'='*60}") + print(f"Endpoint: http://{host}:{port}/api/messages") + print(f"{'='*60}\n") + + app = create_app() + web.run_app(app, host=host, port=port) + + +if __name__ == "__main__": + main() diff --git a/test_samples/weather-agent-framework/requirements.txt b/test_samples/weather-agent-framework/requirements.txt new file mode 100644 index 00000000..30fce16a --- /dev/null +++ b/test_samples/weather-agent-framework/requirements.txt @@ -0,0 +1,50 @@ +# M365 Agents SDK for Teams/M365 integration +#microsoft-agents-hosting-aiohttp +#microsoft-agents-authentication-msal # uncomment to enable MSAL-based auth + +# Azure services +azure-identity + +# OpenAI +openai + +# Web framework +aiohttp + +# Configuration +python-dotenv +pydantic + +# Weather API client +pyowm + +# Utilities +typing-extensions + +# OpenTelemetry — core (required for telemetry/) +opentelemetry-api +opentelemetry-sdk + +# OpenTelemetry — exporters +# OTLP (traces, metrics, logs) — for Aspire dashboard, Jaeger, etc. +opentelemetry-exporter-otlp +# Azure Monitor / Application Insights +azure-monitor-opentelemetry-exporter + +# OpenTelemetry — library auto-instrumentation +# aiohttp server (inbound) and client (outbound) +opentelemetry-instrumentation-aiohttp-server +opentelemetry-instrumentation-aiohttp-client +# requests library (outbound HTTP) +opentelemetry-instrumentation-requests +# Correlate Python log records with trace context +opentelemetry-instrumentation-logging + +# A365 observability — AgentFramework automatic instrumentation +# Provides AgentFrameworkInstrumentor used in telemetry/agent_otel_extensions.py +microsoft-agents-a365-runtime==0.2.1.dev43 +microsoft-agents-a365-observability-core==0.2.1.dev43 +microsoft-agents-a365-observability-extensions-agent-framework>=0.2.1.dev43 + +# Agent Framework +agent-framework-azure-ai diff --git a/test_samples/weather-agent-framework/telemetry/__init__.py b/test_samples/weather-agent-framework/telemetry/__init__.py new file mode 100644 index 00000000..73502d42 --- /dev/null +++ b/test_samples/weather-agent-framework/telemetry/__init__.py @@ -0,0 +1,73 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Telemetry package for the Python Weather Agent. + +Python port of sample-agent/telemetry (AgentMetrics.cs, A365OtelWrapper.cs, +AgentOTELExtensions.cs). + +Typical usage in app.py:: + + from telemetry import configure_opentelemetry, setup_health_routes + from telemetry import create_aiohttp_tracing_middleware + from telemetry import invoke_observed_agent_operation_with_context + + # 1. Configure providers once at startup (before any span/metric activity) + configure_opentelemetry(app_name="WeatherAgent", environment="development") + + # 2. Add tracing middleware to the aiohttp app + app = web.Application(middlewares=[create_aiohttp_tracing_middleware()]) + + # 3. Register /health and /alive endpoints (development only) + setup_health_routes(app, development=True) + + # 4. Wrap message handlers with observed operations + await invoke_observed_agent_operation_with_context( + "OnMessageActivity", turn_context, turn_state, handler_func + ) +""" + +from .a365_otel_wrapper import invoke_observed_agent_operation_with_context +from .agent_metrics import ( + SOURCE_NAME, + active_conversations, + finalize_message_handling_span, + initialize_message_handling_span, + invoke_observed_agent_operation, + invoke_observed_http_operation, + message_processed_counter, + message_processing_duration, + meter, + route_executed_counter, + route_execution_duration, + tracer, +) +from .agent_otel_extensions import ( + configure_opentelemetry, + create_aiohttp_tracing_middleware, + instrument_libraries, + setup_health_routes, +) + +__all__ = [ + # agent_metrics + "SOURCE_NAME", + "tracer", + "meter", + "message_processed_counter", + "route_executed_counter", + "message_processing_duration", + "route_execution_duration", + "active_conversations", + "initialize_message_handling_span", + "finalize_message_handling_span", + "invoke_observed_http_operation", + "invoke_observed_agent_operation", + # a365_otel_wrapper + "invoke_observed_agent_operation_with_context", + # agent_otel_extensions + "configure_opentelemetry", + "instrument_libraries", + "setup_health_routes", + "create_aiohttp_tracing_middleware", +] diff --git a/test_samples/weather-agent-framework/telemetry/a365_otel_wrapper.py b/test_samples/weather-agent-framework/telemetry/a365_otel_wrapper.py new file mode 100644 index 00000000..3ed4ea58 --- /dev/null +++ b/test_samples/weather-agent-framework/telemetry/a365_otel_wrapper.py @@ -0,0 +1,147 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Azure 365 observability wrapper for agent operations. + +Python port of A365OtelWrapper.cs from sample-agent/telemetry. + +The C# original depends on the internal +``Microsoft.Agents.A365.Observability`` packages which are .NET-only. +This Python port preserves the observable contract: + +1. Resolve the tenant ID and agent ID from the current turn activity. +2. Propagate them as `OpenTelemetry baggage + `_ so that + downstream services can consume the context. +3. Delegate to :func:`~telemetry.agent_metrics.invoke_observed_agent_operation` + for span creation and metric recording. +4. Cache the observability token per-turn using the activity-derived IDs, + equivalent to ``agentTokenCache.RegisterObservability()`` in C#. +""" + +import logging +import uuid +import jwt # PyJWT library +from typing import Awaitable, Callable, Optional + +from microsoft_agents.hosting.core import Authorization, TurnContext +from opentelemetry import baggage +from opentelemetry import context as otel_context + +from microsoft_agents.hosting.core import AccessTokenProviderBase + +from .agent_metrics import invoke_observed_agent_operation +from .token_cache import cache_agentic_token + +logger = logging.getLogger(__name__) + +_EMPTY_GUID = str(uuid.UUID(int=0)) + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + + +async def invoke_observed_agent_operation_with_context( + operation_name: str, + turn_context: TurnContext, + turn_state, + func: Callable[[], Awaitable[None]], + user_authorization: Optional[Authorization] = None, + auth_handler: Optional[str] = None, +) -> None: + """Wrap an agent operation with A365 observability baggage context. + + Equivalent to ``A365OtelWrapper.InvokeObservedAgentOperation()`` in C#. + + Resolves the tenant ID and agent ID from the activity, sets them as + OpenTelemetry baggage (equivalent to the C# ``BaggageBuilder``), caches + the observability token per-turn when ``user_authorization`` is provided (equivalent + to ``agentTokenCache.RegisterObservability()``), then delegates to + :func:`~telemetry.agent_metrics.invoke_observed_agent_operation` for span + management and metric recording. + + Args: + operation_name: Human-readable name of the operation / handler. + turn_context: The current :class:`TurnContext`. + turn_state: The current :class:`TurnState`. + func: Async function containing the agent logic to execute. + user_authorization: Optional MSAL authentication provider used to fetch and + cache the observability token for the activity-derived IDs. + """ + agent_id, tenant_id = await _resolve_tenant_and_agent_id(turn_context, user_authorization, auth_handler) + + if user_authorization is not None: + await _cache_observability_token(tenant_id, agent_id, user_authorization) + + async def _with_baggage(): + # Set tenant.id and agent.id as baggage — equivalent to BaggageBuilder + # in C#, which adds these values to the W3C baggage header so that + # downstream services can read them. + ctx = baggage.set_baggage("tenant.id", tenant_id) + ctx = baggage.set_baggage("agent.id", agent_id, context=ctx) + token = otel_context.attach(ctx) + try: + await func() + finally: + otel_context.detach(token) + + await invoke_observed_agent_operation(operation_name, turn_context, _with_baggage) + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + + +async def _cache_observability_token(tenant_id: str, agent_id: str, authorization: Authorization) -> None: + """Fetch and cache the observability token using activity-derived IDs. + + Equivalent to ``agentTokenCache.RegisterObservability()`` in C#. + MSAL caches tokens internally, so repeated calls within the token lifetime + do not incur additional network requests. + """ + + try: + token = await authorization.get_token(tenant_id, agent_id) + cache_agentic_token(tenant_id, agent_id, token) + logger.debug("Observability token cached (tenant=%s, agent=%s)", tenant_id, agent_id) + except Exception as exc: + logger.warning("Failed to cache observability token: %s", exc) + + +async def _resolve_tenant_and_agent_id(turn_context: TurnContext, user_authorization: Optional[Authorization] = None, auth_handler: Optional[str] = None) -> tuple[str, str]: + """Extract tenant and agent IDs from the turn activity. + + Args: + turn_context: The current :class:`TurnContext`. + user_authorization: Optional MSAL authentication provider used to fetch the token. + auth_handler: Optional string representing the authentication handler. + + Returns: + ``(agent_id, tenant_id)`` as strings. + """ + activity = turn_context.activity + if activity is None: + return _EMPTY_GUID, _EMPTY_GUID + + agentic_token = await user_authorization.get_token(turn_context, auth_handler or "AGENTIC") if user_authorization else None + + + agent_id = activity.get_agentic_instance_id() if activity.is_agentic_request() else _get_app_id_from_token(agentic_token) + agent_id = agent_id or _EMPTY_GUID + + tenant_id = activity.conversation.tenant_id if activity.conversation and activity.conversation.tenant_id else _EMPTY_GUID + + return agent_id, tenant_id + +def _get_app_id_from_token(token: str) -> str: + """Extract the app ID from the JWT token.""" + try: + decoded = jwt.decode(token, options={"verify_signature": False}) + app_id = decoded.get("appid") or decoded.get("azp") or _EMPTY_GUID + return str(app_id) + except Exception as exc: + logger.warning("Failed to decode token for app ID extraction: %s", exc) + return _EMPTY_GUID diff --git a/test_samples/weather-agent-framework/telemetry/agent_metrics.py b/test_samples/weather-agent-framework/telemetry/agent_metrics.py new file mode 100644 index 00000000..e1fe4230 --- /dev/null +++ b/test_samples/weather-agent-framework/telemetry/agent_metrics.py @@ -0,0 +1,236 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Agent metrics and distributed tracing for OpenTelemetry instrumentation. + +Python port of AgentMetrics.cs from sample-agent/telemetry. + +Provides: +- An ActivitySource-equivalent tracer named "A365.AgentFramework" +- A Meter with the same name for counters, histograms, and up-down counters +- Helper functions to start/finalize message-handling spans +- Observed-operation wrappers (sync and async) +""" + +import logging +import time +from typing import Awaitable, Callable + +from opentelemetry import context as otel_context +from opentelemetry import metrics, trace +from opentelemetry.trace import StatusCode + +logger = logging.getLogger(__name__) + +# Equivalent to ActivitySource name in C# +SOURCE_NAME = "A365.AgentFramework" + +# Tracer — equivalent to `new ActivitySource(SourceName)` in C# +# Uses a ProxyTracer until configure_opentelemetry() sets a real TracerProvider. +tracer = trace.get_tracer(SOURCE_NAME, "1.0.0") + +# Meter — equivalent to `new Meter("A365.AgentFramework", "1.0.0")` in C# +meter = metrics.get_meter(SOURCE_NAME, "1.0.0") + +# --------------------------------------------------------------------------- +# Metrics — mirrors the static Counter/Histogram/UpDownCounter fields in C# +# --------------------------------------------------------------------------- + +message_processed_counter = meter.create_counter( + "agent.messages.processed", + unit="messages", + description="Number of messages processed by the agent", +) + +route_executed_counter = meter.create_counter( + "agent.routes.executed", + unit="routes", + description="Number of routes executed by the agent", +) + +message_processing_duration = meter.create_histogram( + "agent.message.processing.duration", + unit="ms", + description="Duration of message processing in milliseconds", +) + +route_execution_duration = meter.create_histogram( + "agent.route.execution.duration", + unit="ms", + description="Duration of route execution in milliseconds", +) + +active_conversations = meter.create_up_down_counter( + "agent.conversations.active", + unit="conversations", + description="Number of active conversations", +) + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + +def _get_activity_attrs(context) -> dict: + """Extract activity attributes from a TurnContext.""" + attrs: dict = {} + activity = getattr(context, "activity", None) + if activity is None: + return attrs + + attrs["Activity.Type"] = str(getattr(activity, "type", "") or "") + + # 'from' is a Python keyword; the SDK stores it as from_property or _from. + from_obj = ( + getattr(activity, "from_property", None) + or getattr(activity, "_from", None) + # getattr with string "from" works at runtime despite being a keyword + or getattr(activity, "from", None) + ) + attrs["Caller.Id"] = str(getattr(from_obj, "id", "") or "") + + conversation = getattr(activity, "conversation", None) + attrs["Conversation.Id"] = str(getattr(conversation, "id", "") or "") + attrs["Channel.Id"] = str(getattr(activity, "channel_id", "") or "") + + text = getattr(activity, "text", "") or "" + attrs["Message.Text.Length"] = len(text) + attrs["Message.Id"] = str(getattr(activity, "id", "") or "") + attrs["Message.Text"] = text[:200] # truncate to avoid oversized attributes + + return attrs + + +# --------------------------------------------------------------------------- +# Public API — mirrors AgentMetrics static methods in C# +# --------------------------------------------------------------------------- + +def initialize_message_handling_span(handler_name: str, context) -> trace.Span: + """Start a tracing span with contextual tags from the turn activity. + + Equivalent to ``InitializeMessageHandlingActivity()`` in C#. + + The caller is responsible for ending the span (use + ``finalize_message_handling_span`` or call ``span.end()`` directly). + + Args: + handler_name: Name used as the span name (e.g. ``"OnMessageActivity"``). + context: TurnContext whose activity fields are attached as span attributes. + + Returns: + A started (but not yet current) :class:`opentelemetry.trace.Span`. + """ + span = tracer.start_span(handler_name) + attrs = _get_activity_attrs(context) + + # Set individual attributes on the span (mirrors activity?.SetTag calls in C#) + for key in ("Activity.Type", "Caller.Id", "Conversation.Id", "Channel.Id"): + span.set_attribute(key, attrs.get(key, "")) + span.set_attribute("Message.Text.Length", attrs.get("Message.Text.Length", 0)) + # Tag whether the request came from an agentic caller + span.set_attribute("Agent.IsAgentic", bool(getattr(getattr(context, "activity", None), "is_agentic", False))) + + # Equivalent to activity?.AddEvent(new ActivityEvent("Message.Processed", ...)) + span.add_event( + "Message.Processed", + attributes={ + "Caller.Id": attrs.get("Caller.Id", ""), + "Channel.Id": attrs.get("Channel.Id", ""), + "Message.Id": attrs.get("Message.Id", ""), + "Message.Text": attrs.get("Message.Text", ""), + }, + ) + return span + + +def finalize_message_handling_span( + span: trace.Span, + context, + duration_ms: float, + success: bool, +) -> None: + """Record duration metrics and end the span. + + Equivalent to ``FinalizeMessageHandlingActivity()`` in C#. + + Args: + span: The span returned by :func:`initialize_message_handling_span`. + context: TurnContext used to label the metric dimensions. + duration_ms: Elapsed time in milliseconds. + success: ``True`` → span status OK; ``False`` → span status ERROR. + """ + attrs = _get_activity_attrs(context) + conversation_id = attrs.get("Conversation.Id") or "unknown" + channel_id = attrs.get("Channel.Id") or "unknown" + + message_processing_duration.record( + duration_ms, + {"Conversation.Id": conversation_id, "Channel.Id": channel_id}, + ) + + route_executed_counter.add( + 1, + {"Route.Type": "message_handler", "Conversation.Id": conversation_id}, + ) + + span.set_status(StatusCode.OK if success else StatusCode.ERROR) + span.end() + + +def invoke_observed_http_operation(operation_name: str, func: Callable) -> None: + """Wrap a synchronous callable with a tracing span. + + Equivalent to ``InvokeObservedHttpOperation()`` in C#. + + Args: + operation_name: Span name. + func: Synchronous callable to execute. + """ + with tracer.start_as_current_span(operation_name) as span: + try: + func() + span.set_status(StatusCode.OK) + except Exception as ex: + span.set_status(StatusCode.ERROR, str(ex)) + span.record_exception(ex) + raise + + +async def invoke_observed_agent_operation( + operation_name: str, + context, + func: Callable[[], Awaitable[None]], +) -> None: + """Async wrapper for an agent operation with full metrics and tracing. + + Equivalent to ``AgentMetrics.InvokeObservedAgentOperation()`` in C#. + + Increments the message-processed counter, opens a span, sets it as the + current context, awaits *func*, then records the duration and finalises + the span. + + Args: + operation_name: Span / operation name. + context: TurnContext used for span attributes and metric labels. + func: Async function containing the agent logic. + """ + message_processed_counter.add(1) + + span = initialize_message_handling_span(operation_name, context) + # Make the span the active span for the duration of the call + ctx = trace.set_span_in_context(span) + token = otel_context.attach(ctx) + + start_time = time.monotonic() + success = True + try: + await func() + except Exception as ex: + success = False + span.set_status(StatusCode.ERROR, str(ex)) + span.record_exception(ex) + raise + finally: + otel_context.detach(token) + duration_ms = (time.monotonic() - start_time) * 1000 + finalize_message_handling_span(span, context, duration_ms, success) diff --git a/test_samples/weather-agent-framework/telemetry/agent_otel_extensions.py b/test_samples/weather-agent-framework/telemetry/agent_otel_extensions.py new file mode 100644 index 00000000..2429504e --- /dev/null +++ b/test_samples/weather-agent-framework/telemetry/agent_otel_extensions.py @@ -0,0 +1,507 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""OpenTelemetry configuration helpers for the Python Weather Agent. + +Python port of AgentOTELExtensions.cs from sample-agent/telemetry. + +Adds common observability services: + - TracerProvider with resource metadata + - MeterProvider with custom agent meters + - aiohttp tracing middleware (excludes health-check paths) + - /health and /alive endpoint registration + +Exporter support (install the matching package to activate): + - OTLP (Aspire, Jaeger, etc.): + pip install opentelemetry-exporter-otlp-proto-grpc + - Azure Monitor / Application Insights: + pip install azure-monitor-opentelemetry-exporter + - Console (default when no other exporter is configured) + +To learn more about the local Aspire dashboard, see: + https://learn.microsoft.com/dotnet/aspire/fundamentals/dashboard/standalone +""" + +import logging +import os +import socket +from typing import Optional + +from opentelemetry import trace +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader +from opentelemetry.sdk.resources import ( + SERVICE_INSTANCE_ID, + SERVICE_NAME, + SERVICE_VERSION, + Resource, +) +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.trace import StatusCode + +logger = logging.getLogger(__name__) + +try: + from microsoft_agents_a365.observability.extensions.agentframework import AgentFrameworkInstrumentor as _AgentFrameworkInstrumentor + from microsoft_agents_a365.observability.core.config import configure as _configure_agent_framework_observability + _HAS_AGENT_FRAMEWORK_INSTRUMENTOR = True +except ImportError as exc: + logger.debug( + "AgentFrameworkInstrumentor not available — Agent Framework-specific telemetry will be disabled. " + f"ImportError: {exc}" + ) + _HAS_AGENT_FRAMEWORK_INSTRUMENTOR = False + +HEALTH_ENDPOINT_PATH = "/health" +ALIVENESS_ENDPOINT_PATH = "/alive" + +_SERVICE_NAMESPACE = "Microsoft.Agents" +_SOURCE_NAME = "A365.AgentFramework" + + +# --------------------------------------------------------------------------- +# Primary public API +# --------------------------------------------------------------------------- + + +def instrument_libraries() -> None: + """Instrument common HTTP libraries for automatic OpenTelemetry tracing. + + Equivalent to the ``Use*Instrumentation()`` calls in the C# host builder. + Instruments: + - aiohttp server (inbound requests) + - aiohttp client (outbound requests) with URL enrichment hooks + - requests library (outbound requests) with URL enrichment hooks + + Call this once at startup, before the server starts handling requests. + Each instrumentor is guarded with a try/except so missing optional packages + do not prevent the server from starting. + """ + # aiohttp server + try: + from opentelemetry.instrumentation.aiohttp_server import AioHttpServerInstrumentor + AioHttpServerInstrumentor().instrument() + logger.debug("aiohttp server instrumentation enabled") + except ImportError: + logger.debug( + "opentelemetry-instrumentation-aiohttp-server not installed. " + "Install: pip install opentelemetry-instrumentation-aiohttp-server" + ) + + # aiohttp client + try: + import aiohttp + from opentelemetry.instrumentation.aiohttp_client import AioHttpClientInstrumentor + from opentelemetry.trace import Span + + def _aiohttp_client_request_hook(span: Span, params: aiohttp.TraceRequestStartParams): + if span and span.is_recording(): + span.set_attribute("http.url", str(params.url)) + + def _aiohttp_client_response_hook(span: Span, params): + if span and span.is_recording(): + span.set_attribute("http.url", str(params.url)) + + AioHttpClientInstrumentor().instrument( + request_hook=_aiohttp_client_request_hook, + response_hook=_aiohttp_client_response_hook, + ) + logger.debug("aiohttp client instrumentation enabled") + except ImportError: + logger.debug( + "opentelemetry-instrumentation-aiohttp-client not installed. " + "Install: pip install opentelemetry-instrumentation-aiohttp-client" + ) + + # requests library + try: + import requests as _requests + from opentelemetry.instrumentation.requests import RequestsInstrumentor + from opentelemetry.trace import Span + + def _requests_request_hook(span: Span, request: _requests.Request): + if span and span.is_recording(): + span.set_attribute("http.url", request.url) + + def _requests_response_hook(span: Span, request: _requests.Request, response: _requests.Response): + if span and span.is_recording(): + span.set_attribute("http.url", response.url) + + RequestsInstrumentor().instrument( + request_hook=_requests_request_hook, + response_hook=_requests_response_hook, + ) + logger.debug("requests instrumentation enabled") + except ImportError: + logger.debug( + "opentelemetry-instrumentation-requests not installed. " + "Install: pip install opentelemetry-instrumentation-requests" + ) + + # AgentFramework instrumentor (optional A365 package) + if _HAS_AGENT_FRAMEWORK_INSTRUMENTOR: + try: + from .token_cache import get_cached_agentic_token + + def _token_resolver(agent_id: str, tenant_id: str) -> str | None: + # Note: get_cached_agentic_token takes (tenant_id, agent_id) + return get_cached_agentic_token(tenant_id, agent_id) + + _configure_agent_framework_observability( + service_name="AgentFrameworkTracingWithAzureOpenAI", + service_namespace="AgentFrameworkTesting", + token_resolver=_token_resolver, + ) + try: + _AgentFrameworkInstrumentor().instrument( + skip_dep_check=True, + ) + except TypeError as exc: + # Older versions of the instrumentor may not support token_resolver + _AgentFrameworkInstrumentor().instrument(skip_dep_check=True) + logger.debug("AgentFramework instrumentation enabled") + except Exception as exc: + logger.warning("AgentFramework instrumentation failed: %s", exc) + + +def configure_opentelemetry( + app_name: str = "A365.AgentFramework", + environment: str = "development", + otlp_endpoint: Optional[str] = None, + azure_monitor_connection_string: Optional[str] = None, +) -> None: + """Configure global TracerProvider, MeterProvider, and logging bridge. + + Equivalent to ``ConfigureOpenTelemetry()`` in C#. + + Call this **once at startup**, before any code creates spans or records + metrics. The OTel proxy objects in *agent_metrics.py* will automatically + route to the real providers once this function has run. + + Args: + app_name: Service name shown in traces/dashboards. + environment: Deployment environment tag (e.g. ``"development"``, + ``"production"``). + otlp_endpoint: OTLP collector URL. Falls back to the + ``OTEL_EXPORTER_OTLP_ENDPOINT`` environment variable when *None*. + azure_monitor_connection_string: Application Insights connection + string. Falls back to ``APPLICATIONINSIGHTS_CONNECTION_STRING`` + when *None*. + """ + resource = Resource.create( + { + SERVICE_NAME: _SOURCE_NAME, + SERVICE_VERSION: "1.0.0", + SERVICE_INSTANCE_ID: socket.gethostname(), + "deployment.environment": environment, + "service.namespace": _SERVICE_NAMESPACE, + } + ) + + _configure_tracing(resource, app_name, otlp_endpoint, azure_monitor_connection_string) + _configure_metrics(resource, otlp_endpoint, azure_monitor_connection_string) + _configure_logging(resource, otlp_endpoint) + instrument_libraries() + + logger.info( + "OpenTelemetry configured — service: %s, environment: %s", + app_name, + environment, + ) + + +def setup_health_routes(app, development: bool = True) -> None: + """Register ``/health`` and ``/alive`` endpoints on an aiohttp Application. + + Equivalent to ``MapDefaultEndpoints()`` in C#. Mirrors the C# behaviour + of only registering the endpoints in non-production environments. + + Args: + app: :class:`aiohttp.web.Application` instance. + development: When ``False`` the endpoints are **not** registered + (matches the C# guard on ``IsDevelopment()``). + """ + if not development: + return + + from aiohttp import web + + async def health_handler(_request): + return web.Response( + text='{"status":"Healthy"}', + content_type="application/json", + ) + + async def alive_handler(_request): + return web.Response( + text='{"status":"Alive"}', + content_type="application/json", + ) + + app.router.add_get(HEALTH_ENDPOINT_PATH, health_handler) + app.router.add_get(ALIVENESS_ENDPOINT_PATH, alive_handler) + logger.info( + "Health check endpoints registered: %s, %s", + HEALTH_ENDPOINT_PATH, + ALIVENESS_ENDPOINT_PATH, + ) + + +def create_aiohttp_tracing_middleware(): + """Return an aiohttp middleware that traces every non-health-check request. + + Equivalent to the ``AddAspNetCoreInstrumentation()`` configuration in C#: + - Filters out ``/health`` and ``/alive`` paths. + - Enriches spans with ``http.request.body.size`` and ``user_agent`` on + request, and ``http.status_code`` / ``http.response.body.size`` on + response. + - Records exceptions and sets the span status accordingly. + """ + from aiohttp import web + + _tracer = trace.get_tracer(_SOURCE_NAME, "1.0.0") + + @web.middleware + async def tracing_middleware(request, handler): + # Exclude health check requests from tracing (mirrors C# filter lambda) + if request.path.startswith(HEALTH_ENDPOINT_PATH) or request.path.startswith( + ALIVENESS_ENDPOINT_PATH + ): + return await handler(request) + + span_name = f"{request.method} {request.path}" + with _tracer.start_as_current_span(span_name) as span: + # Enrich with request details — equivalent to EnrichWithHttpRequest + span.set_attribute("http.method", request.method) + span.set_attribute("http.url", str(request.url)) + span.set_attribute("http.request.body.size", request.content_length or 0) + user_agent = request.headers.get("User-Agent", "") + if user_agent: + span.set_attribute("user_agent", user_agent) + + try: + response = await handler(request) + # Enrich with response details — equivalent to EnrichWithHttpResponse + span.set_attribute("http.status_code", response.status) + span.set_attribute( + "http.response.body.size", response.content_length or 0 + ) + span.set_status(StatusCode.OK) + return response + except Exception as ex: + span.set_status(StatusCode.ERROR, str(ex)) + span.record_exception(ex) + raise + + return tracing_middleware + + +# --------------------------------------------------------------------------- +# Internal helpers — mirrors private methods in AgentOTELExtensions.cs +# --------------------------------------------------------------------------- + + +def _configure_tracing( + resource: Resource, + app_name: str, + otlp_endpoint: Optional[str], + azure_monitor_connection_string: Optional[str], +) -> None: + """Build and register the global TracerProvider.""" + tracer_provider = TracerProvider(resource=resource) + + has_real_exporter = False + + # OTLP exporter — equivalent to UseOtlpExporter() in C# + resolved_otlp = otlp_endpoint or os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT") + if resolved_otlp: + _add_otlp_trace_exporter(tracer_provider, resolved_otlp) + has_real_exporter = True + + # Azure Monitor exporter — equivalent to UseAzureMonitor() in C# + resolved_az = azure_monitor_connection_string or os.environ.get( + "APPLICATIONINSIGHTS_CONNECTION_STRING" + ) + if resolved_az: + _add_azure_monitor_trace_exporter(tracer_provider, resolved_az) + has_real_exporter = True + + # Console exporter for local development (no production exporter configured) + if not has_real_exporter: + _add_console_trace_exporter(tracer_provider) + + trace.set_tracer_provider(tracer_provider) + + +def _configure_metrics( + resource: Resource, + otlp_endpoint: Optional[str], + azure_monitor_connection_string: Optional[str], +) -> None: + """Build and register the global MeterProvider.""" + from opentelemetry import metrics + + readers = [] + + resolved_otlp = otlp_endpoint or os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT") + if resolved_otlp: + reader = _create_otlp_metric_reader(resolved_otlp) + if reader: + readers.append(reader) + + resolved_az = azure_monitor_connection_string or os.environ.get( + "APPLICATIONINSIGHTS_CONNECTION_STRING" + ) + if resolved_az: + reader = _create_azure_monitor_metric_reader(resolved_az) + if reader: + readers.append(reader) + + if not readers: + readers.append(_create_console_metric_reader()) + + meter_provider = MeterProvider(resource=resource, metric_readers=readers) + metrics.set_meter_provider(meter_provider) + + +def _configure_logging( + resource: Resource, + otlp_endpoint: Optional[str], +) -> None: + """Configure OTEL LoggerProvider and bridge Python logging. + + Equivalent to ``builder.Logging.AddOpenTelemetry(...)`` in C#. + + When an OTLP endpoint is available, a full LoggerProvider is created and + log records are exported via OTLP (matching the otel sample behaviour). + In all cases, Python log records are correlated with the active trace + context when ``opentelemetry-instrumentation-logging`` is installed. + + Args: + resource: The shared OTel Resource created in configure_opentelemetry. + otlp_endpoint: OTLP collector URL, or None to skip OTLP log export. + """ + resolved_otlp = otlp_endpoint or os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT") + if resolved_otlp: + try: + from opentelemetry._logs import set_logger_provider + from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler + from opentelemetry.sdk._logs.export import BatchLogRecordProcessor + from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter + + logger_provider = LoggerProvider(resource=resource) + logger_provider.add_log_record_processor( + BatchLogRecordProcessor(OTLPLogExporter(endpoint=resolved_otlp)) + ) + set_logger_provider(logger_provider) + + handler = LoggingHandler(level=logging.NOTSET, logger_provider=logger_provider) + logging.getLogger().addHandler(handler) + logger.info("OTLP log exporter → %s", resolved_otlp) + except ImportError: + logger.warning( + "OTLP log exporter requested but package not found. " + "Install: pip install opentelemetry-exporter-otlp-proto-grpc" + ) + + # Bridge Python logging to trace context (adds trace_id/span_id to log records) + try: + from opentelemetry.instrumentation.logging import LoggingInstrumentor + + LoggingInstrumentor().instrument(set_logging_format=True) + logger.debug("OTEL logging instrumentation enabled") + except ImportError: + logger.debug( + "opentelemetry-instrumentation-logging not installed — " + "Python log records will not be correlated with traces. " + "Install with: pip install opentelemetry-instrumentation-logging" + ) + + +# -- Trace exporter helpers -------------------------------------------------- + + +def _add_otlp_trace_exporter( + tracer_provider: TracerProvider, endpoint: str +) -> None: + try: + from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( + OTLPSpanExporter, + ) + + exporter = OTLPSpanExporter(endpoint=endpoint) + tracer_provider.add_span_processor(BatchSpanProcessor(exporter)) + logger.info("OTLP trace exporter → %s", endpoint) + except ImportError: + logger.warning( + "OTLP trace exporter requested but package not found. " + "Install: pip install opentelemetry-exporter-otlp-proto-grpc" + ) + + +def _add_azure_monitor_trace_exporter( + tracer_provider: TracerProvider, connection_string: str +) -> None: + try: + from azure.monitor.opentelemetry.exporter import AzureMonitorTraceExporter + + exporter = AzureMonitorTraceExporter(connection_string=connection_string) + tracer_provider.add_span_processor(BatchSpanProcessor(exporter)) + logger.info("Azure Monitor trace exporter configured") + except ImportError: + logger.warning( + "Azure Monitor trace exporter requested but package not found. " + "Install: pip install azure-monitor-opentelemetry-exporter" + ) + + +def _add_console_trace_exporter(tracer_provider: TracerProvider) -> None: + from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor + + tracer_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter())) + logger.debug("Console span exporter active (development mode)") + + +# -- Metric reader helpers ---------------------------------------------------- + + +def _create_otlp_metric_reader(endpoint: str) -> Optional[PeriodicExportingMetricReader]: + try: + from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import ( + OTLPMetricExporter, + ) + + exporter = OTLPMetricExporter(endpoint=endpoint) + logger.info("OTLP metric exporter → %s", endpoint) + return PeriodicExportingMetricReader(exporter) + except ImportError: + logger.warning( + "OTLP metric exporter requested but package not found. " + "Install: pip install opentelemetry-exporter-otlp-proto-grpc" + ) + return None + + +def _create_azure_monitor_metric_reader( + connection_string: str, +) -> Optional[PeriodicExportingMetricReader]: + try: + from azure.monitor.opentelemetry.exporter import AzureMonitorMetricsExporter + + exporter = AzureMonitorMetricsExporter(connection_string=connection_string) + logger.info("Azure Monitor metric exporter configured") + return PeriodicExportingMetricReader(exporter) + except ImportError: + logger.warning( + "Azure Monitor metric exporter requested but package not found. " + "Install: pip install azure-monitor-opentelemetry-exporter" + ) + return None + + +def _create_console_metric_reader() -> PeriodicExportingMetricReader: + from opentelemetry.sdk.metrics.export import ConsoleMetricExporter + + logger.debug("Console metric exporter active (development mode)") + return PeriodicExportingMetricReader(ConsoleMetricExporter()) diff --git a/test_samples/weather-agent-framework/telemetry/token_cache.py b/test_samples/weather-agent-framework/telemetry/token_cache.py new file mode 100644 index 00000000..31339a13 --- /dev/null +++ b/test_samples/weather-agent-framework/telemetry/token_cache.py @@ -0,0 +1,31 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +""" +Token caching utilities for Agent 365 Observability exporter authentication. +""" + +import logging + +logger = logging.getLogger(__name__) + +# Global token cache for Agent 365 Observability exporter +_agentic_token_cache = {} + + +def cache_agentic_token(tenant_id: str, agent_id: str, token: str) -> None: + """Cache the agentic token for use by Agent 365 Observability exporter.""" + key = f"{tenant_id}:{agent_id}" + _agentic_token_cache[key] = token + logger.debug(f"Cached agentic token for {key}") + + +def get_cached_agentic_token(tenant_id: str, agent_id: str) -> str | None: + """Retrieve cached agentic token for Agent 365 Observability exporter.""" + key = f"{tenant_id}:{agent_id}" + token = _agentic_token_cache.get(key) + if token: + logger.debug(f"Retrieved cached agentic token for {key}") + else: + logger.debug(f"No cached token found for {key}") + return token \ No newline at end of file diff --git a/test_samples/weather-agent-framework/tools/__init__.py b/test_samples/weather-agent-framework/tools/__init__.py new file mode 100644 index 00000000..dce73e80 --- /dev/null +++ b/test_samples/weather-agent-framework/tools/__init__.py @@ -0,0 +1,9 @@ +"""Tools package for Weather Agent.""" +from .weather_tools import get_current_weather_for_location, get_weather_forecast_for_location +from .datetime_tools import get_date_time + +__all__ = [ + "get_current_weather_for_location", + "get_weather_forecast_for_location", + "get_date_time", +] diff --git a/test_samples/weather-agent-framework/tools/datetime_tools.py b/test_samples/weather-agent-framework/tools/datetime_tools.py new file mode 100644 index 00000000..2f746b08 --- /dev/null +++ b/test_samples/weather-agent-framework/tools/datetime_tools.py @@ -0,0 +1,25 @@ +"""DateTime helper tools.""" +from datetime import datetime +from typing import Annotated +from pydantic import Field + + +def get_date_time( + input_text: Annotated[str, Field(description="User input (not used, can be empty)")] = "" +) -> str: + """ + Get the current date and time. + + Args: + input_text: Optional user input (not used by this function). + + Returns: + A formatted string with the current date and time. + """ + now = datetime.now() + # Format similar to C#: DateTimeOffset.Now.ToString("D", null) + # "D" format is long date pattern + formatted_date = now.strftime("%A, %B %d, %Y") + formatted_time = now.strftime("%I:%M:%S %p") + + return f"{formatted_date} at {formatted_time}" diff --git a/test_samples/weather-agent-framework/tools/weather_tools.py b/test_samples/weather-agent-framework/tools/weather_tools.py new file mode 100644 index 00000000..d8fa868d --- /dev/null +++ b/test_samples/weather-agent-framework/tools/weather_tools.py @@ -0,0 +1,138 @@ +"""Weather lookup tools using OpenWeatherMap API.""" +from os import environ +from typing import Annotated, Dict, List +from pydantic import Field +from pyowm import OWM +from pyowm.weatherapi30.weather import Weather +from pyowm.weatherapi30.forecast import Forecast + + +def get_current_weather_for_location( + location: Annotated[str, Field(description="The city name")], + state: Annotated[str, Field(description="The US state name or empty string for international cities")] +) -> str: + """ + Retrieves the current weather for a specified location. + + This function uses the OpenWeatherMap API to fetch current weather data for a given city and state. + + Args: + location: The name of the city for which to retrieve the weather. + state: The name of the state where the city is located (US only, empty for international). + + Returns: + A formatted string containing the current weather details including temperature, + conditions, humidity, and wind speed. + + Raises: + ValueError: If the location cannot be found or API key is invalid. + """ + print(f"Looking up current weather in {location}, {state if state else 'international'}") + + try: + # Initialize OpenWeatherMap client + owm = OWM(environ["OPENWEATHER_API_KEY"]) + mgr = owm.weather_manager() + + # Build location query + query = f"{location},{state},US" if state else location + + # Get current weather + observation = mgr.weather_at_place(query) + weather: Weather = observation.weather + + # Extract weather details + temp = weather.temperature('fahrenheit') + current_temp = temp.get('temp', 'N/A') + temp_min = temp.get('temp_min', 'N/A') + temp_max = temp.get('temp_max', 'N/A') + + humidity = weather.humidity + wind = weather.wind().get('speed', 'N/A') + status = weather.detailed_status + + # Format response + result = f"""Current Weather for {location}, {state if state else 'international'}: +- Temperature: {current_temp}°F +- Low: {temp_min}°F / High: {temp_max}°F +- Conditions: {status.capitalize()} +- Humidity: {humidity}% +- Wind Speed: {wind} mph""" + + print(f"Successfully retrieved weather for {location}") + return result + + except Exception as e: + error_msg = f"Unable to retrieve weather for {location}, {state}: {str(e)}" + print(error_msg) + return error_msg + + +def get_weather_forecast_for_location( + location: Annotated[str, Field(description="The city name")], + state: Annotated[str, Field(description="The US state name or empty string for international cities")] +) -> str: + """ + Retrieves the 5-day weather forecast for a specified location. + + This function uses the OpenWeatherMap API to fetch forecast data for a given city and state. + + Args: + location: The name of the city for which to retrieve the forecast. + state: The name of the state where the city is located (US only, empty for international). + + Returns: + A formatted string containing the 5-day forecast with dates, temperatures, and conditions. + + Raises: + ValueError: If the location cannot be found or API key is invalid. + """ + print(f"Looking up weather forecast in {location}, {state if state else 'international'}") + + try: + # Initialize OpenWeatherMap client + owm = OWM(environ["OPENWEATHER_API_KEY"]) + mgr = owm.weather_manager() + + # Build location query + query = f"{location},{state},US" if state else location + + # Get forecast + forecast: Forecast = mgr.forecast_at_place(query, '3h').forecast + + # Process forecast data - get daily forecasts + daily_forecasts: Dict[str, List[Weather]] = {} + + for weather in forecast.weathers: + date_str = weather.reference_time('iso').split('T')[0] + if date_str not in daily_forecasts: + daily_forecasts[date_str] = [] + daily_forecasts[date_str].append(weather) + + # Format forecast for up to 5 days + result_lines = [f"5-Day Weather Forecast for {location}, {state if state else 'international'}:\n"] + + for i, (date, weathers) in enumerate(list(daily_forecasts.items())[:5]): + # Get temperature range for the day + temps = [w.temperature('fahrenheit').get('temp', 0) for w in weathers] + temp_min = min(temps) if temps else 'N/A' + temp_max = max(temps) if temps else 'N/A' + + # Get most common weather condition + statuses = [w.detailed_status for w in weathers] + status = max(set(statuses), key=statuses.count) if statuses else 'Unknown' + + result_lines.append( + f"Day {i+1} ({date}):\n" + f" Low: {temp_min:.1f}°F / High: {temp_max:.1f}°F\n" + f" Conditions: {status.capitalize()}" + ) + + result = "\n".join(result_lines) + print(f"Successfully retrieved forecast for {location}") + return result + + except Exception as e: + error_msg = f"Unable to retrieve forecast for {location}, {state}: {str(e)}" + print(error_msg) + return error_msg