From 318a0ceffffef8eb3809bdba6d8bcccf5739ed68 Mon Sep 17 00:00:00 2001 From: Axel Suarez Martinez Date: Mon, 3 Nov 2025 21:38:19 -0800 Subject: [PATCH 1/6] Http framework integration refactor WIP --- .../hosting/aiohttp/_start_agent_process.py | 25 +- .../hosting/aiohttp/agent_http_adapter.py | 16 +- .../aiohttp/channel_service_route_table.py | 143 +++---- .../hosting/aiohttp/cloud_adapter.py | 131 ++----- .../microsoft_agents/hosting/core/__init__.py | 14 + .../microsoft_agents/hosting/core/http.py | 360 ++++++++++++++++++ .../hosting/fastapi/_start_agent_process.py | 25 +- .../hosting/fastapi/agent_http_adapter.py | 11 +- .../fastapi/channel_service_route_table.py | 144 +++---- .../hosting/fastapi/cloud_adapter.py | 130 ++----- 10 files changed, 592 insertions(+), 407 deletions(-) create mode 100644 libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/http.py diff --git a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/_start_agent_process.py b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/_start_agent_process.py index 0aa93f8b..d135d866 100644 --- a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/_start_agent_process.py +++ b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/_start_agent_process.py @@ -1,6 +1,12 @@ from typing import Optional + from aiohttp.web import Request, Response -from microsoft_agents.hosting.core.app import AgentApplication + +from microsoft_agents.hosting.core import ( + AgentApplication, + start_agent_process as core_start_agent_process, +) + from .cloud_adapter import CloudAdapter @@ -9,18 +15,5 @@ async def start_agent_process( agent_application: AgentApplication, adapter: CloudAdapter, ) -> Optional[Response]: - """Starts the agent host with the provided adapter and agent application. - Args: - adapter (CloudAdapter): The adapter to use for the agent host. - agent_application (AgentApplication): The agent application to run. - """ - if not adapter: - raise TypeError("start_agent_process: adapter can't be None") - if not agent_application: - raise TypeError("start_agent_process: agent_application can't be None") - - # Start the agent application with the provided adapter - return await adapter.process( - request, - agent_application, - ) + """Starts the agent host with the provided adapter and agent application.""" + return await core_start_agent_process(request, agent_application, adapter) diff --git a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/agent_http_adapter.py b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/agent_http_adapter.py index a0351aa0..8896f8f3 100644 --- a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/agent_http_adapter.py +++ b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/agent_http_adapter.py @@ -1,18 +1,12 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. -from abc import abstractmethod -from typing import Optional, Protocol +from typing import Protocol -from aiohttp.web import ( - Request, - Response, -) +from aiohttp.web import Request, Response -from microsoft_agents.hosting.core import Agent +from microsoft_agents.hosting.core import AgentHttpAdapterProtocol -class AgentHttpAdapter(Protocol): - @abstractmethod - async def process(self, request: Request, agent: Agent) -> Optional[Response]: - raise NotImplementedError() +class AgentHttpAdapter(AgentHttpAdapterProtocol[Request, Response], Protocol): + """Framework specific alias for the shared AgentHttpAdapter protocol.""" diff --git a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/channel_service_route_table.py b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/channel_service_route_table.py index 4a8a193b..bace245b 100644 --- a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/channel_service_route_table.py +++ b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/channel_service_route_table.py @@ -1,102 +1,80 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. import json -from typing import List, Union, Type +from typing import Any -from aiohttp.web import RouteTableDef, Request, Response +from aiohttp.web import HTTPUnsupportedMediaType, Request, Response, RouteTableDef -from microsoft_agents.activity import ( - AgentsModel, - Activity, - AttachmentData, - ConversationParameters, - Transcript, +from microsoft_agents.hosting.core import ( + ChannelApiHandlerProtocol, + ChannelServiceOperations, + serialize_agents_model, ) -from microsoft_agents.hosting.core import ChannelApiHandlerProtocol -async def deserialize_from_body( - request: Request, target_model: Type[AgentsModel] -) -> Activity: - if "application/json" in request.headers["Content-Type"]: - body = await request.json() - else: - return Response(status=415) +async def _read_payload(request: Request) -> Any: + if "application/json" not in request.headers.get("Content-Type", ""): + raise HTTPUnsupportedMediaType() + return await request.json() - return target_model.model_validate(body) +def _json_response(result: Any) -> Response: + if result is None: + return Response() -def get_serialized_response( - model_or_list: Union[AgentsModel, List[AgentsModel]], -) -> Response: - if isinstance(model_or_list, AgentsModel): - json_obj = model_or_list.model_dump( - mode="json", exclude_unset=True, by_alias=True - ) - else: - json_obj = [ - model.model_dump(mode="json", exclude_unset=True, by_alias=True) - for model in model_or_list - ] - - return Response(body=json.dumps(json_obj), content_type="application/json") + payload = serialize_agents_model(result) + return Response(body=json.dumps(payload), content_type="application/json") def channel_service_route_table( handler: ChannelApiHandlerProtocol, base_url: str = "" ) -> RouteTableDef: - # pylint: disable=unused-variable routes = RouteTableDef() + operations = ChannelServiceOperations(handler) @routes.post(base_url + "/v3/conversations/{conversation_id}/activities") async def send_to_conversation(request: Request): - activity = await deserialize_from_body(request, Activity) - result = await handler.on_send_to_conversation( + payload = await _read_payload(request) + result = await operations.send_to_conversation( request.get("claims_identity"), request.match_info["conversation_id"], - activity, + payload, ) - - return get_serialized_response(result) + return _json_response(result) @routes.post( base_url + "/v3/conversations/{conversation_id}/activities/{activity_id}" ) async def reply_to_activity(request: Request): - activity = await deserialize_from_body(request, Activity) - result = await handler.on_reply_to_activity( + payload = await _read_payload(request) + result = await operations.reply_to_activity( request.get("claims_identity"), request.match_info["conversation_id"], request.match_info["activity_id"], - activity, + payload, ) - - return get_serialized_response(result) + return _json_response(result) @routes.put( base_url + "/v3/conversations/{conversation_id}/activities/{activity_id}" ) async def update_activity(request: Request): - activity = await deserialize_from_body(request, Activity) - result = await handler.on_update_activity( + payload = await _read_payload(request) + result = await operations.update_activity( request.get("claims_identity"), request.match_info["conversation_id"], request.match_info["activity_id"], - activity, + payload, ) - - return get_serialized_response(result) + return _json_response(result) @routes.delete( base_url + "/v3/conversations/{conversation_id}/activities/{activity_id}" ) async def delete_activity(request: Request): - await handler.on_delete_activity( + await operations.delete_activity( request.get("claims_identity"), request.match_info["conversation_id"], request.match_info["activity_id"], ) - return Response() @routes.get( @@ -104,91 +82,82 @@ async def delete_activity(request: Request): + "/v3/conversations/{conversation_id}/activities/{activity_id}/members" ) async def get_activity_members(request: Request): - result = await handler.on_get_activity_members( + result = await operations.get_activity_members( request.get("claims_identity"), request.match_info["conversation_id"], request.match_info["activity_id"], ) - - return get_serialized_response(result) + return _json_response(result) @routes.post(base_url + "/") async def create_conversation(request: Request): - conversation_parameters = deserialize_from_body(request, ConversationParameters) - result = await handler.on_create_conversation( - request.get("claims_identity"), conversation_parameters + payload = await _read_payload(request) + result = await operations.create_conversation( + request.get("claims_identity"), + payload, ) - - return get_serialized_response(result) + return _json_response(result) @routes.get(base_url + "/") async def get_conversation(request: Request): - # TODO: continuation token? conversation_id? - result = await handler.on_get_conversations( - request.get("claims_identity"), None + result = await operations.get_conversations( + request.get("claims_identity"), + None, ) - - return get_serialized_response(result) + return _json_response(result) @routes.get(base_url + "/v3/conversations/{conversation_id}/members") async def get_conversation_members(request: Request): - result = await handler.on_get_conversation_members( + result = await operations.get_conversation_members( request.get("claims_identity"), request.match_info["conversation_id"], ) - - return get_serialized_response(result) + return _json_response(result) @routes.get(base_url + "/v3/conversations/{conversation_id}/members/{member_id}") async def get_conversation_member(request: Request): - result = await handler.on_get_conversation_member( + result = await operations.get_conversation_member( request.get("claims_identity"), request.match_info["member_id"], request.match_info["conversation_id"], ) - - return get_serialized_response(result) + return _json_response(result) @routes.get(base_url + "/v3/conversations/{conversation_id}/pagedmembers") async def get_conversation_paged_members(request: Request): - # TODO: continuation token? page size? - result = await handler.on_get_conversation_paged_members( + result = await operations.get_conversation_paged_members( request.get("claims_identity"), request.match_info["conversation_id"], ) - - return get_serialized_response(result) + return _json_response(result) @routes.delete(base_url + "/v3/conversations/{conversation_id}/members/{member_id}") async def delete_conversation_member(request: Request): - result = await handler.on_delete_conversation_member( + result = await operations.delete_conversation_member( request.get("claims_identity"), request.match_info["conversation_id"], request.match_info["member_id"], ) - - return get_serialized_response(result) + return _json_response(result) @routes.post(base_url + "/v3/conversations/{conversation_id}/activities/history") async def send_conversation_history(request: Request): - transcript = deserialize_from_body(request, Transcript) - result = await handler.on_send_conversation_history( + payload = await _read_payload(request) + result = await operations.send_conversation_history( request.get("claims_identity"), request.match_info["conversation_id"], - transcript, + payload, ) - - return get_serialized_response(result) + return _json_response(result) @routes.post(base_url + "/v3/conversations/{conversation_id}/attachments") async def upload_attachment(request: Request): - attachment_data = deserialize_from_body(request, AttachmentData) - result = await handler.on_upload_attachment( + payload = await _read_payload(request) + result = await operations.upload_attachment( request.get("claims_identity"), request.match_info["conversation_id"], - attachment_data, + payload, ) - - return get_serialized_response(result) + return _json_response(result) return routes diff --git a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/cloud_adapter.py b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/cloud_adapter.py index 1ef106c3..ae1a3bee 100644 --- a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/cloud_adapter.py +++ b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/cloud_adapter.py @@ -1,118 +1,67 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. -from traceback import format_exc -from typing import Optional +from typing import Any, Optional from aiohttp.web import ( - Request, - Response, - json_response, HTTPBadRequest, HTTPMethodNotAllowed, HTTPUnauthorized, HTTPUnsupportedMediaType, + Request, + Response, + json_response, ) -from microsoft_agents.hosting.core.authorization import ( - ClaimsIdentity, - Connections, -) -from microsoft_agents.activity import ( - Activity, - DeliveryModes, -) + from microsoft_agents.hosting.core import ( - Agent, - ChannelServiceAdapter, + CloudAdapterBase, + Connections, ChannelServiceClientFactoryBase, - MessageFactory, - RestChannelServiceClientFactory, - TurnContext, ) +from microsoft_agents.hosting.core.authorization import ClaimsIdentity from .agent_http_adapter import AgentHttpAdapter -class CloudAdapter(ChannelServiceAdapter, AgentHttpAdapter): +class CloudAdapter(CloudAdapterBase[Request, Response], AgentHttpAdapter): def __init__( self, *, - connection_manager: Connections = None, - channel_service_client_factory: ChannelServiceClientFactoryBase = None, - ): - """ - Initializes a new instance of the CloudAdapter class. - - :param channel_service_client_factory: The factory to use to create the channel service client. - """ - - async def on_turn_error(context: TurnContext, error: Exception): - error_message = f"Exception caught : {error}" - print(format_exc()) - - await context.send_activity(MessageFactory.text(error_message)) - - # Send a trace activity - await context.send_trace_activity( - "OnTurnError Trace", - error_message, - "https://www.botframework.com/schemas/error", - "TurnError", - ) - - self.on_turn_error = on_turn_error - - channel_service_client_factory = ( - channel_service_client_factory - or RestChannelServiceClientFactory(connection_manager) + connection_manager: Connections | None = None, + channel_service_client_factory: ChannelServiceClientFactoryBase | None = None, + ) -> None: + super().__init__( + connection_manager=connection_manager, + channel_service_client_factory=channel_service_client_factory, ) - super().__init__(channel_service_client_factory) + def _get_method(self, request: Request) -> str: + return request.method - async def process(self, request: Request, agent: Agent) -> Optional[Response]: - if not request: - raise TypeError("CloudAdapter.process: request can't be None") - if not agent: - raise TypeError("CloudAdapter.process: agent can't be None") + async def _read_json_body(self, request: Request) -> Any: + if "application/json" not in request.headers.get("Content-Type", ""): + raise self._unsupported_media_type_error(request) + return await request.json() - if request.method == "POST": - # Deserialize the incoming Activity - if "application/json" in request.headers["Content-Type"]: - body = await request.json() - else: - raise HTTPUnsupportedMediaType() + def _get_claims_identity(self, request: Request) -> ClaimsIdentity | None: + return request.get("claims_identity", self._default_claims_identity()) - activity: Activity = Activity.model_validate(body) + def _method_not_allowed_error(self, request: Request) -> Exception: + return HTTPMethodNotAllowed(request.method, ["POST"]) - # default to anonymous identity with no claims - claims_identity: ClaimsIdentity = request.get( - "claims_identity", ClaimsIdentity({}, False) - ) + def _unsupported_media_type_error(self, request: Request) -> Exception: + return HTTPUnsupportedMediaType() - # A POST request must contain an Activity - if ( - not activity.type - or not activity.conversation - or not activity.conversation.id - ): - raise HTTPBadRequest + def _bad_request_error(self, request: Request) -> Exception: + return HTTPBadRequest() - try: - # Process the inbound activity with the agent - invoke_response = await self.process_activity( - claims_identity, activity, agent.on_turn - ) + def _unauthorized_error(self, request: Request) -> Exception: + return HTTPUnauthorized() - if ( - activity.type == "invoke" - or activity.delivery_mode == DeliveryModes.expect_replies - ): - # Invoke and ExpectReplies cannot be performed async, the response must be written before the calling thread is released. - return json_response( - data=invoke_response.body, status=invoke_response.status - ) + def _create_invoke_response( + self, request: Request, invoke_response: Any + ) -> Response: + return json_response( + data=invoke_response.body, + status=invoke_response.status, + ) - return Response(status=202) - except PermissionError: - raise HTTPUnauthorized - else: - raise HTTPMethodNotAllowed + def _create_accepted_response(self, request: Request) -> Response: + return Response(status=202) diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/__init__.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/__init__.py index 90d6f0ec..5ae2c23e 100644 --- a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/__init__.py +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/__init__.py @@ -9,6 +9,14 @@ from .middleware_set import Middleware from .rest_channel_service_client_factory import RestChannelServiceClientFactory from .turn_context import TurnContext +from .http import ( + AgentHttpAdapterProtocol, + ChannelServiceOperations, + CloudAdapterBase, + parse_agents_model, + serialize_agents_model, + start_agent_process, +) # Application Style from .app._type_defs import RouteHandler, RouteSelector, StateT @@ -96,6 +104,12 @@ "Middleware", "RestChannelServiceClientFactory", "TurnContext", + "AgentHttpAdapterProtocol", + "ChannelServiceOperations", + "CloudAdapterBase", + "parse_agents_model", + "serialize_agents_model", + "start_agent_process", "AgentApplication", "ApplicationError", "ApplicationOptions", diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/http.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/http.py new file mode 100644 index 00000000..abfe8841 --- /dev/null +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/http.py @@ -0,0 +1,360 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +"""Shared HTTP hosting utilities used by framework-specific adapters.""" + +from __future__ import annotations + +from abc import ABC, abstractmethod +from collections.abc import Iterable +from traceback import format_exc +from typing import Any, Generic, Optional, Protocol, Type, TypeVar + +from microsoft_agents.activity import ( + Activity, + AgentsModel, + AttachmentData, + ConversationParameters, + DeliveryModes, + Transcript, +) + +from .authorization.claims_identity import ClaimsIdentity +from .authorization.connections import Connections +from .channel_api_handler_protocol import ChannelApiHandlerProtocol +from .channel_service_adapter import ChannelServiceAdapter +from .channel_service_client_factory_base import ChannelServiceClientFactoryBase +from .message_factory import MessageFactory +from .rest_channel_service_client_factory import RestChannelServiceClientFactory +from .turn_context import TurnContext + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: # pragma: no cover - imported for type checking only + from microsoft_agents.hosting.core.agent import Agent + from microsoft_agents.hosting.core.app.agent_application import AgentApplication + + +TModel = TypeVar("TModel", bound=AgentsModel) +RequestT = TypeVar("RequestT") +ResponseT = TypeVar("ResponseT") + + +class AgentHttpAdapterProtocol(Protocol, Generic[RequestT, ResponseT]): + """Protocol describing the contract for framework specific HTTP adapters.""" + + async def process(self, request: RequestT, agent: "Agent") -> Optional[ResponseT]: + raise NotImplementedError + + +async def start_agent_process( + request: RequestT, + agent_application: "AgentApplication", + adapter: AgentHttpAdapterProtocol[RequestT, ResponseT], +) -> Optional[ResponseT]: + """Start the agent process using the provided adapter and application.""" + if adapter is None: + raise TypeError("start_agent_process: adapter can't be None") + if agent_application is None: + raise TypeError("start_agent_process: agent_application can't be None") + + return await adapter.process(request, agent_application) + + +def parse_agents_model(payload: Any, model_type: Type[TModel]) -> TModel: + """Parse a payload into the requested AgentsModel derived type.""" + return model_type.model_validate(payload) + + +def serialize_agents_model(model_or_list: AgentsModel | Iterable[AgentsModel]) -> Any: + """Serialize AgentsModel instances into JSON serialisable structures.""" + if isinstance(model_or_list, AgentsModel): + return model_or_list.model_dump(mode="json", exclude_unset=True, by_alias=True) + + return [ + model.model_dump(mode="json", exclude_unset=True, by_alias=True) + for model in model_or_list + ] + + +class ChannelServiceOperations: + """Shared activity channel operations used by HTTP frameworks.""" + + def __init__(self, handler: ChannelApiHandlerProtocol) -> None: + self._handler = handler + + async def send_to_conversation( + self, + claims_identity: ClaimsIdentity, + conversation_id: str, + activity_payload: Any, + ): + activity = parse_agents_model(activity_payload, Activity) + return await self._handler.on_send_to_conversation( + claims_identity, + conversation_id, + activity, + ) + + async def reply_to_activity( + self, + claims_identity: ClaimsIdentity, + conversation_id: str, + activity_id: str, + activity_payload: Any, + ): + activity = parse_agents_model(activity_payload, Activity) + return await self._handler.on_reply_to_activity( + claims_identity, + conversation_id, + activity_id, + activity, + ) + + async def update_activity( + self, + claims_identity: ClaimsIdentity, + conversation_id: str, + activity_id: str, + activity_payload: Any, + ): + activity = parse_agents_model(activity_payload, Activity) + return await self._handler.on_update_activity( + claims_identity, + conversation_id, + activity_id, + activity, + ) + + async def delete_activity( + self, + claims_identity: ClaimsIdentity, + conversation_id: str, + activity_id: str, + ) -> None: + await self._handler.on_delete_activity( + claims_identity, + conversation_id, + activity_id, + ) + + async def get_activity_members( + self, + claims_identity: ClaimsIdentity, + conversation_id: str, + activity_id: str, + ): + return await self._handler.on_get_activity_members( + claims_identity, + conversation_id, + activity_id, + ) + + async def create_conversation( + self, + claims_identity: ClaimsIdentity, + parameters_payload: Any, + ): + parameters = parse_agents_model(parameters_payload, ConversationParameters) + return await self._handler.on_create_conversation( + claims_identity, + parameters, + ) + + async def get_conversations( + self, + claims_identity: ClaimsIdentity, + conversation_id: str | None = None, + ): + return await self._handler.on_get_conversations( + claims_identity, + conversation_id, + ) + + async def get_conversation_members( + self, + claims_identity: ClaimsIdentity, + conversation_id: str, + ): + return await self._handler.on_get_conversation_members( + claims_identity, + conversation_id, + ) + + async def get_conversation_member( + self, + claims_identity: ClaimsIdentity, + user_id: str, + conversation_id: str, + ): + return await self._handler.on_get_conversation_member( + claims_identity, + user_id, + conversation_id, + ) + + async def get_conversation_paged_members( + self, + claims_identity: ClaimsIdentity, + conversation_id: str, + page_size: int | None = None, + continuation_token: str | None = None, + ): + return await self._handler.on_get_conversation_paged_members( + claims_identity, + conversation_id, + page_size, + continuation_token, + ) + + async def delete_conversation_member( + self, + claims_identity: ClaimsIdentity, + conversation_id: str, + member_id: str, + ): + return await self._handler.on_delete_conversation_member( + claims_identity, + conversation_id, + member_id, + ) + + async def send_conversation_history( + self, + claims_identity: ClaimsIdentity, + conversation_id: str, + transcript_payload: Any, + ): + transcript = parse_agents_model(transcript_payload, Transcript) + return await self._handler.on_send_conversation_history( + claims_identity, + conversation_id, + transcript, + ) + + async def upload_attachment( + self, + claims_identity: ClaimsIdentity, + conversation_id: str, + attachment_payload: Any, + ): + attachment = parse_agents_model(attachment_payload, AttachmentData) + return await self._handler.on_upload_attachment( + claims_identity, + conversation_id, + attachment, + ) + + +class CloudAdapterBase(ChannelServiceAdapter, Generic[RequestT, ResponseT], ABC): + """Base implementation for framework specific CloudAdapter implementations.""" + + def __init__( + self, + *, + connection_manager: Connections | None = None, + channel_service_client_factory: ChannelServiceClientFactoryBase | None = None, + ) -> None: + async def on_turn_error(context: TurnContext, error: Exception) -> None: + error_message = f"Exception caught : {error}" + print(format_exc()) + + await context.send_activity(MessageFactory.text(error_message)) + + await context.send_trace_activity( + "OnTurnError Trace", + error_message, + "https://www.botframework.com/schemas/error", + "TurnError", + ) + + self.on_turn_error = on_turn_error + + factory = channel_service_client_factory or RestChannelServiceClientFactory( + connection_manager + ) + + super().__init__(factory) + + async def process( + self, request: RequestT, agent: "Agent" + ) -> Optional[ResponseT]: # pragma: no cover - exercised via subclasses + if request is None: + raise TypeError("CloudAdapter.process: request can't be None") + if agent is None: + raise TypeError("CloudAdapter.process: agent can't be None") + + if self._get_method(request) != "POST": + raise self._method_not_allowed_error(request) + + body = await self._read_json_body(request) + activity: Activity = Activity.model_validate(body) + + claims_identity = self._get_claims_identity(request) + if not claims_identity: + claims_identity = self._default_claims_identity() + + if ( + not activity.type + or not activity.conversation + or not activity.conversation.id + ): + raise self._bad_request_error(request) + + try: + invoke_response = await self.process_activity( + claims_identity, + activity, + agent.on_turn, + ) + except PermissionError as error: + raise self._unauthorized_error(request) from error + + if ( + activity.type == "invoke" + or activity.delivery_mode == DeliveryModes.expect_replies + ): + return self._create_invoke_response(request, invoke_response) + + return self._create_accepted_response(request) + + def _default_claims_identity(self) -> ClaimsIdentity: + return ClaimsIdentity({}, False) + + @abstractmethod + def _get_method(self, request: RequestT) -> str: + """Return the HTTP method for the incoming request.""" + + @abstractmethod + async def _read_json_body(self, request: RequestT) -> Any: + """Read and return the JSON payload.""" + + @abstractmethod + def _get_claims_identity(self, request: RequestT) -> ClaimsIdentity | None: + """Extract the claims identity from the request.""" + + @abstractmethod + def _method_not_allowed_error(self, request: RequestT) -> Exception: + """Return the exception raised when the request method is unsupported.""" + + @abstractmethod + def _unsupported_media_type_error(self, request: RequestT) -> Exception: + """Return the exception raised when the content type is unsupported.""" + + @abstractmethod + def _bad_request_error(self, request: RequestT) -> Exception: + """Return the exception raised when the request payload is invalid.""" + + @abstractmethod + def _unauthorized_error(self, request: RequestT) -> Exception: + """Return the exception raised when authorization fails.""" + + @abstractmethod + def _create_invoke_response( + self, request: RequestT, invoke_response: Any + ) -> ResponseT: + """Create the framework specific response for invoke results.""" + + @abstractmethod + def _create_accepted_response(self, request: RequestT) -> ResponseT: + """Create the framework specific HTTP 202 response.""" diff --git a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/_start_agent_process.py b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/_start_agent_process.py index 13396ca8..7d87573e 100644 --- a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/_start_agent_process.py +++ b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/_start_agent_process.py @@ -1,6 +1,12 @@ from typing import Optional + from fastapi import Request, Response -from microsoft_agents.hosting.core.app import AgentApplication + +from microsoft_agents.hosting.core import ( + AgentApplication, + start_agent_process as core_start_agent_process, +) + from .cloud_adapter import CloudAdapter @@ -9,18 +15,5 @@ async def start_agent_process( agent_application: AgentApplication, adapter: CloudAdapter, ) -> Optional[Response]: - """Starts the agent host with the provided adapter and agent application. - Args: - adapter (CloudAdapter): The adapter to use for the agent host. - agent_application (AgentApplication): The agent application to run. - """ - if not adapter: - raise TypeError("start_agent_process: adapter can't be None") - if not agent_application: - raise TypeError("start_agent_process: agent_application can't be None") - - # Start the agent application with the provided adapter - return await adapter.process( - request, - agent_application, - ) + """Starts the agent host with the provided adapter and agent application.""" + return await core_start_agent_process(request, agent_application, adapter) diff --git a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/agent_http_adapter.py b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/agent_http_adapter.py index 2584b272..f1a08425 100644 --- a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/agent_http_adapter.py +++ b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/agent_http_adapter.py @@ -1,15 +1,12 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. -from abc import abstractmethod -from typing import Optional, Protocol +from typing import Protocol from fastapi import Request, Response -from microsoft_agents.hosting.core import Agent +from microsoft_agents.hosting.core import AgentHttpAdapterProtocol -class AgentHttpAdapter(Protocol): - @abstractmethod - async def process(self, request: Request, agent: Agent) -> Optional[Response]: - raise NotImplementedError() +class AgentHttpAdapter(AgentHttpAdapterProtocol[Request, Response], Protocol): + """Framework specific alias for the shared AgentHttpAdapter protocol.""" diff --git a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/channel_service_route_table.py b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/channel_service_route_table.py index 2dd009fc..e55c9076 100644 --- a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/channel_service_route_table.py +++ b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/channel_service_route_table.py @@ -1,64 +1,44 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. -import json -from typing import List, Union, Type +from typing import Any -from fastapi import APIRouter, Request, Response, HTTPException, Depends +from fastapi import APIRouter, HTTPException, Request, Response from fastapi.responses import JSONResponse -from microsoft_agents.activity import ( - AgentsModel, - Activity, - AttachmentData, - ConversationParameters, - Transcript, +from microsoft_agents.hosting.core import ( + ChannelApiHandlerProtocol, + ChannelServiceOperations, + serialize_agents_model, ) -from microsoft_agents.hosting.core import ChannelApiHandlerProtocol -async def deserialize_from_body( - request: Request, target_model: Type[AgentsModel] -) -> AgentsModel: - content_type = request.headers.get("Content-Type", "") - if "application/json" in content_type: - body = await request.json() - else: +async def _read_payload(request: Request) -> Any: + if "application/json" not in request.headers.get("Content-Type", ""): raise HTTPException(status_code=415, detail="Unsupported Media Type") + return await request.json() - return target_model.model_validate(body) +def _json_response(result: Any) -> Response: + if result is None: + return Response(status_code=200) -def get_serialized_response( - model_or_list: Union[AgentsModel, List[AgentsModel]], -) -> JSONResponse: - if isinstance(model_or_list, AgentsModel): - json_obj = model_or_list.model_dump( - mode="json", exclude_unset=True, by_alias=True - ) - else: - json_obj = [ - model.model_dump(mode="json", exclude_unset=True, by_alias=True) - for model in model_or_list - ] - - return JSONResponse(content=json_obj) + payload = serialize_agents_model(result) + return JSONResponse(content=payload) def channel_service_route_table( handler: ChannelApiHandlerProtocol, base_url: str = "" ) -> APIRouter: router = APIRouter() + operations = ChannelServiceOperations(handler) @router.post(base_url + "/v3/conversations/{conversation_id}/activities") async def send_to_conversation(conversation_id: str, request: Request): - activity = await deserialize_from_body(request, Activity) - result = await handler.on_send_to_conversation( + payload = await _read_payload(request) + result = await operations.send_to_conversation( getattr(request.state, "claims_identity", None), conversation_id, - activity, + payload, ) - - return get_serialized_response(result) + return _json_response(result) @router.post( base_url + "/v3/conversations/{conversation_id}/activities/{activity_id}" @@ -66,40 +46,37 @@ async def send_to_conversation(conversation_id: str, request: Request): async def reply_to_activity( conversation_id: str, activity_id: str, request: Request ): - activity = await deserialize_from_body(request, Activity) - result = await handler.on_reply_to_activity( + payload = await _read_payload(request) + result = await operations.reply_to_activity( getattr(request.state, "claims_identity", None), conversation_id, activity_id, - activity, + payload, ) - - return get_serialized_response(result) + return _json_response(result) @router.put( base_url + "/v3/conversations/{conversation_id}/activities/{activity_id}" ) async def update_activity(conversation_id: str, activity_id: str, request: Request): - activity = await deserialize_from_body(request, Activity) - result = await handler.on_update_activity( + payload = await _read_payload(request) + result = await operations.update_activity( getattr(request.state, "claims_identity", None), conversation_id, activity_id, - activity, + payload, ) - - return get_serialized_response(result) + return _json_response(result) @router.delete( base_url + "/v3/conversations/{conversation_id}/activities/{activity_id}" ) async def delete_activity(conversation_id: str, activity_id: str, request: Request): - await handler.on_delete_activity( + await operations.delete_activity( getattr(request.state, "claims_identity", None), conversation_id, activity_id, ) - return Response(status_code=200) @router.get( @@ -109,97 +86,86 @@ async def delete_activity(conversation_id: str, activity_id: str, request: Reque async def get_activity_members( conversation_id: str, activity_id: str, request: Request ): - result = await handler.on_get_activity_members( + result = await operations.get_activity_members( getattr(request.state, "claims_identity", None), conversation_id, activity_id, ) - - return get_serialized_response(result) + return _json_response(result) @router.post(base_url + "/") async def create_conversation(request: Request): - conversation_parameters = await deserialize_from_body( - request, ConversationParameters - ) - result = await handler.on_create_conversation( - getattr(request.state, "claims_identity", None), conversation_parameters + payload = await _read_payload(request) + result = await operations.create_conversation( + getattr(request.state, "claims_identity", None), + payload, ) - - return get_serialized_response(result) + return _json_response(result) @router.get(base_url + "/") async def get_conversation(request: Request): - # TODO: continuation token? conversation_id? - result = await handler.on_get_conversations( - getattr(request.state, "claims_identity", None), None + result = await operations.get_conversations( + getattr(request.state, "claims_identity", None), + None, ) - - return get_serialized_response(result) + return _json_response(result) @router.get(base_url + "/v3/conversations/{conversation_id}/members") async def get_conversation_members(conversation_id: str, request: Request): - result = await handler.on_get_conversation_members( + result = await operations.get_conversation_members( getattr(request.state, "claims_identity", None), conversation_id, ) - - return get_serialized_response(result) + return _json_response(result) @router.get(base_url + "/v3/conversations/{conversation_id}/members/{member_id}") async def get_conversation_member( conversation_id: str, member_id: str, request: Request ): - result = await handler.on_get_conversation_member( + result = await operations.get_conversation_member( getattr(request.state, "claims_identity", None), member_id, conversation_id, ) - - return get_serialized_response(result) + return _json_response(result) @router.get(base_url + "/v3/conversations/{conversation_id}/pagedmembers") async def get_conversation_paged_members(conversation_id: str, request: Request): - # TODO: continuation token? page size? - result = await handler.on_get_conversation_paged_members( + result = await operations.get_conversation_paged_members( getattr(request.state, "claims_identity", None), conversation_id, ) - - return get_serialized_response(result) + return _json_response(result) @router.delete(base_url + "/v3/conversations/{conversation_id}/members/{member_id}") async def delete_conversation_member( conversation_id: str, member_id: str, request: Request ): - result = await handler.on_delete_conversation_member( + result = await operations.delete_conversation_member( getattr(request.state, "claims_identity", None), conversation_id, member_id, ) - - return get_serialized_response(result) + return _json_response(result) @router.post(base_url + "/v3/conversations/{conversation_id}/activities/history") async def send_conversation_history(conversation_id: str, request: Request): - transcript = await deserialize_from_body(request, Transcript) - result = await handler.on_send_conversation_history( + payload = await _read_payload(request) + result = await operations.send_conversation_history( getattr(request.state, "claims_identity", None), conversation_id, - transcript, + payload, ) - - return get_serialized_response(result) + return _json_response(result) @router.post(base_url + "/v3/conversations/{conversation_id}/attachments") async def upload_attachment(conversation_id: str, request: Request): - attachment_data = await deserialize_from_body(request, AttachmentData) - result = await handler.on_upload_attachment( + payload = await _read_payload(request) + result = await operations.upload_attachment( getattr(request.state, "claims_identity", None), conversation_id, - attachment_data, + payload, ) - - return get_serialized_response(result) + return _json_response(result) return router diff --git a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/cloud_adapter.py b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/cloud_adapter.py index 3383c793..961804d2 100644 --- a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/cloud_adapter.py +++ b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/cloud_adapter.py @@ -1,112 +1,62 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. -from traceback import format_exc -from typing import Optional +from typing import Any, Optional -from fastapi import Request, Response, HTTPException +from fastapi import HTTPException, Request, Response from fastapi.responses import JSONResponse -from microsoft_agents.hosting.core.authorization import ( - ClaimsIdentity, - Connections, -) -from microsoft_agents.activity import ( - Activity, - DeliveryModes, -) + from microsoft_agents.hosting.core import ( - Agent, - ChannelServiceAdapter, ChannelServiceClientFactoryBase, - MessageFactory, - RestChannelServiceClientFactory, - TurnContext, + CloudAdapterBase, + Connections, ) +from microsoft_agents.hosting.core.authorization import ClaimsIdentity from .agent_http_adapter import AgentHttpAdapter -class CloudAdapter(ChannelServiceAdapter, AgentHttpAdapter): +class CloudAdapter(CloudAdapterBase[Request, Response], AgentHttpAdapter): def __init__( self, *, - connection_manager: Connections = None, - channel_service_client_factory: ChannelServiceClientFactoryBase = None, - ): - """ - Initializes a new instance of the CloudAdapter class. - - :param channel_service_client_factory: The factory to use to create the channel service client. - """ - - async def on_turn_error(context: TurnContext, error: Exception): - error_message = f"Exception caught : {error}" - print(format_exc()) - - await context.send_activity(MessageFactory.text(error_message)) - - # Send a trace activity - await context.send_trace_activity( - "OnTurnError Trace", - error_message, - "https://www.botframework.com/schemas/error", - "TurnError", - ) - - self.on_turn_error = on_turn_error - - channel_service_client_factory = ( - channel_service_client_factory - or RestChannelServiceClientFactory(connection_manager) + connection_manager: Connections | None = None, + channel_service_client_factory: ChannelServiceClientFactoryBase | None = None, + ) -> None: + super().__init__( + connection_manager=connection_manager, + channel_service_client_factory=channel_service_client_factory, ) - super().__init__(channel_service_client_factory) + def _get_method(self, request: Request) -> str: + return request.method - async def process(self, request: Request, agent: Agent) -> Optional[Response]: - if not request: - raise TypeError("CloudAdapter.process: request can't be None") - if not agent: - raise TypeError("CloudAdapter.process: agent can't be None") + async def _read_json_body(self, request: Request) -> Any: + if "application/json" not in request.headers.get("Content-Type", ""): + raise self._unsupported_media_type_error(request) + return await request.json() - if request.method == "POST": - # Deserialize the incoming Activity - content_type = request.headers.get("Content-Type", "") - if "application/json" in content_type: - body = await request.json() - else: - raise HTTPException(status_code=415, detail="Unsupported Media Type") + def _get_claims_identity(self, request: Request) -> ClaimsIdentity | None: + return getattr( + request.state, "claims_identity", self._default_claims_identity() + ) - activity: Activity = Activity.model_validate(body) + def _method_not_allowed_error(self, request: Request) -> Exception: + return HTTPException(status_code=405, detail="Method Not Allowed") - # default to anonymous identity with no claims - claims_identity: ClaimsIdentity = getattr( - request.state, "claims_identity", ClaimsIdentity({}, False) - ) + def _unsupported_media_type_error(self, request: Request) -> Exception: + return HTTPException(status_code=415, detail="Unsupported Media Type") - # A POST request must contain an Activity - if ( - not activity.type - or not activity.conversation - or not activity.conversation.id - ): - raise HTTPException(status_code=400, detail="Bad Request") + def _bad_request_error(self, request: Request) -> Exception: + return HTTPException(status_code=400, detail="Bad Request") - try: - # Process the inbound activity with the agent - invoke_response = await self.process_activity( - claims_identity, activity, agent.on_turn - ) + def _unauthorized_error(self, request: Request) -> Exception: + return HTTPException(status_code=401, detail="Unauthorized") - if ( - activity.type == "invoke" - or activity.delivery_mode == DeliveryModes.expect_replies - ): - # Invoke and ExpectReplies cannot be performed async, the response must be written before the calling thread is released. - return JSONResponse( - content=invoke_response.body, status_code=invoke_response.status - ) + def _create_invoke_response( + self, request: Request, invoke_response: Any + ) -> Response: + return JSONResponse( + content=invoke_response.body, + status_code=invoke_response.status, + ) - return Response(status_code=202) - except PermissionError: - raise HTTPException(status_code=401, detail="Unauthorized") - else: - raise HTTPException(status_code=405, detail="Method Not Allowed") + def _create_accepted_response(self, request: Request) -> Response: + return Response(status_code=202) From 8a7cd5609ba61446d077ec87f9f12f6e3d6c2f5a Mon Sep 17 00:00:00 2001 From: Axel Suarez Date: Mon, 17 Nov 2025 14:26:46 -0800 Subject: [PATCH 2/6] Revert "Http framework integration refactor WIP" This reverts commit 318a0ceffffef8eb3809bdba6d8bcccf5739ed68. --- .../hosting/aiohttp/_start_agent_process.py | 25 +- .../hosting/aiohttp/agent_http_adapter.py | 16 +- .../aiohttp/channel_service_route_table.py | 143 ++++--- .../hosting/aiohttp/cloud_adapter.py | 131 +++++-- .../microsoft_agents/hosting/core/__init__.py | 14 - .../microsoft_agents/hosting/core/http.py | 360 ------------------ .../hosting/fastapi/_start_agent_process.py | 25 +- .../hosting/fastapi/agent_http_adapter.py | 11 +- .../fastapi/channel_service_route_table.py | 144 ++++--- .../hosting/fastapi/cloud_adapter.py | 130 +++++-- 10 files changed, 407 insertions(+), 592 deletions(-) delete mode 100644 libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/http.py diff --git a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/_start_agent_process.py b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/_start_agent_process.py index d135d866..0aa93f8b 100644 --- a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/_start_agent_process.py +++ b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/_start_agent_process.py @@ -1,12 +1,6 @@ from typing import Optional - from aiohttp.web import Request, Response - -from microsoft_agents.hosting.core import ( - AgentApplication, - start_agent_process as core_start_agent_process, -) - +from microsoft_agents.hosting.core.app import AgentApplication from .cloud_adapter import CloudAdapter @@ -15,5 +9,18 @@ async def start_agent_process( agent_application: AgentApplication, adapter: CloudAdapter, ) -> Optional[Response]: - """Starts the agent host with the provided adapter and agent application.""" - return await core_start_agent_process(request, agent_application, adapter) + """Starts the agent host with the provided adapter and agent application. + Args: + adapter (CloudAdapter): The adapter to use for the agent host. + agent_application (AgentApplication): The agent application to run. + """ + if not adapter: + raise TypeError("start_agent_process: adapter can't be None") + if not agent_application: + raise TypeError("start_agent_process: agent_application can't be None") + + # Start the agent application with the provided adapter + return await adapter.process( + request, + agent_application, + ) diff --git a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/agent_http_adapter.py b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/agent_http_adapter.py index 8896f8f3..a0351aa0 100644 --- a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/agent_http_adapter.py +++ b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/agent_http_adapter.py @@ -1,12 +1,18 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. -from typing import Protocol +from abc import abstractmethod +from typing import Optional, Protocol -from aiohttp.web import Request, Response +from aiohttp.web import ( + Request, + Response, +) -from microsoft_agents.hosting.core import AgentHttpAdapterProtocol +from microsoft_agents.hosting.core import Agent -class AgentHttpAdapter(AgentHttpAdapterProtocol[Request, Response], Protocol): - """Framework specific alias for the shared AgentHttpAdapter protocol.""" +class AgentHttpAdapter(Protocol): + @abstractmethod + async def process(self, request: Request, agent: Agent) -> Optional[Response]: + raise NotImplementedError() diff --git a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/channel_service_route_table.py b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/channel_service_route_table.py index bace245b..4a8a193b 100644 --- a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/channel_service_route_table.py +++ b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/channel_service_route_table.py @@ -1,80 +1,102 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. import json -from typing import Any +from typing import List, Union, Type -from aiohttp.web import HTTPUnsupportedMediaType, Request, Response, RouteTableDef +from aiohttp.web import RouteTableDef, Request, Response -from microsoft_agents.hosting.core import ( - ChannelApiHandlerProtocol, - ChannelServiceOperations, - serialize_agents_model, +from microsoft_agents.activity import ( + AgentsModel, + Activity, + AttachmentData, + ConversationParameters, + Transcript, ) +from microsoft_agents.hosting.core import ChannelApiHandlerProtocol -async def _read_payload(request: Request) -> Any: - if "application/json" not in request.headers.get("Content-Type", ""): - raise HTTPUnsupportedMediaType() - return await request.json() +async def deserialize_from_body( + request: Request, target_model: Type[AgentsModel] +) -> Activity: + if "application/json" in request.headers["Content-Type"]: + body = await request.json() + else: + return Response(status=415) + return target_model.model_validate(body) -def _json_response(result: Any) -> Response: - if result is None: - return Response() - payload = serialize_agents_model(result) - return Response(body=json.dumps(payload), content_type="application/json") +def get_serialized_response( + model_or_list: Union[AgentsModel, List[AgentsModel]], +) -> Response: + if isinstance(model_or_list, AgentsModel): + json_obj = model_or_list.model_dump( + mode="json", exclude_unset=True, by_alias=True + ) + else: + json_obj = [ + model.model_dump(mode="json", exclude_unset=True, by_alias=True) + for model in model_or_list + ] + + return Response(body=json.dumps(json_obj), content_type="application/json") def channel_service_route_table( handler: ChannelApiHandlerProtocol, base_url: str = "" ) -> RouteTableDef: + # pylint: disable=unused-variable routes = RouteTableDef() - operations = ChannelServiceOperations(handler) @routes.post(base_url + "/v3/conversations/{conversation_id}/activities") async def send_to_conversation(request: Request): - payload = await _read_payload(request) - result = await operations.send_to_conversation( + activity = await deserialize_from_body(request, Activity) + result = await handler.on_send_to_conversation( request.get("claims_identity"), request.match_info["conversation_id"], - payload, + activity, ) - return _json_response(result) + + return get_serialized_response(result) @routes.post( base_url + "/v3/conversations/{conversation_id}/activities/{activity_id}" ) async def reply_to_activity(request: Request): - payload = await _read_payload(request) - result = await operations.reply_to_activity( + activity = await deserialize_from_body(request, Activity) + result = await handler.on_reply_to_activity( request.get("claims_identity"), request.match_info["conversation_id"], request.match_info["activity_id"], - payload, + activity, ) - return _json_response(result) + + return get_serialized_response(result) @routes.put( base_url + "/v3/conversations/{conversation_id}/activities/{activity_id}" ) async def update_activity(request: Request): - payload = await _read_payload(request) - result = await operations.update_activity( + activity = await deserialize_from_body(request, Activity) + result = await handler.on_update_activity( request.get("claims_identity"), request.match_info["conversation_id"], request.match_info["activity_id"], - payload, + activity, ) - return _json_response(result) + + return get_serialized_response(result) @routes.delete( base_url + "/v3/conversations/{conversation_id}/activities/{activity_id}" ) async def delete_activity(request: Request): - await operations.delete_activity( + await handler.on_delete_activity( request.get("claims_identity"), request.match_info["conversation_id"], request.match_info["activity_id"], ) + return Response() @routes.get( @@ -82,82 +104,91 @@ async def delete_activity(request: Request): + "/v3/conversations/{conversation_id}/activities/{activity_id}/members" ) async def get_activity_members(request: Request): - result = await operations.get_activity_members( + result = await handler.on_get_activity_members( request.get("claims_identity"), request.match_info["conversation_id"], request.match_info["activity_id"], ) - return _json_response(result) + + return get_serialized_response(result) @routes.post(base_url + "/") async def create_conversation(request: Request): - payload = await _read_payload(request) - result = await operations.create_conversation( - request.get("claims_identity"), - payload, + conversation_parameters = deserialize_from_body(request, ConversationParameters) + result = await handler.on_create_conversation( + request.get("claims_identity"), conversation_parameters ) - return _json_response(result) + + return get_serialized_response(result) @routes.get(base_url + "/") async def get_conversation(request: Request): - result = await operations.get_conversations( - request.get("claims_identity"), - None, + # TODO: continuation token? conversation_id? + result = await handler.on_get_conversations( + request.get("claims_identity"), None ) - return _json_response(result) + + return get_serialized_response(result) @routes.get(base_url + "/v3/conversations/{conversation_id}/members") async def get_conversation_members(request: Request): - result = await operations.get_conversation_members( + result = await handler.on_get_conversation_members( request.get("claims_identity"), request.match_info["conversation_id"], ) - return _json_response(result) + + return get_serialized_response(result) @routes.get(base_url + "/v3/conversations/{conversation_id}/members/{member_id}") async def get_conversation_member(request: Request): - result = await operations.get_conversation_member( + result = await handler.on_get_conversation_member( request.get("claims_identity"), request.match_info["member_id"], request.match_info["conversation_id"], ) - return _json_response(result) + + return get_serialized_response(result) @routes.get(base_url + "/v3/conversations/{conversation_id}/pagedmembers") async def get_conversation_paged_members(request: Request): - result = await operations.get_conversation_paged_members( + # TODO: continuation token? page size? + result = await handler.on_get_conversation_paged_members( request.get("claims_identity"), request.match_info["conversation_id"], ) - return _json_response(result) + + return get_serialized_response(result) @routes.delete(base_url + "/v3/conversations/{conversation_id}/members/{member_id}") async def delete_conversation_member(request: Request): - result = await operations.delete_conversation_member( + result = await handler.on_delete_conversation_member( request.get("claims_identity"), request.match_info["conversation_id"], request.match_info["member_id"], ) - return _json_response(result) + + return get_serialized_response(result) @routes.post(base_url + "/v3/conversations/{conversation_id}/activities/history") async def send_conversation_history(request: Request): - payload = await _read_payload(request) - result = await operations.send_conversation_history( + transcript = deserialize_from_body(request, Transcript) + result = await handler.on_send_conversation_history( request.get("claims_identity"), request.match_info["conversation_id"], - payload, + transcript, ) - return _json_response(result) + + return get_serialized_response(result) @routes.post(base_url + "/v3/conversations/{conversation_id}/attachments") async def upload_attachment(request: Request): - payload = await _read_payload(request) - result = await operations.upload_attachment( + attachment_data = deserialize_from_body(request, AttachmentData) + result = await handler.on_upload_attachment( request.get("claims_identity"), request.match_info["conversation_id"], - payload, + attachment_data, ) - return _json_response(result) + + return get_serialized_response(result) return routes diff --git a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/cloud_adapter.py b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/cloud_adapter.py index ae1a3bee..1ef106c3 100644 --- a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/cloud_adapter.py +++ b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/cloud_adapter.py @@ -1,67 +1,118 @@ -from typing import Any, Optional +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +from traceback import format_exc +from typing import Optional from aiohttp.web import ( + Request, + Response, + json_response, HTTPBadRequest, HTTPMethodNotAllowed, HTTPUnauthorized, HTTPUnsupportedMediaType, - Request, - Response, - json_response, ) - -from microsoft_agents.hosting.core import ( - CloudAdapterBase, +from microsoft_agents.hosting.core.authorization import ( + ClaimsIdentity, Connections, +) +from microsoft_agents.activity import ( + Activity, + DeliveryModes, +) +from microsoft_agents.hosting.core import ( + Agent, + ChannelServiceAdapter, ChannelServiceClientFactoryBase, + MessageFactory, + RestChannelServiceClientFactory, + TurnContext, ) -from microsoft_agents.hosting.core.authorization import ClaimsIdentity from .agent_http_adapter import AgentHttpAdapter -class CloudAdapter(CloudAdapterBase[Request, Response], AgentHttpAdapter): +class CloudAdapter(ChannelServiceAdapter, AgentHttpAdapter): def __init__( self, *, - connection_manager: Connections | None = None, - channel_service_client_factory: ChannelServiceClientFactoryBase | None = None, - ) -> None: - super().__init__( - connection_manager=connection_manager, - channel_service_client_factory=channel_service_client_factory, + connection_manager: Connections = None, + channel_service_client_factory: ChannelServiceClientFactoryBase = None, + ): + """ + Initializes a new instance of the CloudAdapter class. + + :param channel_service_client_factory: The factory to use to create the channel service client. + """ + + async def on_turn_error(context: TurnContext, error: Exception): + error_message = f"Exception caught : {error}" + print(format_exc()) + + await context.send_activity(MessageFactory.text(error_message)) + + # Send a trace activity + await context.send_trace_activity( + "OnTurnError Trace", + error_message, + "https://www.botframework.com/schemas/error", + "TurnError", + ) + + self.on_turn_error = on_turn_error + + channel_service_client_factory = ( + channel_service_client_factory + or RestChannelServiceClientFactory(connection_manager) ) - def _get_method(self, request: Request) -> str: - return request.method + super().__init__(channel_service_client_factory) - async def _read_json_body(self, request: Request) -> Any: - if "application/json" not in request.headers.get("Content-Type", ""): - raise self._unsupported_media_type_error(request) - return await request.json() + async def process(self, request: Request, agent: Agent) -> Optional[Response]: + if not request: + raise TypeError("CloudAdapter.process: request can't be None") + if not agent: + raise TypeError("CloudAdapter.process: agent can't be None") - def _get_claims_identity(self, request: Request) -> ClaimsIdentity | None: - return request.get("claims_identity", self._default_claims_identity()) + if request.method == "POST": + # Deserialize the incoming Activity + if "application/json" in request.headers["Content-Type"]: + body = await request.json() + else: + raise HTTPUnsupportedMediaType() - def _method_not_allowed_error(self, request: Request) -> Exception: - return HTTPMethodNotAllowed(request.method, ["POST"]) + activity: Activity = Activity.model_validate(body) - def _unsupported_media_type_error(self, request: Request) -> Exception: - return HTTPUnsupportedMediaType() + # default to anonymous identity with no claims + claims_identity: ClaimsIdentity = request.get( + "claims_identity", ClaimsIdentity({}, False) + ) - def _bad_request_error(self, request: Request) -> Exception: - return HTTPBadRequest() + # A POST request must contain an Activity + if ( + not activity.type + or not activity.conversation + or not activity.conversation.id + ): + raise HTTPBadRequest - def _unauthorized_error(self, request: Request) -> Exception: - return HTTPUnauthorized() + try: + # Process the inbound activity with the agent + invoke_response = await self.process_activity( + claims_identity, activity, agent.on_turn + ) - def _create_invoke_response( - self, request: Request, invoke_response: Any - ) -> Response: - return json_response( - data=invoke_response.body, - status=invoke_response.status, - ) + if ( + activity.type == "invoke" + or activity.delivery_mode == DeliveryModes.expect_replies + ): + # Invoke and ExpectReplies cannot be performed async, the response must be written before the calling thread is released. + return json_response( + data=invoke_response.body, status=invoke_response.status + ) - def _create_accepted_response(self, request: Request) -> Response: - return Response(status=202) + return Response(status=202) + except PermissionError: + raise HTTPUnauthorized + else: + raise HTTPMethodNotAllowed diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/__init__.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/__init__.py index 5ae2c23e..90d6f0ec 100644 --- a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/__init__.py +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/__init__.py @@ -9,14 +9,6 @@ from .middleware_set import Middleware from .rest_channel_service_client_factory import RestChannelServiceClientFactory from .turn_context import TurnContext -from .http import ( - AgentHttpAdapterProtocol, - ChannelServiceOperations, - CloudAdapterBase, - parse_agents_model, - serialize_agents_model, - start_agent_process, -) # Application Style from .app._type_defs import RouteHandler, RouteSelector, StateT @@ -104,12 +96,6 @@ "Middleware", "RestChannelServiceClientFactory", "TurnContext", - "AgentHttpAdapterProtocol", - "ChannelServiceOperations", - "CloudAdapterBase", - "parse_agents_model", - "serialize_agents_model", - "start_agent_process", "AgentApplication", "ApplicationError", "ApplicationOptions", diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/http.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/http.py deleted file mode 100644 index abfe8841..00000000 --- a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/http.py +++ /dev/null @@ -1,360 +0,0 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. - -"""Shared HTTP hosting utilities used by framework-specific adapters.""" - -from __future__ import annotations - -from abc import ABC, abstractmethod -from collections.abc import Iterable -from traceback import format_exc -from typing import Any, Generic, Optional, Protocol, Type, TypeVar - -from microsoft_agents.activity import ( - Activity, - AgentsModel, - AttachmentData, - ConversationParameters, - DeliveryModes, - Transcript, -) - -from .authorization.claims_identity import ClaimsIdentity -from .authorization.connections import Connections -from .channel_api_handler_protocol import ChannelApiHandlerProtocol -from .channel_service_adapter import ChannelServiceAdapter -from .channel_service_client_factory_base import ChannelServiceClientFactoryBase -from .message_factory import MessageFactory -from .rest_channel_service_client_factory import RestChannelServiceClientFactory -from .turn_context import TurnContext - -from typing import TYPE_CHECKING - -if TYPE_CHECKING: # pragma: no cover - imported for type checking only - from microsoft_agents.hosting.core.agent import Agent - from microsoft_agents.hosting.core.app.agent_application import AgentApplication - - -TModel = TypeVar("TModel", bound=AgentsModel) -RequestT = TypeVar("RequestT") -ResponseT = TypeVar("ResponseT") - - -class AgentHttpAdapterProtocol(Protocol, Generic[RequestT, ResponseT]): - """Protocol describing the contract for framework specific HTTP adapters.""" - - async def process(self, request: RequestT, agent: "Agent") -> Optional[ResponseT]: - raise NotImplementedError - - -async def start_agent_process( - request: RequestT, - agent_application: "AgentApplication", - adapter: AgentHttpAdapterProtocol[RequestT, ResponseT], -) -> Optional[ResponseT]: - """Start the agent process using the provided adapter and application.""" - if adapter is None: - raise TypeError("start_agent_process: adapter can't be None") - if agent_application is None: - raise TypeError("start_agent_process: agent_application can't be None") - - return await adapter.process(request, agent_application) - - -def parse_agents_model(payload: Any, model_type: Type[TModel]) -> TModel: - """Parse a payload into the requested AgentsModel derived type.""" - return model_type.model_validate(payload) - - -def serialize_agents_model(model_or_list: AgentsModel | Iterable[AgentsModel]) -> Any: - """Serialize AgentsModel instances into JSON serialisable structures.""" - if isinstance(model_or_list, AgentsModel): - return model_or_list.model_dump(mode="json", exclude_unset=True, by_alias=True) - - return [ - model.model_dump(mode="json", exclude_unset=True, by_alias=True) - for model in model_or_list - ] - - -class ChannelServiceOperations: - """Shared activity channel operations used by HTTP frameworks.""" - - def __init__(self, handler: ChannelApiHandlerProtocol) -> None: - self._handler = handler - - async def send_to_conversation( - self, - claims_identity: ClaimsIdentity, - conversation_id: str, - activity_payload: Any, - ): - activity = parse_agents_model(activity_payload, Activity) - return await self._handler.on_send_to_conversation( - claims_identity, - conversation_id, - activity, - ) - - async def reply_to_activity( - self, - claims_identity: ClaimsIdentity, - conversation_id: str, - activity_id: str, - activity_payload: Any, - ): - activity = parse_agents_model(activity_payload, Activity) - return await self._handler.on_reply_to_activity( - claims_identity, - conversation_id, - activity_id, - activity, - ) - - async def update_activity( - self, - claims_identity: ClaimsIdentity, - conversation_id: str, - activity_id: str, - activity_payload: Any, - ): - activity = parse_agents_model(activity_payload, Activity) - return await self._handler.on_update_activity( - claims_identity, - conversation_id, - activity_id, - activity, - ) - - async def delete_activity( - self, - claims_identity: ClaimsIdentity, - conversation_id: str, - activity_id: str, - ) -> None: - await self._handler.on_delete_activity( - claims_identity, - conversation_id, - activity_id, - ) - - async def get_activity_members( - self, - claims_identity: ClaimsIdentity, - conversation_id: str, - activity_id: str, - ): - return await self._handler.on_get_activity_members( - claims_identity, - conversation_id, - activity_id, - ) - - async def create_conversation( - self, - claims_identity: ClaimsIdentity, - parameters_payload: Any, - ): - parameters = parse_agents_model(parameters_payload, ConversationParameters) - return await self._handler.on_create_conversation( - claims_identity, - parameters, - ) - - async def get_conversations( - self, - claims_identity: ClaimsIdentity, - conversation_id: str | None = None, - ): - return await self._handler.on_get_conversations( - claims_identity, - conversation_id, - ) - - async def get_conversation_members( - self, - claims_identity: ClaimsIdentity, - conversation_id: str, - ): - return await self._handler.on_get_conversation_members( - claims_identity, - conversation_id, - ) - - async def get_conversation_member( - self, - claims_identity: ClaimsIdentity, - user_id: str, - conversation_id: str, - ): - return await self._handler.on_get_conversation_member( - claims_identity, - user_id, - conversation_id, - ) - - async def get_conversation_paged_members( - self, - claims_identity: ClaimsIdentity, - conversation_id: str, - page_size: int | None = None, - continuation_token: str | None = None, - ): - return await self._handler.on_get_conversation_paged_members( - claims_identity, - conversation_id, - page_size, - continuation_token, - ) - - async def delete_conversation_member( - self, - claims_identity: ClaimsIdentity, - conversation_id: str, - member_id: str, - ): - return await self._handler.on_delete_conversation_member( - claims_identity, - conversation_id, - member_id, - ) - - async def send_conversation_history( - self, - claims_identity: ClaimsIdentity, - conversation_id: str, - transcript_payload: Any, - ): - transcript = parse_agents_model(transcript_payload, Transcript) - return await self._handler.on_send_conversation_history( - claims_identity, - conversation_id, - transcript, - ) - - async def upload_attachment( - self, - claims_identity: ClaimsIdentity, - conversation_id: str, - attachment_payload: Any, - ): - attachment = parse_agents_model(attachment_payload, AttachmentData) - return await self._handler.on_upload_attachment( - claims_identity, - conversation_id, - attachment, - ) - - -class CloudAdapterBase(ChannelServiceAdapter, Generic[RequestT, ResponseT], ABC): - """Base implementation for framework specific CloudAdapter implementations.""" - - def __init__( - self, - *, - connection_manager: Connections | None = None, - channel_service_client_factory: ChannelServiceClientFactoryBase | None = None, - ) -> None: - async def on_turn_error(context: TurnContext, error: Exception) -> None: - error_message = f"Exception caught : {error}" - print(format_exc()) - - await context.send_activity(MessageFactory.text(error_message)) - - await context.send_trace_activity( - "OnTurnError Trace", - error_message, - "https://www.botframework.com/schemas/error", - "TurnError", - ) - - self.on_turn_error = on_turn_error - - factory = channel_service_client_factory or RestChannelServiceClientFactory( - connection_manager - ) - - super().__init__(factory) - - async def process( - self, request: RequestT, agent: "Agent" - ) -> Optional[ResponseT]: # pragma: no cover - exercised via subclasses - if request is None: - raise TypeError("CloudAdapter.process: request can't be None") - if agent is None: - raise TypeError("CloudAdapter.process: agent can't be None") - - if self._get_method(request) != "POST": - raise self._method_not_allowed_error(request) - - body = await self._read_json_body(request) - activity: Activity = Activity.model_validate(body) - - claims_identity = self._get_claims_identity(request) - if not claims_identity: - claims_identity = self._default_claims_identity() - - if ( - not activity.type - or not activity.conversation - or not activity.conversation.id - ): - raise self._bad_request_error(request) - - try: - invoke_response = await self.process_activity( - claims_identity, - activity, - agent.on_turn, - ) - except PermissionError as error: - raise self._unauthorized_error(request) from error - - if ( - activity.type == "invoke" - or activity.delivery_mode == DeliveryModes.expect_replies - ): - return self._create_invoke_response(request, invoke_response) - - return self._create_accepted_response(request) - - def _default_claims_identity(self) -> ClaimsIdentity: - return ClaimsIdentity({}, False) - - @abstractmethod - def _get_method(self, request: RequestT) -> str: - """Return the HTTP method for the incoming request.""" - - @abstractmethod - async def _read_json_body(self, request: RequestT) -> Any: - """Read and return the JSON payload.""" - - @abstractmethod - def _get_claims_identity(self, request: RequestT) -> ClaimsIdentity | None: - """Extract the claims identity from the request.""" - - @abstractmethod - def _method_not_allowed_error(self, request: RequestT) -> Exception: - """Return the exception raised when the request method is unsupported.""" - - @abstractmethod - def _unsupported_media_type_error(self, request: RequestT) -> Exception: - """Return the exception raised when the content type is unsupported.""" - - @abstractmethod - def _bad_request_error(self, request: RequestT) -> Exception: - """Return the exception raised when the request payload is invalid.""" - - @abstractmethod - def _unauthorized_error(self, request: RequestT) -> Exception: - """Return the exception raised when authorization fails.""" - - @abstractmethod - def _create_invoke_response( - self, request: RequestT, invoke_response: Any - ) -> ResponseT: - """Create the framework specific response for invoke results.""" - - @abstractmethod - def _create_accepted_response(self, request: RequestT) -> ResponseT: - """Create the framework specific HTTP 202 response.""" diff --git a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/_start_agent_process.py b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/_start_agent_process.py index 7d87573e..13396ca8 100644 --- a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/_start_agent_process.py +++ b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/_start_agent_process.py @@ -1,12 +1,6 @@ from typing import Optional - from fastapi import Request, Response - -from microsoft_agents.hosting.core import ( - AgentApplication, - start_agent_process as core_start_agent_process, -) - +from microsoft_agents.hosting.core.app import AgentApplication from .cloud_adapter import CloudAdapter @@ -15,5 +9,18 @@ async def start_agent_process( agent_application: AgentApplication, adapter: CloudAdapter, ) -> Optional[Response]: - """Starts the agent host with the provided adapter and agent application.""" - return await core_start_agent_process(request, agent_application, adapter) + """Starts the agent host with the provided adapter and agent application. + Args: + adapter (CloudAdapter): The adapter to use for the agent host. + agent_application (AgentApplication): The agent application to run. + """ + if not adapter: + raise TypeError("start_agent_process: adapter can't be None") + if not agent_application: + raise TypeError("start_agent_process: agent_application can't be None") + + # Start the agent application with the provided adapter + return await adapter.process( + request, + agent_application, + ) diff --git a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/agent_http_adapter.py b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/agent_http_adapter.py index f1a08425..2584b272 100644 --- a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/agent_http_adapter.py +++ b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/agent_http_adapter.py @@ -1,12 +1,15 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. -from typing import Protocol +from abc import abstractmethod +from typing import Optional, Protocol from fastapi import Request, Response -from microsoft_agents.hosting.core import AgentHttpAdapterProtocol +from microsoft_agents.hosting.core import Agent -class AgentHttpAdapter(AgentHttpAdapterProtocol[Request, Response], Protocol): - """Framework specific alias for the shared AgentHttpAdapter protocol.""" +class AgentHttpAdapter(Protocol): + @abstractmethod + async def process(self, request: Request, agent: Agent) -> Optional[Response]: + raise NotImplementedError() diff --git a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/channel_service_route_table.py b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/channel_service_route_table.py index e55c9076..2dd009fc 100644 --- a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/channel_service_route_table.py +++ b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/channel_service_route_table.py @@ -1,44 +1,64 @@ -from typing import Any +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +import json +from typing import List, Union, Type -from fastapi import APIRouter, HTTPException, Request, Response +from fastapi import APIRouter, Request, Response, HTTPException, Depends from fastapi.responses import JSONResponse -from microsoft_agents.hosting.core import ( - ChannelApiHandlerProtocol, - ChannelServiceOperations, - serialize_agents_model, +from microsoft_agents.activity import ( + AgentsModel, + Activity, + AttachmentData, + ConversationParameters, + Transcript, ) +from microsoft_agents.hosting.core import ChannelApiHandlerProtocol -async def _read_payload(request: Request) -> Any: - if "application/json" not in request.headers.get("Content-Type", ""): +async def deserialize_from_body( + request: Request, target_model: Type[AgentsModel] +) -> AgentsModel: + content_type = request.headers.get("Content-Type", "") + if "application/json" in content_type: + body = await request.json() + else: raise HTTPException(status_code=415, detail="Unsupported Media Type") - return await request.json() + return target_model.model_validate(body) -def _json_response(result: Any) -> Response: - if result is None: - return Response(status_code=200) - payload = serialize_agents_model(result) - return JSONResponse(content=payload) +def get_serialized_response( + model_or_list: Union[AgentsModel, List[AgentsModel]], +) -> JSONResponse: + if isinstance(model_or_list, AgentsModel): + json_obj = model_or_list.model_dump( + mode="json", exclude_unset=True, by_alias=True + ) + else: + json_obj = [ + model.model_dump(mode="json", exclude_unset=True, by_alias=True) + for model in model_or_list + ] + + return JSONResponse(content=json_obj) def channel_service_route_table( handler: ChannelApiHandlerProtocol, base_url: str = "" ) -> APIRouter: router = APIRouter() - operations = ChannelServiceOperations(handler) @router.post(base_url + "/v3/conversations/{conversation_id}/activities") async def send_to_conversation(conversation_id: str, request: Request): - payload = await _read_payload(request) - result = await operations.send_to_conversation( + activity = await deserialize_from_body(request, Activity) + result = await handler.on_send_to_conversation( getattr(request.state, "claims_identity", None), conversation_id, - payload, + activity, ) - return _json_response(result) + + return get_serialized_response(result) @router.post( base_url + "/v3/conversations/{conversation_id}/activities/{activity_id}" @@ -46,37 +66,40 @@ async def send_to_conversation(conversation_id: str, request: Request): async def reply_to_activity( conversation_id: str, activity_id: str, request: Request ): - payload = await _read_payload(request) - result = await operations.reply_to_activity( + activity = await deserialize_from_body(request, Activity) + result = await handler.on_reply_to_activity( getattr(request.state, "claims_identity", None), conversation_id, activity_id, - payload, + activity, ) - return _json_response(result) + + return get_serialized_response(result) @router.put( base_url + "/v3/conversations/{conversation_id}/activities/{activity_id}" ) async def update_activity(conversation_id: str, activity_id: str, request: Request): - payload = await _read_payload(request) - result = await operations.update_activity( + activity = await deserialize_from_body(request, Activity) + result = await handler.on_update_activity( getattr(request.state, "claims_identity", None), conversation_id, activity_id, - payload, + activity, ) - return _json_response(result) + + return get_serialized_response(result) @router.delete( base_url + "/v3/conversations/{conversation_id}/activities/{activity_id}" ) async def delete_activity(conversation_id: str, activity_id: str, request: Request): - await operations.delete_activity( + await handler.on_delete_activity( getattr(request.state, "claims_identity", None), conversation_id, activity_id, ) + return Response(status_code=200) @router.get( @@ -86,86 +109,97 @@ async def delete_activity(conversation_id: str, activity_id: str, request: Reque async def get_activity_members( conversation_id: str, activity_id: str, request: Request ): - result = await operations.get_activity_members( + result = await handler.on_get_activity_members( getattr(request.state, "claims_identity", None), conversation_id, activity_id, ) - return _json_response(result) + + return get_serialized_response(result) @router.post(base_url + "/") async def create_conversation(request: Request): - payload = await _read_payload(request) - result = await operations.create_conversation( - getattr(request.state, "claims_identity", None), - payload, + conversation_parameters = await deserialize_from_body( + request, ConversationParameters + ) + result = await handler.on_create_conversation( + getattr(request.state, "claims_identity", None), conversation_parameters ) - return _json_response(result) + + return get_serialized_response(result) @router.get(base_url + "/") async def get_conversation(request: Request): - result = await operations.get_conversations( - getattr(request.state, "claims_identity", None), - None, + # TODO: continuation token? conversation_id? + result = await handler.on_get_conversations( + getattr(request.state, "claims_identity", None), None ) - return _json_response(result) + + return get_serialized_response(result) @router.get(base_url + "/v3/conversations/{conversation_id}/members") async def get_conversation_members(conversation_id: str, request: Request): - result = await operations.get_conversation_members( + result = await handler.on_get_conversation_members( getattr(request.state, "claims_identity", None), conversation_id, ) - return _json_response(result) + + return get_serialized_response(result) @router.get(base_url + "/v3/conversations/{conversation_id}/members/{member_id}") async def get_conversation_member( conversation_id: str, member_id: str, request: Request ): - result = await operations.get_conversation_member( + result = await handler.on_get_conversation_member( getattr(request.state, "claims_identity", None), member_id, conversation_id, ) - return _json_response(result) + + return get_serialized_response(result) @router.get(base_url + "/v3/conversations/{conversation_id}/pagedmembers") async def get_conversation_paged_members(conversation_id: str, request: Request): - result = await operations.get_conversation_paged_members( + # TODO: continuation token? page size? + result = await handler.on_get_conversation_paged_members( getattr(request.state, "claims_identity", None), conversation_id, ) - return _json_response(result) + + return get_serialized_response(result) @router.delete(base_url + "/v3/conversations/{conversation_id}/members/{member_id}") async def delete_conversation_member( conversation_id: str, member_id: str, request: Request ): - result = await operations.delete_conversation_member( + result = await handler.on_delete_conversation_member( getattr(request.state, "claims_identity", None), conversation_id, member_id, ) - return _json_response(result) + + return get_serialized_response(result) @router.post(base_url + "/v3/conversations/{conversation_id}/activities/history") async def send_conversation_history(conversation_id: str, request: Request): - payload = await _read_payload(request) - result = await operations.send_conversation_history( + transcript = await deserialize_from_body(request, Transcript) + result = await handler.on_send_conversation_history( getattr(request.state, "claims_identity", None), conversation_id, - payload, + transcript, ) - return _json_response(result) + + return get_serialized_response(result) @router.post(base_url + "/v3/conversations/{conversation_id}/attachments") async def upload_attachment(conversation_id: str, request: Request): - payload = await _read_payload(request) - result = await operations.upload_attachment( + attachment_data = await deserialize_from_body(request, AttachmentData) + result = await handler.on_upload_attachment( getattr(request.state, "claims_identity", None), conversation_id, - payload, + attachment_data, ) - return _json_response(result) + + return get_serialized_response(result) return router diff --git a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/cloud_adapter.py b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/cloud_adapter.py index 961804d2..3383c793 100644 --- a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/cloud_adapter.py +++ b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/cloud_adapter.py @@ -1,62 +1,112 @@ -from typing import Any, Optional +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +from traceback import format_exc +from typing import Optional -from fastapi import HTTPException, Request, Response +from fastapi import Request, Response, HTTPException from fastapi.responses import JSONResponse - +from microsoft_agents.hosting.core.authorization import ( + ClaimsIdentity, + Connections, +) +from microsoft_agents.activity import ( + Activity, + DeliveryModes, +) from microsoft_agents.hosting.core import ( + Agent, + ChannelServiceAdapter, ChannelServiceClientFactoryBase, - CloudAdapterBase, - Connections, + MessageFactory, + RestChannelServiceClientFactory, + TurnContext, ) -from microsoft_agents.hosting.core.authorization import ClaimsIdentity from .agent_http_adapter import AgentHttpAdapter -class CloudAdapter(CloudAdapterBase[Request, Response], AgentHttpAdapter): +class CloudAdapter(ChannelServiceAdapter, AgentHttpAdapter): def __init__( self, *, - connection_manager: Connections | None = None, - channel_service_client_factory: ChannelServiceClientFactoryBase | None = None, - ) -> None: - super().__init__( - connection_manager=connection_manager, - channel_service_client_factory=channel_service_client_factory, - ) + connection_manager: Connections = None, + channel_service_client_factory: ChannelServiceClientFactoryBase = None, + ): + """ + Initializes a new instance of the CloudAdapter class. + + :param channel_service_client_factory: The factory to use to create the channel service client. + """ - def _get_method(self, request: Request) -> str: - return request.method + async def on_turn_error(context: TurnContext, error: Exception): + error_message = f"Exception caught : {error}" + print(format_exc()) - async def _read_json_body(self, request: Request) -> Any: - if "application/json" not in request.headers.get("Content-Type", ""): - raise self._unsupported_media_type_error(request) - return await request.json() + await context.send_activity(MessageFactory.text(error_message)) - def _get_claims_identity(self, request: Request) -> ClaimsIdentity | None: - return getattr( - request.state, "claims_identity", self._default_claims_identity() + # Send a trace activity + await context.send_trace_activity( + "OnTurnError Trace", + error_message, + "https://www.botframework.com/schemas/error", + "TurnError", + ) + + self.on_turn_error = on_turn_error + + channel_service_client_factory = ( + channel_service_client_factory + or RestChannelServiceClientFactory(connection_manager) ) - def _method_not_allowed_error(self, request: Request) -> Exception: - return HTTPException(status_code=405, detail="Method Not Allowed") + super().__init__(channel_service_client_factory) - def _unsupported_media_type_error(self, request: Request) -> Exception: - return HTTPException(status_code=415, detail="Unsupported Media Type") + async def process(self, request: Request, agent: Agent) -> Optional[Response]: + if not request: + raise TypeError("CloudAdapter.process: request can't be None") + if not agent: + raise TypeError("CloudAdapter.process: agent can't be None") - def _bad_request_error(self, request: Request) -> Exception: - return HTTPException(status_code=400, detail="Bad Request") + if request.method == "POST": + # Deserialize the incoming Activity + content_type = request.headers.get("Content-Type", "") + if "application/json" in content_type: + body = await request.json() + else: + raise HTTPException(status_code=415, detail="Unsupported Media Type") - def _unauthorized_error(self, request: Request) -> Exception: - return HTTPException(status_code=401, detail="Unauthorized") + activity: Activity = Activity.model_validate(body) - def _create_invoke_response( - self, request: Request, invoke_response: Any - ) -> Response: - return JSONResponse( - content=invoke_response.body, - status_code=invoke_response.status, - ) + # default to anonymous identity with no claims + claims_identity: ClaimsIdentity = getattr( + request.state, "claims_identity", ClaimsIdentity({}, False) + ) + + # A POST request must contain an Activity + if ( + not activity.type + or not activity.conversation + or not activity.conversation.id + ): + raise HTTPException(status_code=400, detail="Bad Request") + + try: + # Process the inbound activity with the agent + invoke_response = await self.process_activity( + claims_identity, activity, agent.on_turn + ) + + if ( + activity.type == "invoke" + or activity.delivery_mode == DeliveryModes.expect_replies + ): + # Invoke and ExpectReplies cannot be performed async, the response must be written before the calling thread is released. + return JSONResponse( + content=invoke_response.body, status_code=invoke_response.status + ) - def _create_accepted_response(self, request: Request) -> Response: - return Response(status_code=202) + return Response(status_code=202) + except PermissionError: + raise HTTPException(status_code=401, detail="Unauthorized") + else: + raise HTTPException(status_code=405, detail="Method Not Allowed") From e0fcd0e030797c5d949ddba067f221ce6ef5a3d9 Mon Sep 17 00:00:00 2001 From: Axel Suarez Date: Mon, 17 Nov 2025 15:19:58 -0800 Subject: [PATCH 3/6] HTTP Refactor WIP: merge conflicts pending --- .../hosting/aiohttp/__init__.py | 4 +- .../aiohttp/channel_service_route_table.py | 181 +++----- .../hosting/aiohttp/cloud_adapter.py | 162 +++---- .../microsoft_agents/hosting/core/__init__.py | 24 + .../hosting/core/app/streaming/__init__.py | 14 + .../hosting/core/app/streaming/citation.py | 22 + .../core/app/streaming/citation_util.py | 85 ++++ .../core/app/streaming/streaming_response.py | 412 ++++++++++++++++++ .../hosting/core/http/__init__.py | 17 + .../core/http/_channel_service_routes.py | 202 +++++++++ .../hosting/core/http/_http_adapter_base.py | 137 ++++++ .../core/http/_http_request_protocol.py | 36 ++ .../hosting/core/http/_http_response.py | 56 +++ .../hosting/fastapi/__init__.py | 4 +- .../fastapi/channel_service_route_table.py | 183 +++----- .../hosting/fastapi/cloud_adapter.py | 155 +++---- 16 files changed, 1277 insertions(+), 417 deletions(-) create mode 100644 libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/streaming/__init__.py create mode 100644 libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/streaming/citation.py create mode 100644 libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/streaming/citation_util.py create mode 100644 libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/streaming/streaming_response.py create mode 100644 libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/http/__init__.py create mode 100644 libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/http/_channel_service_routes.py create mode 100644 libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/http/_http_adapter_base.py create mode 100644 libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/http/_http_request_protocol.py create mode 100644 libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/http/_http_response.py diff --git a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/__init__.py b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/__init__.py index ed572337..f639246f 100644 --- a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/__init__.py +++ b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/__init__.py @@ -6,7 +6,9 @@ jwt_authorization_middleware, jwt_authorization_decorator, ) -from .app.streaming import ( + +# Import streaming utilities from core for backward compatibility +from microsoft_agents.hosting.core.app.streaming import ( Citation, CitationUtil, StreamingResponse, diff --git a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/channel_service_route_table.py b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/channel_service_route_table.py index 4a8a193b..3a2acad4 100644 --- a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/channel_service_route_table.py +++ b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/channel_service_route_table.py @@ -1,102 +1,81 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. import json -from typing import List, Union, Type from aiohttp.web import RouteTableDef, Request, Response -from microsoft_agents.activity import ( - AgentsModel, - Activity, - AttachmentData, - ConversationParameters, - Transcript, -) from microsoft_agents.hosting.core import ChannelApiHandlerProtocol +from microsoft_agents.hosting.core.http import ChannelServiceRoutes -async def deserialize_from_body( - request: Request, target_model: Type[AgentsModel] -) -> Activity: - if "application/json" in request.headers["Content-Type"]: - body = await request.json() - else: - return Response(status=415) +class AiohttpRequestAdapter: + """Adapter for aiohttp requests to use with ChannelServiceRoutes.""" - return target_model.model_validate(body) + def __init__(self, request: Request): + self._request = request + @property + def method(self) -> str: + return self._request.method -def get_serialized_response( - model_or_list: Union[AgentsModel, List[AgentsModel]], -) -> Response: - if isinstance(model_or_list, AgentsModel): - json_obj = model_or_list.model_dump( - mode="json", exclude_unset=True, by_alias=True - ) - else: - json_obj = [ - model.model_dump(mode="json", exclude_unset=True, by_alias=True) - for model in model_or_list - ] + @property + def headers(self): + return self._request.headers + + async def json(self): + return await self._request.json() + + def get_claims_identity(self): + return self._request.get("claims_identity") - return Response(body=json.dumps(json_obj), content_type="application/json") + def get_path_param(self, name: str) -> str: + return self._request.match_info[name] def channel_service_route_table( handler: ChannelApiHandlerProtocol, base_url: str = "" ) -> RouteTableDef: - # pylint: disable=unused-variable + """Create aiohttp route table for Channel Service API. + + Args: + handler: The handler that implements the Channel API protocol. + base_url: Optional base URL prefix for all routes. + + Returns: + RouteTableDef with all channel service routes. + """ routes = RouteTableDef() + service_routes = ChannelServiceRoutes(handler, base_url) + + def json_response(data: dict) -> Response: + return Response(body=json.dumps(data), content_type="application/json") @routes.post(base_url + "/v3/conversations/{conversation_id}/activities") async def send_to_conversation(request: Request): - activity = await deserialize_from_body(request, Activity) - result = await handler.on_send_to_conversation( - request.get("claims_identity"), - request.match_info["conversation_id"], - activity, + result = await service_routes.send_to_conversation( + AiohttpRequestAdapter(request) ) - - return get_serialized_response(result) + return json_response(result) @routes.post( base_url + "/v3/conversations/{conversation_id}/activities/{activity_id}" ) async def reply_to_activity(request: Request): - activity = await deserialize_from_body(request, Activity) - result = await handler.on_reply_to_activity( - request.get("claims_identity"), - request.match_info["conversation_id"], - request.match_info["activity_id"], - activity, - ) - - return get_serialized_response(result) + result = await service_routes.reply_to_activity(AiohttpRequestAdapter(request)) + return json_response(result) @routes.put( base_url + "/v3/conversations/{conversation_id}/activities/{activity_id}" ) async def update_activity(request: Request): - activity = await deserialize_from_body(request, Activity) - result = await handler.on_update_activity( - request.get("claims_identity"), - request.match_info["conversation_id"], - request.match_info["activity_id"], - activity, - ) - - return get_serialized_response(result) + result = await service_routes.update_activity(AiohttpRequestAdapter(request)) + return json_response(result) @routes.delete( base_url + "/v3/conversations/{conversation_id}/activities/{activity_id}" ) async def delete_activity(request: Request): - await handler.on_delete_activity( - request.get("claims_identity"), - request.match_info["conversation_id"], - request.match_info["activity_id"], - ) - + await service_routes.delete_activity(AiohttpRequestAdapter(request)) return Response() @routes.get( @@ -104,91 +83,61 @@ async def delete_activity(request: Request): + "/v3/conversations/{conversation_id}/activities/{activity_id}/members" ) async def get_activity_members(request: Request): - result = await handler.on_get_activity_members( - request.get("claims_identity"), - request.match_info["conversation_id"], - request.match_info["activity_id"], + result = await service_routes.get_activity_members( + AiohttpRequestAdapter(request) ) - - return get_serialized_response(result) + return json_response(result) @routes.post(base_url + "/") async def create_conversation(request: Request): - conversation_parameters = deserialize_from_body(request, ConversationParameters) - result = await handler.on_create_conversation( - request.get("claims_identity"), conversation_parameters + result = await service_routes.create_conversation( + AiohttpRequestAdapter(request) ) - - return get_serialized_response(result) + return json_response(result) @routes.get(base_url + "/") async def get_conversation(request: Request): - # TODO: continuation token? conversation_id? - result = await handler.on_get_conversations( - request.get("claims_identity"), None - ) - - return get_serialized_response(result) + result = await service_routes.get_conversations(AiohttpRequestAdapter(request)) + return json_response(result) @routes.get(base_url + "/v3/conversations/{conversation_id}/members") async def get_conversation_members(request: Request): - result = await handler.on_get_conversation_members( - request.get("claims_identity"), - request.match_info["conversation_id"], + result = await service_routes.get_conversation_members( + AiohttpRequestAdapter(request) ) - - return get_serialized_response(result) + return json_response(result) @routes.get(base_url + "/v3/conversations/{conversation_id}/members/{member_id}") async def get_conversation_member(request: Request): - result = await handler.on_get_conversation_member( - request.get("claims_identity"), - request.match_info["member_id"], - request.match_info["conversation_id"], + result = await service_routes.get_conversation_member( + AiohttpRequestAdapter(request) ) - - return get_serialized_response(result) + return json_response(result) @routes.get(base_url + "/v3/conversations/{conversation_id}/pagedmembers") async def get_conversation_paged_members(request: Request): - # TODO: continuation token? page size? - result = await handler.on_get_conversation_paged_members( - request.get("claims_identity"), - request.match_info["conversation_id"], + result = await service_routes.get_conversation_paged_members( + AiohttpRequestAdapter(request) ) - - return get_serialized_response(result) + return json_response(result) @routes.delete(base_url + "/v3/conversations/{conversation_id}/members/{member_id}") async def delete_conversation_member(request: Request): - result = await handler.on_delete_conversation_member( - request.get("claims_identity"), - request.match_info["conversation_id"], - request.match_info["member_id"], + result = await service_routes.delete_conversation_member( + AiohttpRequestAdapter(request) ) - - return get_serialized_response(result) + return json_response(result) @routes.post(base_url + "/v3/conversations/{conversation_id}/activities/history") async def send_conversation_history(request: Request): - transcript = deserialize_from_body(request, Transcript) - result = await handler.on_send_conversation_history( - request.get("claims_identity"), - request.match_info["conversation_id"], - transcript, + result = await service_routes.send_conversation_history( + AiohttpRequestAdapter(request) ) - - return get_serialized_response(result) + return json_response(result) @routes.post(base_url + "/v3/conversations/{conversation_id}/attachments") async def upload_attachment(request: Request): - attachment_data = deserialize_from_body(request, AttachmentData) - result = await handler.on_upload_attachment( - request.get("claims_identity"), - request.match_info["conversation_id"], - attachment_data, - ) - - return get_serialized_response(result) + result = await service_routes.upload_attachment(AiohttpRequestAdapter(request)) + return json_response(result) return routes diff --git a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/cloud_adapter.py b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/cloud_adapter.py index 1ef106c3..963b4ac7 100644 --- a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/cloud_adapter.py +++ b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/cloud_adapter.py @@ -1,38 +1,48 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. -from traceback import format_exc from typing import Optional -from aiohttp.web import ( - Request, - Response, - json_response, - HTTPBadRequest, - HTTPMethodNotAllowed, - HTTPUnauthorized, - HTTPUnsupportedMediaType, -) -from microsoft_agents.hosting.core.authorization import ( - ClaimsIdentity, - Connections, -) -from microsoft_agents.activity import ( - Activity, - DeliveryModes, -) -from microsoft_agents.hosting.core import ( - Agent, - ChannelServiceAdapter, - ChannelServiceClientFactoryBase, - MessageFactory, - RestChannelServiceClientFactory, - TurnContext, +from aiohttp.web import Request, Response, json_response + +from microsoft_agents.hosting.core import Agent +from microsoft_agents.hosting.core.authorization import Connections +from microsoft_agents.hosting.core.http import ( + HttpAdapterBase, + HttpRequestProtocol, + HttpResponse, ) +from microsoft_agents.hosting.core import ChannelServiceClientFactoryBase from .agent_http_adapter import AgentHttpAdapter -class CloudAdapter(ChannelServiceAdapter, AgentHttpAdapter): +class AiohttpRequestAdapter: + """Adapter to make aiohttp Request compatible with HttpRequestProtocol.""" + + def __init__(self, request: Request): + self._request = request + + @property + def method(self) -> str: + return self._request.method + + @property + def headers(self): + return self._request.headers + + async def json(self): + return await self._request.json() + + def get_claims_identity(self): + return self._request.get("claims_identity") + + def get_path_param(self, name: str) -> str: + return self._request.match_info[name] + + +class CloudAdapter(HttpAdapterBase, AgentHttpAdapter): + """CloudAdapter for aiohttp web framework.""" + def __init__( self, *, @@ -42,77 +52,43 @@ def __init__( """ Initializes a new instance of the CloudAdapter class. + :param connection_manager: Optional connection manager for OAuth. :param channel_service_client_factory: The factory to use to create the channel service client. """ + super().__init__( + connection_manager=connection_manager, + channel_service_client_factory=channel_service_client_factory, + ) - async def on_turn_error(context: TurnContext, error: Exception): - error_message = f"Exception caught : {error}" - print(format_exc()) + async def process(self, request: Request, agent: Agent) -> Optional[Response]: + """Process an aiohttp request. - await context.send_activity(MessageFactory.text(error_message)) + Args: + request: The aiohttp request. + agent: The agent to handle the request. - # Send a trace activity - await context.send_trace_activity( - "OnTurnError Trace", - error_message, - "https://www.botframework.com/schemas/error", - "TurnError", + Returns: + aiohttp Response object. + """ + # Adapt request to protocol + adapted_request = AiohttpRequestAdapter(request) + + # Process using base implementation + http_response: HttpResponse = await self.process_request(adapted_request, agent) + + # Convert HttpResponse to aiohttp Response + return self._to_aiohttp_response(http_response) + + @staticmethod + def _to_aiohttp_response(http_response: HttpResponse) -> Response: + """Convert HttpResponse to aiohttp Response.""" + if http_response.body is not None: + return json_response( + data=http_response.body, + status=http_response.status_code, + headers=http_response.headers, ) - - self.on_turn_error = on_turn_error - - channel_service_client_factory = ( - channel_service_client_factory - or RestChannelServiceClientFactory(connection_manager) + return Response( + status=http_response.status_code, + headers=http_response.headers, ) - - super().__init__(channel_service_client_factory) - - async def process(self, request: Request, agent: Agent) -> Optional[Response]: - if not request: - raise TypeError("CloudAdapter.process: request can't be None") - if not agent: - raise TypeError("CloudAdapter.process: agent can't be None") - - if request.method == "POST": - # Deserialize the incoming Activity - if "application/json" in request.headers["Content-Type"]: - body = await request.json() - else: - raise HTTPUnsupportedMediaType() - - activity: Activity = Activity.model_validate(body) - - # default to anonymous identity with no claims - claims_identity: ClaimsIdentity = request.get( - "claims_identity", ClaimsIdentity({}, False) - ) - - # A POST request must contain an Activity - if ( - not activity.type - or not activity.conversation - or not activity.conversation.id - ): - raise HTTPBadRequest - - try: - # Process the inbound activity with the agent - invoke_response = await self.process_activity( - claims_identity, activity, agent.on_turn - ) - - if ( - activity.type == "invoke" - or activity.delivery_mode == DeliveryModes.expect_replies - ): - # Invoke and ExpectReplies cannot be performed async, the response must be written before the calling thread is released. - return json_response( - data=invoke_response.body, status=invoke_response.status - ) - - return Response(status=202) - except PermissionError: - raise HTTPUnauthorized - else: - raise HTTPMethodNotAllowed diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/__init__.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/__init__.py index 90d6f0ec..4a003bb2 100644 --- a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/__init__.py +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/__init__.py @@ -10,6 +10,15 @@ from .rest_channel_service_client_factory import RestChannelServiceClientFactory from .turn_context import TurnContext +# HTTP abstractions +from .http import ( + HttpRequestProtocol, + HttpResponse, + HttpResponseFactory, + HttpAdapterBase, + ChannelServiceRoutes, +) + # Application Style from .app._type_defs import RouteHandler, RouteSelector, StateT from .app.agent_application import AgentApplication @@ -20,6 +29,13 @@ from .app._routes import _Route, _RouteList, RouteRank from .app.typing_indicator import TypingIndicator +# App Streaming +from .app.streaming import ( + Citation, + CitationUtil, + StreamingResponse, +) + # App Auth from .app.oauth import ( Authorization, @@ -96,6 +112,11 @@ "Middleware", "RestChannelServiceClientFactory", "TurnContext", + "HttpRequestProtocol", + "HttpResponse", + "HttpResponseFactory", + "HttpAdapterBase", + "ChannelServiceRoutes", "AgentApplication", "ApplicationError", "ApplicationOptions", @@ -105,6 +126,9 @@ "Route", "RouteHandler", "TypingIndicator", + "Citation", + "CitationUtil", + "StreamingResponse", "ConversationState", "state", "State", diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/streaming/__init__.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/streaming/__init__.py new file mode 100644 index 00000000..89efa87d --- /dev/null +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/streaming/__init__.py @@ -0,0 +1,14 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +"""Streaming response utilities.""" + +from .citation import Citation +from .citation_util import CitationUtil +from .streaming_response import StreamingResponse + +__all__ = [ + "Citation", + "CitationUtil", + "StreamingResponse", +] diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/streaming/citation.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/streaming/citation.py new file mode 100644 index 00000000..f643639a --- /dev/null +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/streaming/citation.py @@ -0,0 +1,22 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from typing import Optional +from dataclasses import dataclass + + +@dataclass +class Citation: + """Citations returned by the model.""" + + content: str + """The content of the citation.""" + + title: Optional[str] = None + """The title of the citation.""" + + url: Optional[str] = None + """The URL of the citation.""" + + filepath: Optional[str] = None + """The filepath of the document.""" diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/streaming/citation_util.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/streaming/citation_util.py new file mode 100644 index 00000000..1ec923dc --- /dev/null +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/streaming/citation_util.py @@ -0,0 +1,85 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import re +from typing import List, Optional + +from microsoft_agents.activity import ClientCitation + + +class CitationUtil: + """Utility functions for manipulating text and citations.""" + + @staticmethod + def snippet(text: str, max_length: int) -> str: + """ + Clips the text to a maximum length in case it exceeds the limit. + + Args: + text: The text to clip. + max_length: The maximum length of the text to return, cutting off the last whole word. + + Returns: + The modified text + """ + if len(text) <= max_length: + return text + + snippet = text[:max_length] + snippet = snippet[: min(len(snippet), snippet.rfind(" "))] + snippet += "..." + return snippet + + @staticmethod + def format_citations_response(text: str) -> str: + """ + Convert citation tags `[doc(s)n]` to `[n]` where n is a number. + + Args: + text: The text to format. + + Returns: + The formatted text. + """ + return re.sub(r"\[docs?(\d+)\]", r"[\1]", text, flags=re.IGNORECASE) + + @staticmethod + def get_used_citations( + text: str, citations: List[ClientCitation] + ) -> Optional[List[ClientCitation]]: + """ + Get the citations used in the text. This will remove any citations that are + included in the citations array from the response but not referenced in the text. + + Args: + text: The text to search for citation references, i.e. [1], [2], etc. + citations: The list of citations to search for. + + Returns: + The list of citations used in the text. + """ + regex = re.compile(r"\[(\d+)\]", re.IGNORECASE) + matches = regex.findall(text) + + if not matches: + return None + + # Remove duplicates + filtered_matches = set(matches) + + # Add citations + used_citations = [] + for match in filtered_matches: + citation_ref = f"[{match}]" + found = next( + ( + citation + for citation in citations + if f"[{citation.position}]" == citation_ref + ), + None, + ) + if found: + used_citations.append(found) + + return used_citations if used_citations else None diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/streaming/streaming_response.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/streaming/streaming_response.py new file mode 100644 index 00000000..db57858b --- /dev/null +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/streaming/streaming_response.py @@ -0,0 +1,412 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import asyncio +import logging +from typing import List, Optional, Callable, Literal, TYPE_CHECKING +from dataclasses import dataclass + +from microsoft_agents.activity import ( + Activity, + Entity, + Attachment, + Channels, + ClientCitation, + DeliveryModes, + SensitivityUsageInfo, +) + +if TYPE_CHECKING: + from microsoft_agents.hosting.core.turn_context import TurnContext + +from .citation import Citation +from .citation_util import CitationUtil + +logger = logging.getLogger(__name__) + + +class StreamingResponse: + """ + A helper class for streaming responses to the client. + + This class is used to send a series of updates to the client in a single response. + The expected sequence of calls is: + + `queue_informative_update()`, `queue_text_chunk()`, `queue_text_chunk()`, ..., `end_stream()`. + + Once `end_stream()` is called, the stream is considered ended and no further updates can be sent. + """ + + def __init__(self, context: "TurnContext"): + """ + Creates a new StreamingResponse instance. + + Args: + context: Context for the current turn of conversation with the user. + """ + self._context = context + self._sequence_number = 1 + self._stream_id: Optional[str] = None + self._message = "" + self._attachments: Optional[List[Attachment]] = None + self._ended = False + self._cancelled = False + + # Queue for outgoing activities + self._queue: List[Callable[[], Activity]] = [] + self._queue_sync: Optional[asyncio.Task] = None + self._chunk_queued = False + + # Powered by AI feature flags + self._enable_feedback_loop = False + self._feedback_loop_type: Optional[Literal["default", "custom"]] = None + self._enable_generated_by_ai_label = False + self._citations: Optional[List[ClientCitation]] = [] + self._sensitivity_label: Optional[SensitivityUsageInfo] = None + + # Channel information + self._is_streaming_channel: bool = False + self._channel_id: Channels = None + self._interval: float = 0.1 # Default interval for sending updates + self._set_defaults(context) + + @property + def stream_id(self) -> Optional[str]: + """ + Gets the stream ID of the current response. + Assigned after the initial update is sent. + """ + return self._stream_id + + @property + def citations(self) -> Optional[List[ClientCitation]]: + """Gets the citations of the current response.""" + return self._citations + + @property + def updates_sent(self) -> int: + """Gets the number of updates sent for the stream.""" + return self._sequence_number - 1 + + def queue_informative_update(self, text: str) -> None: + """ + Queues an informative update to be sent to the client. + + Args: + text: Text of the update to send. + """ + if not self._is_streaming_channel: + return + + if self._ended: + raise RuntimeError("The stream has already ended.") + + # Queue a typing activity + def create_activity(): + activity = Activity( + type="typing", + text=text, + entities=[ + Entity( + type="streaminfo", + stream_type="informative", + stream_sequence=self._sequence_number, + ) + ], + ) + self._sequence_number += 1 + return activity + + self._queue_activity(create_activity) + + def queue_text_chunk( + self, text: str, citations: Optional[List[Citation]] = None + ) -> None: + """ + Queues a chunk of partial message text to be sent to the client. + + The text will be sent as quickly as possible to the client. + Chunks may be combined before delivery to the client. + + Args: + text: Partial text of the message to send. + citations: Citations to be included in the message. + """ + if self._cancelled: + return + if self._ended: + raise RuntimeError("The stream has already ended.") + + # Update full message text + self._message += text + + # If there are citations, modify the content so that the sources are numbers instead of [doc1], [doc2], etc. + self._message = CitationUtil.format_citations_response(self._message) + + # Queue the next chunk + self._queue_next_chunk() + + async def end_stream(self) -> None: + """ + Ends the stream by sending the final message to the client. + """ + if self._ended: + raise RuntimeError("The stream has already ended.") + + # Queue final message + self._ended = True + self._queue_next_chunk() + + # Wait for the queue to drain + await self.wait_for_queue() + + def set_attachments(self, attachments: List[Attachment]) -> None: + """ + Sets the attachments to attach to the final chunk. + + Args: + attachments: List of attachments. + """ + self._attachments = attachments + + def set_sensitivity_label(self, sensitivity_label: SensitivityUsageInfo) -> None: + """ + Sets the sensitivity label to attach to the final chunk. + + Args: + sensitivity_label: The sensitivity label. + """ + self._sensitivity_label = sensitivity_label + + def set_citations(self, citations: List[Citation]) -> None: + """ + Sets the citations for the full message. + + Args: + citations: Citations to be included in the message. + """ + if citations: + if not self._citations: + self._citations = [] + + curr_pos = len(self._citations) + + for citation in citations: + client_citation = ClientCitation( + type="Claim", + position=curr_pos + 1, + appearance={ + "type": "DigitalDocument", + "name": citation.title or f"Document #{curr_pos + 1}", + "abstract": CitationUtil.snippet(citation.content, 477), + }, + ) + curr_pos += 1 + self._citations.append(client_citation) + + def set_feedback_loop(self, enable_feedback_loop: bool) -> None: + """ + Sets the Feedback Loop in Teams that allows a user to + give thumbs up or down to a response. + Default is False. + + Args: + enable_feedback_loop: If true, the feedback loop is enabled. + """ + self._enable_feedback_loop = enable_feedback_loop + + def set_feedback_loop_type( + self, feedback_loop_type: Literal["default", "custom"] + ) -> None: + """ + Sets the type of UI to use for the feedback loop. + + Args: + feedback_loop_type: The type of the feedback loop. + """ + self._feedback_loop_type = feedback_loop_type + + def set_generated_by_ai_label(self, enable_generated_by_ai_label: bool) -> None: + """ + Sets the Generated by AI label in Teams. + Default is False. + + Args: + enable_generated_by_ai_label: If true, the label is added. + """ + self._enable_generated_by_ai_label = enable_generated_by_ai_label + + def get_message(self) -> str: + """ + Returns the most recently streamed message. + """ + return self._message + + async def wait_for_queue(self) -> None: + """ + Waits for the outgoing activity queue to be empty. + """ + if self._queue_sync: + await self._queue_sync + + def _set_defaults(self, context: "TurnContext"): + if Channels.ms_teams == context.activity.channel_id.channel: + self._is_streaming_channel = True + self._interval = 1.0 + elif Channels.direct_line == context.activity.channel_id.channel: + self._is_streaming_channel = True + self._interval = 0.5 + elif context.activity.delivery_mode == DeliveryModes.stream: + self._is_streaming_channel = True + self._interval = 0.1 + + self._channel_id = context.activity.channel_id + + def _queue_next_chunk(self) -> None: + """ + Queues the next chunk of text to be sent to the client. + """ + # Are we already waiting to send a chunk? + if self._chunk_queued: + return + + # Queue a chunk of text to be sent + self._chunk_queued = True + + def create_activity(): + self._chunk_queued = False + if self._ended: + # Send final message + activity = Activity( + type="message", + text=self._message or "end stream response", + attachments=self._attachments or [], + entities=[ + Entity( + type="streaminfo", + stream_id=self._stream_id, + stream_type="final", + stream_sequence=self._sequence_number, + ) + ], + ) + elif self._is_streaming_channel: + # Send typing activity + activity = Activity( + type="typing", + text=self._message, + entities=[ + Entity( + type="streaminfo", + stream_type="streaming", + stream_sequence=self._sequence_number, + ) + ], + ) + else: + return + self._sequence_number += 1 + return activity + + self._queue_activity(create_activity) + + def _queue_activity(self, factory: Callable[[], Activity]) -> None: + """ + Queues an activity to be sent to the client. + """ + self._queue.append(factory) + + # If there's no sync in progress, start one + if not self._queue_sync: + self._queue_sync = asyncio.create_task(self._drain_queue()) + + async def _drain_queue(self) -> None: + """ + Sends any queued activities to the client until the queue is empty. + """ + try: + logger.debug(f"Draining queue with {len(self._queue)} activities.") + while self._queue: + factory = self._queue.pop(0) + activity = factory() + if activity: + await self._send_activity(activity) + except Exception as err: + if ( + "403" in str(err) + and self._context.activity.channel_id == Channels.ms_teams + ): + logger.warning("Teams channel stopped the stream.") + self._cancelled = True + else: + logger.error( + f"Error occurred when sending activity while streaming: {err}" + ) + raise + finally: + self._queue_sync = None + + async def _send_activity(self, activity: Activity) -> None: + """ + Sends an activity to the client and saves the stream ID returned. + + Args: + activity: The activity to send. + """ + + streaminfo_entity = None + + if not activity.entities: + streaminfo_entity = Entity(type="streaminfo") + activity.entities = [streaminfo_entity] + else: + for entity in activity.entities: + if hasattr(entity, "type") and entity.type == "streaminfo": + streaminfo_entity = entity + break + + if not streaminfo_entity: + # If no streaminfo entity exists, create one + streaminfo_entity = Entity(type="streaminfo") + activity.entities.append(streaminfo_entity) + + # Set activity ID to the assigned stream ID + if self._stream_id: + activity.id = self._stream_id + streaminfo_entity.stream_id = self._stream_id + + if self._citations and len(self._citations) > 0 and not self._ended: + # Filter out the citations unused in content. + curr_citations = CitationUtil.get_used_citations( + self._message, self._citations + ) + if curr_citations: + activity.entities.append( + Entity( + type="https://schema.org/Message", + schema_type="Message", + context="https://schema.org", + id="", + citation=curr_citations, + ) + ) + + # Add in Powered by AI feature flags + if self._ended: + if self._enable_feedback_loop and self._feedback_loop_type: + # Add feedback loop to streaminfo entity + streaminfo_entity.feedback_loop = {"type": self._feedback_loop_type} + else: + # Add feedback loop enabled to streaminfo entity + streaminfo_entity.feedback_loop_enabled = self._enable_feedback_loop + # Add in Generated by AI + if self._enable_generated_by_ai_label: + activity.add_ai_metadata(self._citations, self._sensitivity_label) + + # Send activity + response = await self._context.send_activity(activity) + await asyncio.sleep(self._interval) + + # Save assigned stream ID + if not self._stream_id and response: + self._stream_id = response.id diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/http/__init__.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/http/__init__.py new file mode 100644 index 00000000..84500210 --- /dev/null +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/http/__init__.py @@ -0,0 +1,17 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +"""HTTP abstractions for framework-agnostic adapter implementations.""" + +from ._http_request_protocol import HttpRequestProtocol +from ._http_response import HttpResponse, HttpResponseFactory +from ._http_adapter_base import HttpAdapterBase +from ._channel_service_routes import ChannelServiceRoutes + +__all__ = [ + "HttpRequestProtocol", + "HttpResponse", + "HttpResponseFactory", + "HttpAdapterBase", + "ChannelServiceRoutes", +] diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/http/_channel_service_routes.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/http/_channel_service_routes.py new file mode 100644 index 00000000..16adf381 --- /dev/null +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/http/_channel_service_routes.py @@ -0,0 +1,202 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +"""Channel service route definitions (framework-agnostic logic).""" + +from typing import Type, List, Union + +from microsoft_agents.activity import ( + AgentsModel, + Activity, + AttachmentData, + ConversationParameters, + Transcript, +) +from microsoft_agents.hosting.core import ChannelApiHandlerProtocol + +from ._http_request_protocol import HttpRequestProtocol + + +class ChannelServiceRoutes: + """Defines the Channel Service API routes and their handlers. + + This class provides framework-agnostic route logic that can be + adapted to different web frameworks (aiohttp, FastAPI, etc.). + """ + + def __init__(self, handler: ChannelApiHandlerProtocol, base_url: str = ""): + """Initialize channel service routes. + + Args: + handler: The handler that implements the Channel API protocol. + base_url: Optional base URL prefix for all routes. + """ + self.handler = handler + self.base_url = base_url + + @staticmethod + async def deserialize_from_body( + request: HttpRequestProtocol, target_model: Type[AgentsModel] + ) -> AgentsModel: + """Deserialize request body to target model.""" + content_type = request.headers.get("Content-Type", "") + if "application/json" not in content_type: + raise ValueError("Content-Type must be application/json") + + body = await request.json() + return target_model.model_validate(body) + + @staticmethod + def serialize_model(model_or_list: Union[AgentsModel, List[AgentsModel]]) -> dict: + """Serialize model or list of models to JSON-compatible dict.""" + if isinstance(model_or_list, AgentsModel): + return model_or_list.model_dump( + mode="json", exclude_unset=True, by_alias=True + ) + else: + return [ + model.model_dump(mode="json", exclude_unset=True, by_alias=True) + for model in model_or_list + ] + + # Route handler methods + async def send_to_conversation(self, request: HttpRequestProtocol) -> dict: + """Handle POST /v3/conversations/{conversation_id}/activities.""" + activity = await self.deserialize_from_body(request, Activity) + conversation_id = request.get_path_param("conversation_id") + result = await self.handler.on_send_to_conversation( + request.get_claims_identity(), + conversation_id, + activity, + ) + return self.serialize_model(result) + + async def reply_to_activity(self, request: HttpRequestProtocol) -> dict: + """Handle POST /v3/conversations/{conversation_id}/activities/{activity_id}.""" + activity = await self.deserialize_from_body(request, Activity) + conversation_id = request.get_path_param("conversation_id") + activity_id = request.get_path_param("activity_id") + result = await self.handler.on_reply_to_activity( + request.get_claims_identity(), + conversation_id, + activity_id, + activity, + ) + return self.serialize_model(result) + + async def update_activity(self, request: HttpRequestProtocol) -> dict: + """Handle PUT /v3/conversations/{conversation_id}/activities/{activity_id}.""" + activity = await self.deserialize_from_body(request, Activity) + conversation_id = request.get_path_param("conversation_id") + activity_id = request.get_path_param("activity_id") + result = await self.handler.on_update_activity( + request.get_claims_identity(), + conversation_id, + activity_id, + activity, + ) + return self.serialize_model(result) + + async def delete_activity(self, request: HttpRequestProtocol) -> None: + """Handle DELETE /v3/conversations/{conversation_id}/activities/{activity_id}.""" + conversation_id = request.get_path_param("conversation_id") + activity_id = request.get_path_param("activity_id") + await self.handler.on_delete_activity( + request.get_claims_identity(), + conversation_id, + activity_id, + ) + + async def get_activity_members(self, request: HttpRequestProtocol) -> dict: + """Handle GET /v3/conversations/{conversation_id}/activities/{activity_id}/members.""" + conversation_id = request.get_path_param("conversation_id") + activity_id = request.get_path_param("activity_id") + result = await self.handler.on_get_activity_members( + request.get_claims_identity(), + conversation_id, + activity_id, + ) + return self.serialize_model(result) + + async def create_conversation(self, request: HttpRequestProtocol) -> dict: + """Handle POST /.""" + conversation_parameters = await self.deserialize_from_body( + request, ConversationParameters + ) + result = await self.handler.on_create_conversation( + request.get_claims_identity(), conversation_parameters + ) + return self.serialize_model(result) + + async def get_conversations(self, request: HttpRequestProtocol) -> dict: + """Handle GET /.""" + # TODO: continuation token? conversation_id? + result = await self.handler.on_get_conversations( + request.get_claims_identity(), None + ) + return self.serialize_model(result) + + async def get_conversation_members(self, request: HttpRequestProtocol) -> dict: + """Handle GET /v3/conversations/{conversation_id}/members.""" + conversation_id = request.get_path_param("conversation_id") + result = await self.handler.on_get_conversation_members( + request.get_claims_identity(), + conversation_id, + ) + return self.serialize_model(result) + + async def get_conversation_member(self, request: HttpRequestProtocol) -> dict: + """Handle GET /v3/conversations/{conversation_id}/members/{member_id}.""" + conversation_id = request.get_path_param("conversation_id") + member_id = request.get_path_param("member_id") + result = await self.handler.on_get_conversation_member( + request.get_claims_identity(), + member_id, + conversation_id, + ) + return self.serialize_model(result) + + async def get_conversation_paged_members( + self, request: HttpRequestProtocol + ) -> dict: + """Handle GET /v3/conversations/{conversation_id}/pagedmembers.""" + conversation_id = request.get_path_param("conversation_id") + # TODO: continuation token? page size? + result = await self.handler.on_get_conversation_paged_members( + request.get_claims_identity(), + conversation_id, + ) + return self.serialize_model(result) + + async def delete_conversation_member(self, request: HttpRequestProtocol) -> dict: + """Handle DELETE /v3/conversations/{conversation_id}/members/{member_id}.""" + conversation_id = request.get_path_param("conversation_id") + member_id = request.get_path_param("member_id") + result = await self.handler.on_delete_conversation_member( + request.get_claims_identity(), + conversation_id, + member_id, + ) + return self.serialize_model(result) + + async def send_conversation_history(self, request: HttpRequestProtocol) -> dict: + """Handle POST /v3/conversations/{conversation_id}/activities/history.""" + conversation_id = request.get_path_param("conversation_id") + transcript = await self.deserialize_from_body(request, Transcript) + result = await self.handler.on_send_conversation_history( + request.get_claims_identity(), + conversation_id, + transcript, + ) + return self.serialize_model(result) + + async def upload_attachment(self, request: HttpRequestProtocol) -> dict: + """Handle POST /v3/conversations/{conversation_id}/attachments.""" + conversation_id = request.get_path_param("conversation_id") + attachment_data = await self.deserialize_from_body(request, AttachmentData) + result = await self.handler.on_upload_attachment( + request.get_claims_identity(), + conversation_id, + attachment_data, + ) + return self.serialize_model(result) diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/http/_http_adapter_base.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/http/_http_adapter_base.py new file mode 100644 index 00000000..3061d3e2 --- /dev/null +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/http/_http_adapter_base.py @@ -0,0 +1,137 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +"""Base HTTP adapter with shared processing logic.""" + +from abc import ABC +from traceback import format_exc +from typing import Optional + +from microsoft_agents.activity import Activity, DeliveryModes +from microsoft_agents.hosting.core.authorization import ClaimsIdentity, Connections +from microsoft_agents.hosting.core import ( + Agent, + ChannelServiceAdapter, + ChannelServiceClientFactoryBase, + MessageFactory, + RestChannelServiceClientFactory, + TurnContext, +) + +from ._http_request_protocol import HttpRequestProtocol +from ._http_response import HttpResponse, HttpResponseFactory + + +class HttpAdapterBase(ChannelServiceAdapter, ABC): + """Base adapter for HTTP-based agent hosting with shared processing logic. + + This class contains all the common logic for processing HTTP requests + and can be subclassed by framework-specific adapters (aiohttp, FastAPI, etc). + """ + + def __init__( + self, + *, + connection_manager: Connections = None, + channel_service_client_factory: ChannelServiceClientFactoryBase = None, + ): + """Initialize the HTTP adapter. + + Args: + connection_manager: Optional connection manager for OAuth. + channel_service_client_factory: Factory for creating channel service clients. + """ + + async def on_turn_error(context: TurnContext, error: Exception): + error_message = f"Exception caught : {error}" + print(format_exc()) + + await context.send_activity(MessageFactory.text(error_message)) + + # Send a trace activity + await context.send_trace_activity( + "OnTurnError Trace", + error_message, + "https://www.botframework.com/schemas/error", + "TurnError", + ) + + self.on_turn_error = on_turn_error + + channel_service_client_factory = ( + channel_service_client_factory + or RestChannelServiceClientFactory(connection_manager) + ) + + super().__init__(channel_service_client_factory) + + async def process_request( + self, request: HttpRequestProtocol, agent: Agent + ) -> HttpResponse: + """Process an incoming HTTP request. + + Args: + request: The HTTP request to process. + agent: The agent to handle the request. + + Returns: + HttpResponse with the result. + + Raises: + TypeError: If request or agent is None. + """ + if not request: + raise TypeError("HttpAdapterBase.process_request: request can't be None") + if not agent: + raise TypeError("HttpAdapterBase.process_request: agent can't be None") + + if request.method != "POST": + return HttpResponseFactory.method_not_allowed() + + # Deserialize the incoming Activity + content_type = request.headers.get("Content-Type", "") + if "application/json" not in content_type: + return HttpResponseFactory.unsupported_media_type() + + try: + body = await request.json() + except Exception: + return HttpResponseFactory.bad_request("Invalid JSON") + + activity: Activity = Activity.model_validate(body) + + # Get claims identity (default to anonymous if not set by middleware) + claims_identity: ClaimsIdentity = ( + request.get_claims_identity() or ClaimsIdentity({}, False) + ) + + # Validate required activity fields + if ( + not activity.type + or not activity.conversation + or not activity.conversation.id + ): + return HttpResponseFactory.bad_request( + "Activity must have type and conversation.id" + ) + + try: + # Process the inbound activity with the agent + invoke_response = await self.process_activity( + claims_identity, activity, agent.on_turn + ) + + # Check if we need to return a synchronous response + if ( + activity.type == "invoke" + or activity.delivery_mode == DeliveryModes.expect_replies + ): + # Invoke and ExpectReplies cannot be performed async + return HttpResponseFactory.json( + invoke_response.body, invoke_response.status + ) + + return HttpResponseFactory.accepted() + + except PermissionError: + return HttpResponseFactory.unauthorized() diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/http/_http_request_protocol.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/http/_http_request_protocol.py new file mode 100644 index 00000000..f99dc1d8 --- /dev/null +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/http/_http_request_protocol.py @@ -0,0 +1,36 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +"""Protocol for abstracting HTTP request objects across frameworks.""" + +from typing import Protocol, Dict, Any, Optional + + +class HttpRequestProtocol(Protocol): + """Protocol for HTTP requests that adapters must implement. + + This protocol defines the interface that framework-specific request + adapters must implement to work with the shared HTTP adapter logic. + """ + + @property + def method(self) -> str: + """HTTP method (GET, POST, etc.).""" + ... + + @property + def headers(self) -> Dict[str, str]: + """Request headers.""" + ... + + async def json(self) -> Dict[str, Any]: + """Parse request body as JSON.""" + ... + + def get_claims_identity(self) -> Optional[Any]: + """Get claims identity attached by auth middleware.""" + ... + + def get_path_param(self, name: str) -> str: + """Get path parameter by name.""" + ... diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/http/_http_response.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/http/_http_response.py new file mode 100644 index 00000000..d593cdee --- /dev/null +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/http/_http_response.py @@ -0,0 +1,56 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +"""HTTP response abstraction.""" + +from typing import Any, Optional, Dict +from dataclasses import dataclass + + +@dataclass +class HttpResponse: + """Framework-agnostic HTTP response.""" + + status_code: int + body: Optional[Any] = None + headers: Optional[Dict[str, str]] = None + content_type: Optional[str] = "application/json" + + +class HttpResponseFactory: + """Factory for creating HTTP responses.""" + + @staticmethod + def ok(body: Any = None) -> HttpResponse: + """Create 200 OK response.""" + return HttpResponse(status_code=200, body=body) + + @staticmethod + def accepted() -> HttpResponse: + """Create 202 Accepted response.""" + return HttpResponse(status_code=202) + + @staticmethod + def json(body: Any, status_code: int = 200) -> HttpResponse: + """Create JSON response.""" + return HttpResponse(status_code=status_code, body=body) + + @staticmethod + def bad_request(message: str = "Bad Request") -> HttpResponse: + """Create 400 Bad Request response.""" + return HttpResponse(status_code=400, body={"error": message}) + + @staticmethod + def unauthorized(message: str = "Unauthorized") -> HttpResponse: + """Create 401 Unauthorized response.""" + return HttpResponse(status_code=401, body={"error": message}) + + @staticmethod + def method_not_allowed(message: str = "Method Not Allowed") -> HttpResponse: + """Create 405 Method Not Allowed response.""" + return HttpResponse(status_code=405, body={"error": message}) + + @staticmethod + def unsupported_media_type(message: str = "Unsupported Media Type") -> HttpResponse: + """Create 415 Unsupported Media Type response.""" + return HttpResponse(status_code=415, body={"error": message}) diff --git a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/__init__.py b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/__init__.py index c3064151..e72ee8d8 100644 --- a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/__init__.py +++ b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/__init__.py @@ -5,7 +5,9 @@ from .jwt_authorization_middleware import ( JwtAuthorizationMiddleware, ) -from .app.streaming import ( + +# Import streaming utilities from core for backward compatibility +from microsoft_agents.hosting.core.app.streaming import ( Citation, CitationUtil, StreamingResponse, diff --git a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/channel_service_route_table.py b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/channel_service_route_table.py index 2dd009fc..b8a44b5e 100644 --- a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/channel_service_route_table.py +++ b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/channel_service_route_table.py @@ -1,64 +1,58 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. -import json -from typing import List, Union, Type -from fastapi import APIRouter, Request, Response, HTTPException, Depends +from fastapi import APIRouter, Request, Response from fastapi.responses import JSONResponse -from microsoft_agents.activity import ( - AgentsModel, - Activity, - AttachmentData, - ConversationParameters, - Transcript, -) from microsoft_agents.hosting.core import ChannelApiHandlerProtocol +from microsoft_agents.hosting.core.http import ChannelServiceRoutes -async def deserialize_from_body( - request: Request, target_model: Type[AgentsModel] -) -> AgentsModel: - content_type = request.headers.get("Content-Type", "") - if "application/json" in content_type: - body = await request.json() - else: - raise HTTPException(status_code=415, detail="Unsupported Media Type") +class FastApiRequestAdapter: + """Adapter for FastAPI requests to use with ChannelServiceRoutes.""" - return target_model.model_validate(body) + def __init__(self, request: Request): + self._request = request + @property + def method(self) -> str: + return self._request.method -def get_serialized_response( - model_or_list: Union[AgentsModel, List[AgentsModel]], -) -> JSONResponse: - if isinstance(model_or_list, AgentsModel): - json_obj = model_or_list.model_dump( - mode="json", exclude_unset=True, by_alias=True - ) - else: - json_obj = [ - model.model_dump(mode="json", exclude_unset=True, by_alias=True) - for model in model_or_list - ] + @property + def headers(self): + return self._request.headers + + async def json(self): + return await self._request.json() + + def get_claims_identity(self): + return getattr(self._request.state, "claims_identity", None) - return JSONResponse(content=json_obj) + def get_path_param(self, name: str) -> str: + return self._request.path_params.get(name, "") def channel_service_route_table( handler: ChannelApiHandlerProtocol, base_url: str = "" ) -> APIRouter: + """Create FastAPI router for Channel Service API. + + Args: + handler: The handler that implements the Channel API protocol. + base_url: Optional base URL prefix for all routes. + + Returns: + APIRouter with all channel service routes. + """ router = APIRouter() + service_routes = ChannelServiceRoutes(handler, base_url) @router.post(base_url + "/v3/conversations/{conversation_id}/activities") async def send_to_conversation(conversation_id: str, request: Request): - activity = await deserialize_from_body(request, Activity) - result = await handler.on_send_to_conversation( - getattr(request.state, "claims_identity", None), - conversation_id, - activity, + result = await service_routes.send_to_conversation( + FastApiRequestAdapter(request) ) - - return get_serialized_response(result) + return JSONResponse(content=result) @router.post( base_url + "/v3/conversations/{conversation_id}/activities/{activity_id}" @@ -66,40 +60,21 @@ async def send_to_conversation(conversation_id: str, request: Request): async def reply_to_activity( conversation_id: str, activity_id: str, request: Request ): - activity = await deserialize_from_body(request, Activity) - result = await handler.on_reply_to_activity( - getattr(request.state, "claims_identity", None), - conversation_id, - activity_id, - activity, - ) - - return get_serialized_response(result) + result = await service_routes.reply_to_activity(FastApiRequestAdapter(request)) + return JSONResponse(content=result) @router.put( base_url + "/v3/conversations/{conversation_id}/activities/{activity_id}" ) async def update_activity(conversation_id: str, activity_id: str, request: Request): - activity = await deserialize_from_body(request, Activity) - result = await handler.on_update_activity( - getattr(request.state, "claims_identity", None), - conversation_id, - activity_id, - activity, - ) - - return get_serialized_response(result) + result = await service_routes.update_activity(FastApiRequestAdapter(request)) + return JSONResponse(content=result) @router.delete( base_url + "/v3/conversations/{conversation_id}/activities/{activity_id}" ) async def delete_activity(conversation_id: str, activity_id: str, request: Request): - await handler.on_delete_activity( - getattr(request.state, "claims_identity", None), - conversation_id, - activity_id, - ) - + await service_routes.delete_activity(FastApiRequestAdapter(request)) return Response(status_code=200) @router.get( @@ -109,97 +84,65 @@ async def delete_activity(conversation_id: str, activity_id: str, request: Reque async def get_activity_members( conversation_id: str, activity_id: str, request: Request ): - result = await handler.on_get_activity_members( - getattr(request.state, "claims_identity", None), - conversation_id, - activity_id, + result = await service_routes.get_activity_members( + FastApiRequestAdapter(request) ) - - return get_serialized_response(result) + return JSONResponse(content=result) @router.post(base_url + "/") async def create_conversation(request: Request): - conversation_parameters = await deserialize_from_body( - request, ConversationParameters - ) - result = await handler.on_create_conversation( - getattr(request.state, "claims_identity", None), conversation_parameters + result = await service_routes.create_conversation( + FastApiRequestAdapter(request) ) - - return get_serialized_response(result) + return JSONResponse(content=result) @router.get(base_url + "/") async def get_conversation(request: Request): - # TODO: continuation token? conversation_id? - result = await handler.on_get_conversations( - getattr(request.state, "claims_identity", None), None - ) - - return get_serialized_response(result) + result = await service_routes.get_conversations(FastApiRequestAdapter(request)) + return JSONResponse(content=result) @router.get(base_url + "/v3/conversations/{conversation_id}/members") async def get_conversation_members(conversation_id: str, request: Request): - result = await handler.on_get_conversation_members( - getattr(request.state, "claims_identity", None), - conversation_id, + result = await service_routes.get_conversation_members( + FastApiRequestAdapter(request) ) - - return get_serialized_response(result) + return JSONResponse(content=result) @router.get(base_url + "/v3/conversations/{conversation_id}/members/{member_id}") async def get_conversation_member( conversation_id: str, member_id: str, request: Request ): - result = await handler.on_get_conversation_member( - getattr(request.state, "claims_identity", None), - member_id, - conversation_id, + result = await service_routes.get_conversation_member( + FastApiRequestAdapter(request) ) - - return get_serialized_response(result) + return JSONResponse(content=result) @router.get(base_url + "/v3/conversations/{conversation_id}/pagedmembers") async def get_conversation_paged_members(conversation_id: str, request: Request): - # TODO: continuation token? page size? - result = await handler.on_get_conversation_paged_members( - getattr(request.state, "claims_identity", None), - conversation_id, + result = await service_routes.get_conversation_paged_members( + FastApiRequestAdapter(request) ) - - return get_serialized_response(result) + return JSONResponse(content=result) @router.delete(base_url + "/v3/conversations/{conversation_id}/members/{member_id}") async def delete_conversation_member( conversation_id: str, member_id: str, request: Request ): - result = await handler.on_delete_conversation_member( - getattr(request.state, "claims_identity", None), - conversation_id, - member_id, + result = await service_routes.delete_conversation_member( + FastApiRequestAdapter(request) ) - - return get_serialized_response(result) + return JSONResponse(content=result) @router.post(base_url + "/v3/conversations/{conversation_id}/activities/history") async def send_conversation_history(conversation_id: str, request: Request): - transcript = await deserialize_from_body(request, Transcript) - result = await handler.on_send_conversation_history( - getattr(request.state, "claims_identity", None), - conversation_id, - transcript, + result = await service_routes.send_conversation_history( + FastApiRequestAdapter(request) ) - - return get_serialized_response(result) + return JSONResponse(content=result) @router.post(base_url + "/v3/conversations/{conversation_id}/attachments") async def upload_attachment(conversation_id: str, request: Request): - attachment_data = await deserialize_from_body(request, AttachmentData) - result = await handler.on_upload_attachment( - getattr(request.state, "claims_identity", None), - conversation_id, - attachment_data, - ) - - return get_serialized_response(result) + result = await service_routes.upload_attachment(FastApiRequestAdapter(request)) + return JSONResponse(content=result) return router diff --git a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/cloud_adapter.py b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/cloud_adapter.py index 3383c793..17238ff3 100644 --- a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/cloud_adapter.py +++ b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/cloud_adapter.py @@ -1,31 +1,49 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. -from traceback import format_exc from typing import Optional -from fastapi import Request, Response, HTTPException +from fastapi import Request, Response from fastapi.responses import JSONResponse -from microsoft_agents.hosting.core.authorization import ( - ClaimsIdentity, - Connections, -) -from microsoft_agents.activity import ( - Activity, - DeliveryModes, -) -from microsoft_agents.hosting.core import ( - Agent, - ChannelServiceAdapter, - ChannelServiceClientFactoryBase, - MessageFactory, - RestChannelServiceClientFactory, - TurnContext, + +from microsoft_agents.hosting.core import Agent +from microsoft_agents.hosting.core.authorization import Connections +from microsoft_agents.hosting.core.http import ( + HttpAdapterBase, + HttpRequestProtocol, + HttpResponse, ) +from microsoft_agents.hosting.core import ChannelServiceClientFactoryBase from .agent_http_adapter import AgentHttpAdapter -class CloudAdapter(ChannelServiceAdapter, AgentHttpAdapter): +class FastApiRequestAdapter: + """Adapter to make FastAPI Request compatible with HttpRequestProtocol.""" + + def __init__(self, request: Request): + self._request = request + + @property + def method(self) -> str: + return self._request.method + + @property + def headers(self): + return self._request.headers + + async def json(self): + return await self._request.json() + + def get_claims_identity(self): + return getattr(self._request.state, "claims_identity", None) + + def get_path_param(self, name: str) -> str: + return self._request.path_params.get(name, "") + + +class CloudAdapter(HttpAdapterBase, AgentHttpAdapter): + """CloudAdapter for FastAPI web framework.""" + def __init__( self, *, @@ -35,78 +53,43 @@ def __init__( """ Initializes a new instance of the CloudAdapter class. + :param connection_manager: Optional connection manager for OAuth. :param channel_service_client_factory: The factory to use to create the channel service client. """ + super().__init__( + connection_manager=connection_manager, + channel_service_client_factory=channel_service_client_factory, + ) - async def on_turn_error(context: TurnContext, error: Exception): - error_message = f"Exception caught : {error}" - print(format_exc()) + async def process(self, request: Request, agent: Agent) -> Optional[Response]: + """Process a FastAPI request. - await context.send_activity(MessageFactory.text(error_message)) + Args: + request: The FastAPI request. + agent: The agent to handle the request. - # Send a trace activity - await context.send_trace_activity( - "OnTurnError Trace", - error_message, - "https://www.botframework.com/schemas/error", - "TurnError", + Returns: + FastAPI Response object. + """ + # Adapt request to protocol + adapted_request = FastApiRequestAdapter(request) + + # Process using base implementation + http_response: HttpResponse = await self.process_request(adapted_request, agent) + + # Convert HttpResponse to FastAPI Response + return self._to_fastapi_response(http_response) + + @staticmethod + def _to_fastapi_response(http_response: HttpResponse) -> Response: + """Convert HttpResponse to FastAPI Response.""" + if http_response.body is not None: + return JSONResponse( + content=http_response.body, + status_code=http_response.status_code, + headers=http_response.headers, ) - - self.on_turn_error = on_turn_error - - channel_service_client_factory = ( - channel_service_client_factory - or RestChannelServiceClientFactory(connection_manager) + return Response( + status_code=http_response.status_code, + headers=http_response.headers, ) - - super().__init__(channel_service_client_factory) - - async def process(self, request: Request, agent: Agent) -> Optional[Response]: - if not request: - raise TypeError("CloudAdapter.process: request can't be None") - if not agent: - raise TypeError("CloudAdapter.process: agent can't be None") - - if request.method == "POST": - # Deserialize the incoming Activity - content_type = request.headers.get("Content-Type", "") - if "application/json" in content_type: - body = await request.json() - else: - raise HTTPException(status_code=415, detail="Unsupported Media Type") - - activity: Activity = Activity.model_validate(body) - - # default to anonymous identity with no claims - claims_identity: ClaimsIdentity = getattr( - request.state, "claims_identity", ClaimsIdentity({}, False) - ) - - # A POST request must contain an Activity - if ( - not activity.type - or not activity.conversation - or not activity.conversation.id - ): - raise HTTPException(status_code=400, detail="Bad Request") - - try: - # Process the inbound activity with the agent - invoke_response = await self.process_activity( - claims_identity, activity, agent.on_turn - ) - - if ( - activity.type == "invoke" - or activity.delivery_mode == DeliveryModes.expect_replies - ): - # Invoke and ExpectReplies cannot be performed async, the response must be written before the calling thread is released. - return JSONResponse( - content=invoke_response.body, status_code=invoke_response.status - ) - - return Response(status_code=202) - except PermissionError: - raise HTTPException(status_code=401, detail="Unauthorized") - else: - raise HTTPException(status_code=405, detail="Method Not Allowed") From a9809b56ef372ea95e20202dc45dc041916cff03 Mon Sep 17 00:00:00 2001 From: Axel Suarez Martinez Date: Tue, 18 Nov 2025 01:18:24 -0800 Subject: [PATCH 4/6] Cleaning unused references --- .../microsoft_agents/hosting/aiohttp/cloud_adapter.py | 1 - .../microsoft_agents/hosting/fastapi/cloud_adapter.py | 1 - test_samples/fastapi/authorization_agent.py | 2 +- 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/cloud_adapter.py b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/cloud_adapter.py index 963b4ac7..c384dd95 100644 --- a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/cloud_adapter.py +++ b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/cloud_adapter.py @@ -8,7 +8,6 @@ from microsoft_agents.hosting.core.authorization import Connections from microsoft_agents.hosting.core.http import ( HttpAdapterBase, - HttpRequestProtocol, HttpResponse, ) from microsoft_agents.hosting.core import ChannelServiceClientFactoryBase diff --git a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/cloud_adapter.py b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/cloud_adapter.py index 17238ff3..a94f81df 100644 --- a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/cloud_adapter.py +++ b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/cloud_adapter.py @@ -9,7 +9,6 @@ from microsoft_agents.hosting.core.authorization import Connections from microsoft_agents.hosting.core.http import ( HttpAdapterBase, - HttpRequestProtocol, HttpResponse, ) from microsoft_agents.hosting.core import ChannelServiceClientFactoryBase diff --git a/test_samples/fastapi/authorization_agent.py b/test_samples/fastapi/authorization_agent.py index b2265893..81c8bf1c 100644 --- a/test_samples/fastapi/authorization_agent.py +++ b/test_samples/fastapi/authorization_agent.py @@ -8,7 +8,7 @@ import uvicorn from dotenv import load_dotenv -from fastapi import FastAPI, Request, Depends +from fastapi import FastAPI, Request from microsoft_agents.hosting.core import ( Authorization, AgentApplication, From 688e2da8a8a62cdfdac44cf280b14c293961d0f4 Mon Sep 17 00:00:00 2001 From: Axel Suarez Date: Tue, 18 Nov 2025 14:17:57 -0800 Subject: [PATCH 5/6] Fix: minor comments --- .../hosting/aiohttp/app/streaming/streaming_response.py | 3 +-- .../microsoft_agents/hosting/core/http/_http_adapter_base.py | 1 - 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/app/streaming/streaming_response.py b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/app/streaming/streaming_response.py index f495aa9c..05986cb1 100644 --- a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/app/streaming/streaming_response.py +++ b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/app/streaming/streaming_response.py @@ -3,8 +3,7 @@ import asyncio import logging -from typing import List, Optional, Callable, Literal, TYPE_CHECKING -from dataclasses import dataclass +from typing import List, Optional, Callable, Literal from microsoft_agents.activity import ( Activity, diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/http/_http_adapter_base.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/http/_http_adapter_base.py index 3061d3e2..55e2df56 100644 --- a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/http/_http_adapter_base.py +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/http/_http_adapter_base.py @@ -5,7 +5,6 @@ from abc import ABC from traceback import format_exc -from typing import Optional from microsoft_agents.activity import Activity, DeliveryModes from microsoft_agents.hosting.core.authorization import ClaimsIdentity, Connections From 0d55c4d1157b6045e958b0086bcc8851e7f28e7e Mon Sep 17 00:00:00 2001 From: Axel Suarez Date: Tue, 18 Nov 2025 14:24:57 -0800 Subject: [PATCH 6/6] Fix: minor comments --- .../hosting/core/app/streaming/streaming_response.py | 1 - 1 file changed, 1 deletion(-) diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/streaming/streaming_response.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/streaming/streaming_response.py index db57858b..2d5b0fbf 100644 --- a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/streaming/streaming_response.py +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/streaming/streaming_response.py @@ -4,7 +4,6 @@ import asyncio import logging from typing import List, Optional, Callable, Literal, TYPE_CHECKING -from dataclasses import dataclass from microsoft_agents.activity import ( Activity,