From 70dfef80afd036a9d7227c5b35981c2346d85528 Mon Sep 17 00:00:00 2001 From: Haiyuan Cao Date: Mon, 13 Apr 2026 23:53:44 -0700 Subject: [PATCH 1/4] fix: classify TransferToAgentTool transfers to RemoteA2aAgent as TRANSFER_A2A MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a RemoteA2aAgent is registered via sub_agents, the BigQuery analytics plugin now correctly classifies transfers to it as "TRANSFER_A2A" instead of the generic "TRANSFER_AGENT". Classification is call-level: a single TransferToAgentTool with mixed local and A2A targets produces the correct label per invocation based on the selected agent_name. The fix is localized to the plugin — no changes to TransferToAgentTool or the ADK core. The plugin resolves the target agent at runtime via the invocation context's agent tree. Fixes #5073 Co-Authored-By: Claude Opus 4.6 (1M context) --- .../bigquery_agent_analytics_plugin.py | 58 ++++++- .../test_bigquery_agent_analytics_plugin.py | 145 ++++++++++++++++++ 2 files changed, 199 insertions(+), 4 deletions(-) diff --git a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py index 9bbe6d7b23..b572c47d99 100644 --- a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py +++ b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py @@ -168,16 +168,57 @@ def _format_content( return " | ".join(parts), truncated -def _get_tool_origin(tool: "BaseTool") -> str: +def _find_transfer_target(agent, agent_name: str): + """Find a transfer target agent by name in the accessible agent tree. + + Searches the current agent's sub-agents, parent, and peer agents + to locate the transfer target. + + Args: + agent: The current agent executing the transfer. + agent_name: The name of the transfer target to find. + + Returns: + The matching agent object, or None if not found. + """ + for sub in getattr(agent, "sub_agents", []): + if sub.name == agent_name: + return sub + parent = getattr(agent, "parent_agent", None) + if parent is not None and parent.name == agent_name: + return parent + if parent is not None: + for peer in getattr(parent, "sub_agents", []): + if peer.name == agent_name and peer.name != agent.name: + return peer + return None + + +def _get_tool_origin( + tool: "BaseTool", + tool_args: Optional[dict[str, Any]] = None, + tool_context: Optional["ToolContext"] = None, +) -> str: """Returns the provenance category of a tool. Uses lazy imports to avoid circular dependencies. + For ``TransferToAgentTool`` the classification is **call-level**: when + *tool_args* and *tool_context* are supplied the selected + ``agent_name`` is resolved against the agent tree so that transfers + to a ``RemoteA2aAgent`` are labelled ``TRANSFER_A2A`` rather than + the generic ``TRANSFER_AGENT``. + Args: tool: The tool instance. + tool_args: Optional tool arguments, used for call-level + classification of TransferToAgentTool. + tool_context: Optional tool context, used to access the agent + tree for TransferToAgentTool classification. Returns: - One of LOCAL, MCP, A2A, SUB_AGENT, TRANSFER_AGENT, or UNKNOWN. + One of LOCAL, MCP, A2A, SUB_AGENT, TRANSFER_AGENT, + TRANSFER_A2A, or UNKNOWN. """ # Import lazily to avoid circular dependencies. # pylint: disable=g-import-not-at-top @@ -199,6 +240,15 @@ def _get_tool_origin(tool: "BaseTool") -> str: if McpTool is not None and isinstance(tool, McpTool): return "MCP" if isinstance(tool, TransferToAgentTool): + if RemoteA2aAgent is not None and tool_args and tool_context: + agent_name = tool_args.get("agent_name") + if agent_name: + target = _find_transfer_target( + tool_context._invocation_context.agent, + agent_name, + ) + if target is not None and isinstance(target, RemoteA2aAgent): + return "TRANSFER_A2A" return "TRANSFER_AGENT" if isinstance(tool, AgentTool): if RemoteA2aAgent is not None and isinstance(tool.agent, RemoteA2aAgent): @@ -3228,7 +3278,7 @@ async def before_tool_callback( args_truncated, is_truncated = _recursive_smart_truncate( tool_args, self.config.max_content_length ) - tool_origin = _get_tool_origin(tool) + tool_origin = _get_tool_origin(tool, tool_args, tool_context) content_dict = { "tool": tool.name, "args": args_truncated, @@ -3262,7 +3312,7 @@ async def after_tool_callback( resp_truncated, is_truncated = _recursive_smart_truncate( result, self.config.max_content_length ) - tool_origin = _get_tool_origin(tool) + tool_origin = _get_tool_origin(tool, tool_args, tool_context) content_dict = { "tool": tool.name, "result": resp_truncated, diff --git a/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py b/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py index 4dd986386f..bf06e60ed0 100644 --- a/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py +++ b/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py @@ -4486,6 +4486,151 @@ def test_transfer_tool_returns_transfer_agent(self): result = bigquery_agent_analytics_plugin._get_tool_origin(tool) assert result == "TRANSFER_AGENT" + def test_transfer_tool_without_args_returns_transfer_agent(self): + """TransferToAgentTool without tool_args falls back to TRANSFER_AGENT.""" + from google.adk.tools.transfer_to_agent_tool import TransferToAgentTool + + tool = TransferToAgentTool(agent_names=["remote_a2a"]) + result = bigquery_agent_analytics_plugin._get_tool_origin( + tool, tool_args=None, tool_context=None + ) + assert result == "TRANSFER_AGENT" + + def test_transfer_to_remote_a2a_sub_agent_returns_transfer_a2a(self): + """Transfer to a RemoteA2aAgent sub-agent is classified TRANSFER_A2A.""" + from google.adk.tools.transfer_to_agent_tool import TransferToAgentTool + + try: + from google.adk.agents.remote_a2a_agent import RemoteA2aAgent + except ImportError: + pytest.skip("A2A agent not available") + + remote_agent = mock.MagicMock(spec=RemoteA2aAgent) + remote_agent.name = "remote_a2a" + + current_agent = mock.MagicMock() + current_agent.name = "root" + current_agent.sub_agents = [remote_agent] + current_agent.parent_agent = None + + inv_ctx = mock.MagicMock() + inv_ctx.agent = current_agent + tool_context = mock.MagicMock() + tool_context._invocation_context = inv_ctx + + tool = TransferToAgentTool(agent_names=["remote_a2a"]) + result = bigquery_agent_analytics_plugin._get_tool_origin( + tool, + tool_args={"agent_name": "remote_a2a"}, + tool_context=tool_context, + ) + assert result == "TRANSFER_A2A" + + def test_transfer_to_local_sub_agent_returns_transfer_agent(self): + """Transfer to a local sub-agent is still classified TRANSFER_AGENT.""" + from google.adk.tools.transfer_to_agent_tool import TransferToAgentTool + + local_agent = mock.MagicMock() + local_agent.name = "local_sub" + + current_agent = mock.MagicMock() + current_agent.name = "root" + current_agent.sub_agents = [local_agent] + current_agent.parent_agent = None + + inv_ctx = mock.MagicMock() + inv_ctx.agent = current_agent + tool_context = mock.MagicMock() + tool_context._invocation_context = inv_ctx + + tool = TransferToAgentTool(agent_names=["local_sub"]) + result = bigquery_agent_analytics_plugin._get_tool_origin( + tool, + tool_args={"agent_name": "local_sub"}, + tool_context=tool_context, + ) + assert result == "TRANSFER_AGENT" + + def test_transfer_to_a2a_peer_returns_transfer_a2a(self): + """Transfer to a RemoteA2aAgent peer is classified TRANSFER_A2A.""" + from google.adk.tools.transfer_to_agent_tool import TransferToAgentTool + + try: + from google.adk.agents.remote_a2a_agent import RemoteA2aAgent + except ImportError: + pytest.skip("A2A agent not available") + + remote_peer = mock.MagicMock(spec=RemoteA2aAgent) + remote_peer.name = "remote_peer" + + current_agent = mock.MagicMock() + current_agent.name = "child" + current_agent.sub_agents = [] + + parent_agent = mock.MagicMock() + parent_agent.name = "parent" + parent_agent.sub_agents = [current_agent, remote_peer] + current_agent.parent_agent = parent_agent + + inv_ctx = mock.MagicMock() + inv_ctx.agent = current_agent + tool_context = mock.MagicMock() + tool_context._invocation_context = inv_ctx + + tool = TransferToAgentTool( + agent_names=["remote_peer"], + ) + result = bigquery_agent_analytics_plugin._get_tool_origin( + tool, + tool_args={"agent_name": "remote_peer"}, + tool_context=tool_context, + ) + assert result == "TRANSFER_A2A" + + def test_transfer_mixed_targets_classifies_per_call(self): + """A single TransferToAgentTool with mixed targets classifies per call.""" + from google.adk.tools.transfer_to_agent_tool import TransferToAgentTool + + try: + from google.adk.agents.remote_a2a_agent import RemoteA2aAgent + except ImportError: + pytest.skip("A2A agent not available") + + remote_agent = mock.MagicMock(spec=RemoteA2aAgent) + remote_agent.name = "remote_a2a" + local_agent = mock.MagicMock() + local_agent.name = "local_sub" + + current_agent = mock.MagicMock() + current_agent.name = "root" + current_agent.sub_agents = [remote_agent, local_agent] + current_agent.parent_agent = None + + inv_ctx = mock.MagicMock() + inv_ctx.agent = current_agent + tool_context = mock.MagicMock() + tool_context._invocation_context = inv_ctx + + tool = TransferToAgentTool( + agent_names=["remote_a2a", "local_sub"], + ) + + # Transfer to remote target → TRANSFER_A2A + result = bigquery_agent_analytics_plugin._get_tool_origin( + tool, + tool_args={"agent_name": "remote_a2a"}, + tool_context=tool_context, + ) + assert result == "TRANSFER_A2A" + + # Transfer to local target → TRANSFER_AGENT + result = bigquery_agent_analytics_plugin._get_tool_origin( + tool, + tool_args={"agent_name": "local_sub"}, + tool_context=tool_context, + ) + assert result == "TRANSFER_AGENT" + def test_mcp_tool_returns_mcp(self): try: from google.adk.tools.mcp_tool.mcp_tool import McpTool From 876547a8f30ec1fd9ae0ee1f9ed0c8aabe6f96cb Mon Sep 17 00:00:00 2001 From: Haiyuan Cao Date: Tue, 14 Apr 2026 00:04:39 -0700 Subject: [PATCH 2/4] fix: pass tool_args/tool_context to _get_tool_origin in on_tool_error_callback The error callback path was still calling _get_tool_origin(tool) without the tool_args and tool_context needed for call-level A2A classification, leaving failed remote A2A transfers misclassified as TRANSFER_AGENT. Adds a callback-level regression test that exercises on_tool_error_callback with a TransferToAgentTool targeting a RemoteA2aAgent and asserts the logged tool_origin is TRANSFER_A2A. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../bigquery_agent_analytics_plugin.py | 2 +- .../test_bigquery_agent_analytics_plugin.py | 77 +++++++++++++++++++ 2 files changed, 78 insertions(+), 1 deletion(-) diff --git a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py index b572c47d99..906b481d16 100644 --- a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py +++ b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py @@ -3357,7 +3357,7 @@ async def on_tool_error_callback( args_truncated, is_truncated = _recursive_smart_truncate( tool_args, self.config.max_content_length ) - tool_origin = _get_tool_origin(tool) + tool_origin = _get_tool_origin(tool, tool_args, tool_context) content_dict = { "tool": tool.name, "args": args_truncated, diff --git a/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py b/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py index bf06e60ed0..8cfcfe439e 100644 --- a/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py +++ b/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py @@ -4631,6 +4631,83 @@ def test_transfer_mixed_targets_classifies_per_call(self): ) assert result == "TRANSFER_AGENT" + @pytest.mark.asyncio + async def test_tool_error_callback_classifies_a2a_transfer( + self, + mock_auth_default, + mock_bq_client, + mock_write_client, + mock_to_arrow_schema, + dummy_arrow_schema, + mock_asyncio_to_thread, + ): + """on_tool_error_callback produces TRANSFER_A2A for RemoteA2aAgent.""" + from google.adk.tools.transfer_to_agent_tool import TransferToAgentTool + + try: + from google.adk.agents.remote_a2a_agent import RemoteA2aAgent + except ImportError: + pytest.skip("A2A agent not available") + + remote_agent = mock.MagicMock(spec=RemoteA2aAgent) + remote_agent.name = "remote_a2a" + + mock_agent = mock.MagicMock(spec=base_agent.BaseAgent) + mock_agent.name = "root" + mock_agent.instruction = "" + mock_agent.sub_agents = [remote_agent] + mock_agent.parent_agent = None + + mock_s = mock.create_autospec( + session_lib.Session, instance=True, spec_set=True + ) + type(mock_s).id = mock.PropertyMock(return_value="sess-1") + type(mock_s).user_id = mock.PropertyMock(return_value="user-1") + type(mock_s).app_name = mock.PropertyMock(return_value="test_app") + type(mock_s).state = mock.PropertyMock(return_value={}) + + inv_ctx = InvocationContext( + agent=mock_agent, + session=mock_s, + invocation_id="inv-err", + session_service=mock.create_autospec( + base_session_service_lib.BaseSessionService, + instance=True, + spec_set=True, + ), + plugin_manager=mock.create_autospec( + plugin_manager_lib.PluginManager, + instance=True, + spec_set=True, + ), + ) + tool_ctx = tool_context_lib.ToolContext(invocation_context=inv_ctx) + tool = TransferToAgentTool(agent_names=["remote_a2a"]) + + async with managed_plugin( + PROJECT_ID, DATASET_ID, table_id=TABLE_ID + ) as plugin: + await plugin._ensure_started() + mock_write_client.append_rows.reset_mock() + + bigquery_agent_analytics_plugin.TraceManager.push_span(tool_ctx, "tool") + await plugin.on_tool_error_callback( + tool=tool, + tool_args={"agent_name": "remote_a2a"}, + tool_context=tool_ctx, + error=RuntimeError("connection refused"), + ) + await asyncio.sleep(0.01) + + rows = await _get_captured_rows_async( + mock_write_client, dummy_arrow_schema + ) + + assert len(rows) == 1 + assert rows[0]["event_type"] == "TOOL_ERROR" + content = json.loads(rows[0]["content"]) + assert content["tool_origin"] == "TRANSFER_A2A" + def test_mcp_tool_returns_mcp(self): try: from google.adk.tools.mcp_tool.mcp_tool import McpTool From 3585a2fc5f837ff5600082373d113c8d34f5208d Mon Sep 17 00:00:00 2001 From: Haiyuan Cao Date: Tue, 14 Apr 2026 11:44:43 -0700 Subject: [PATCH 3/4] fix: use plugin-stack span hierarchy for BQ rows and surface A2A metadata MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes two BigQuery analytics plugin issues: **#5310 — phantom parent_span_id references:** When OTel instrumentation was active, _resolve_ids() derived parent_span_id from ambient framework spans (execute_tool, call_llm, etc.) that are never written to BQ, creating dangling references. The fix flips priority: plugin-stack span_id/parent_span_id are now preferred, with ambient OTel only used as a fallback when the plugin stack has no span. All 6 post-pop callbacks now always pass both span_id_override and parent_span_id_override from the plugin stack, removing the has_ambient gating. **#5311 — A2A metadata not surfaced in BQ:** RemoteA2aAgent attaches correlation metadata (a2a:task_id, a2a:context_id, a2a:request, a2a:response) to event.custom_metadata, but on_event_callback() only logged STATE_DELTA and HITL events. The fix adds A2A_INTERACTION event detection: when an event carries a2a:request or a2a:response, it is logged with the truncated a2a:response as content and all a2a:* keys under attributes.a2a_metadata. Fixes #5310, Fixes #5311 Co-Authored-By: Claude Opus 4.6 (1M context) --- .../bigquery_agent_analytics_plugin.py | 147 +++++---- .../test_bigquery_agent_analytics_plugin.py | 293 ++++++++++++++++-- 2 files changed, 359 insertions(+), 81 deletions(-) diff --git a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py index 906b481d16..f371694059 100644 --- a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py +++ b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py @@ -2602,17 +2602,34 @@ def _resolve_ids( ) -> tuple[Optional[str], Optional[str], Optional[str]]: """Resolves trace_id, span_id, and parent_span_id for a log row. + Resolution rules: + + * **trace_id** — ambient OTel trace wins (the plugin stack already + shares the ambient trace when initialised from an ambient span, + so in practice they agree). + * **span_id / parent_span_id** — the plugin's internal span stack + (``TraceManager``) is the preferred source. Ambient OTel spans + are only used as a fallback when the plugin stack has no span. + This ensures every ``parent_span_id`` in BigQuery references a + ``span_id`` that is also logged to BigQuery, producing a + self-consistent execution tree. + * **Explicit overrides** (``EventData``) always win last — they + are set by post-pop callbacks that have already captured the + correct plugin-stack values before the pop. + Priority order (highest first): - 1. Explicit ``EventData`` overrides (needed for post-pop callbacks). - 2. Ambient OTel span (the framework's ``start_as_current_span``). - When present this aligns BQ rows with Cloud Trace / o11y. - 3. Plugin's internal span stack (``TraceManager``). + 1. Explicit ``EventData`` overrides. + 2. Plugin's internal span stack (``TraceManager``) for + ``span_id`` / ``parent_span_id``. + 3. Ambient OTel span — always used for ``trace_id``; used for + ``span_id`` / ``parent_span_id`` only when the plugin stack + has no span. 4. ``invocation_id`` fallback for trace_id. Returns: (trace_id, span_id, parent_span_id) """ - # --- Layer 3: plugin stack baseline --- + # --- Plugin stack: span_id / parent_span_id baseline --- trace_id = TraceManager.get_trace_id(callback_context) plugin_span_id, plugin_parent_span_id = ( TraceManager.get_current_span_and_parent() @@ -2620,21 +2637,24 @@ def _resolve_ids( span_id = plugin_span_id parent_span_id = plugin_parent_span_id - # --- Layer 2: ambient OTel span --- + # --- Ambient OTel: trace_id always; span fallback only --- ambient = trace.get_current_span() ambient_ctx = ambient.get_span_context() if ambient_ctx.is_valid: trace_id = format(ambient_ctx.trace_id, "032x") - span_id = format(ambient_ctx.span_id, "016x") - # Reset parent — stale plugin-stack parent must not leak through - # when the ambient span is a root (no parent). - parent_span_id = None - # SDK spans expose .parent; non-recording spans do not. - parent_ctx = getattr(ambient, "parent", None) - if parent_ctx is not None and parent_ctx.span_id: - parent_span_id = format(parent_ctx.span_id, "016x") - - # --- Layer 1: explicit EventData overrides --- + # Only use ambient span IDs when the plugin stack has no span. + # Framework-internal spans (execute_tool, call_llm, etc.) are + # never written to BQ, so deriving parent_span_id from them + # creates phantom references. The plugin stack guarantees + # that both span_id and parent_span_id reference BQ rows. + if span_id is None: + span_id = format(ambient_ctx.span_id, "016x") + parent_span_id = None + parent_ctx = getattr(ambient, "parent", None) + if parent_ctx is not None and parent_ctx.span_id: + parent_span_id = format(parent_ctx.span_id, "016x") + + # --- Explicit EventData overrides (post-pop callbacks) --- if event_data.trace_id_override is not None: trace_id = event_data.trace_id_override if event_data.span_id_override is not None: @@ -2863,13 +2883,18 @@ async def on_event_callback( invocation_context: InvocationContext, event: "Event", ) -> None: - """Logs state changes and HITL events from the event stream. + """Logs state changes, HITL events, and A2A interactions. - Checks each event for a non-empty state_delta and logs it as a STATE_DELTA event. - Detects synthetic ``adk_request_*`` function calls (HITL pause events) and their corresponding function responses (HITL completions) and emits dedicated HITL event types. + - Detects events carrying A2A interaction metadata + (``a2a:request`` / ``a2a:response`` in ``custom_metadata``) + and logs them as ``A2A_INTERACTION`` events so the remote + agent's response and cross-reference IDs (``a2a:task_id``, + ``a2a:context_id``) are visible in BigQuery. The HITL detection must happen here (not in tool callbacks) because ``adk_request_credential``, ``adk_request_confirmation``, and @@ -2933,6 +2958,45 @@ async def on_event_callback( is_truncated=is_truncated, ) + # --- A2A interaction logging --- + # RemoteA2aAgent attaches cross-reference metadata to events: + # a2a:task_id, a2a:context_id — correlation keys + # a2a:request, a2a:response — full interaction payload + # Log an A2A_INTERACTION event when meaningful payload is present + # so the supervisor's BQ trace contains the remote agent's + # response and cross-reference IDs for JOINs. + meta = getattr(event, "custom_metadata", None) + if meta and ( + meta.get("a2a:request") is not None + or meta.get("a2a:response") is not None + ): + a2a_keys = {k: v for k, v in meta.items() if k.startswith("a2a:")} + a2a_truncated, is_truncated = _recursive_smart_truncate( + a2a_keys, self.config.max_content_length + ) + # Use the a2a:response as the event content when available, + # so the remote agent's answer is visible in the content + # column. + response_payload = a2a_keys.get("a2a:response") + content_dict = None + content_truncated = False + if response_payload is not None: + content_dict, content_truncated = _recursive_smart_truncate( + response_payload, + self.config.max_content_length, + ) + await self._log_event( + "A2A_INTERACTION", + callback_ctx, + raw_content=content_dict, + is_truncated=is_truncated or content_truncated, + event_data=EventData( + extra_attributes={ + "a2a_metadata": a2a_truncated, + }, + ), + ) + return None async def on_state_change_callback( @@ -2990,19 +3054,14 @@ async def after_run_callback( span_id, duration = TraceManager.pop_span() parent_span_id = TraceManager.get_current_span_id() - # Only override span IDs when no ambient OTel span exists. - # When ambient exists, _resolve_ids Layer 2 uses the framework's - # span IDs, keeping STARTING/COMPLETED pairs consistent. - has_ambient = trace.get_current_span().get_span_context().is_valid - await self._log_event( "INVOCATION_COMPLETED", callback_ctx, event_data=EventData( trace_id_override=trace_id, latency_ms=duration, - span_id_override=None if has_ambient else span_id, - parent_span_id_override=None if has_ambient else parent_span_id, + span_id_override=span_id, + parent_span_id_override=parent_span_id, ), ) finally: @@ -3045,18 +3104,13 @@ async def after_agent_callback( span_id, duration = TraceManager.pop_span() parent_span_id, _ = TraceManager.get_current_span_and_parent() - # Only override span IDs when no ambient OTel span exists. - # When ambient exists, _resolve_ids Layer 2 uses the framework's - # span IDs, keeping STARTING/COMPLETED pairs consistent. - has_ambient = trace.get_current_span().get_span_context().is_valid - await self._log_event( "AGENT_COMPLETED", callback_context, event_data=EventData( latency_ms=duration, - span_id_override=None if has_ambient else span_id, - parent_span_id_override=None if has_ambient else parent_span_id, + span_id_override=span_id, + parent_span_id_override=parent_span_id, ), ) @@ -3206,12 +3260,6 @@ async def after_model_callback( # Otherwise log_event will fetch current stack (which is parent). span_id = popped_span_id or span_id - # Only override span IDs when no ambient OTel span exists. - # When ambient exists, _resolve_ids Layer 2 uses the framework's - # span IDs, keeping LLM_REQUEST/LLM_RESPONSE pairs consistent. - has_ambient = trace.get_current_span().get_span_context().is_valid - use_override = is_popped and not has_ambient - await self._log_event( "LLM_RESPONSE", callback_context, @@ -3222,8 +3270,8 @@ async def after_model_callback( time_to_first_token_ms=tfft, model_version=llm_response.model_version, usage_metadata=llm_response.usage_metadata, - span_id_override=span_id if use_override else None, - parent_span_id_override=parent_span_id if use_override else None, + span_id_override=span_id if is_popped else None, + parent_span_id_override=(parent_span_id if is_popped else None), ), ) @@ -3245,9 +3293,6 @@ async def on_model_error_callback( span_id, duration = TraceManager.pop_span() parent_span_id, _ = TraceManager.get_current_span_and_parent() - # Only override span IDs when no ambient OTel span exists. - has_ambient = trace.get_current_span().get_span_context().is_valid - await self._log_event( "LLM_ERROR", callback_context, @@ -3255,8 +3300,8 @@ async def on_model_error_callback( status="ERROR", error_message=str(error), latency_ms=duration, - span_id_override=None if has_ambient else span_id, - parent_span_id_override=None if has_ambient else parent_span_id, + span_id_override=span_id, + parent_span_id_override=parent_span_id, ), ) @@ -3321,13 +3366,10 @@ async def after_tool_callback( span_id, duration = TraceManager.pop_span() parent_span_id, _ = TraceManager.get_current_span_and_parent() - # Only override span IDs when no ambient OTel span exists. - has_ambient = trace.get_current_span().get_span_context().is_valid - event_data = EventData( latency_ms=duration, - span_id_override=None if has_ambient else span_id, - parent_span_id_override=None if has_ambient else parent_span_id, + span_id_override=span_id, + parent_span_id_override=parent_span_id, ) await self._log_event( "TOOL_COMPLETED", @@ -3366,9 +3408,6 @@ async def on_tool_error_callback( span_id, duration = TraceManager.pop_span() parent_span_id, _ = TraceManager.get_current_span_and_parent() - # Only override span IDs when no ambient OTel span exists. - has_ambient = trace.get_current_span().get_span_context().is_valid - await self._log_event( "TOOL_ERROR", tool_context, @@ -3378,7 +3417,7 @@ async def on_tool_error_callback( status="ERROR", error_message=str(error), latency_ms=duration, - span_id_override=None if has_ambient else span_id, - parent_span_id_override=None if has_ambient else parent_span_id, + span_id_override=span_id, + parent_span_id_override=parent_span_id, ), ) diff --git a/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py b/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py index 8cfcfe439e..46efbe6d62 100644 --- a/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py +++ b/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py @@ -3259,8 +3259,49 @@ def test_none_override_keeps_default(self, callback_context): assert span_id == "span-1" assert parent_id == "parent-1" - def test_ambient_otel_span_takes_priority(self, callback_context): - """When an ambient OTel span is valid, its IDs take priority.""" + def test_ambient_provides_trace_id_only_when_stack_present( + self, callback_context + ): + """Plugin stack owns span_id/parent; ambient only provides trace_id.""" + from opentelemetry.sdk.trace import TracerProvider as SdkProvider + from opentelemetry.sdk.trace.export import SimpleSpanProcessor + from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + + provider = SdkProvider() + provider.add_span_processor(SimpleSpanProcessor(InMemorySpanExporter())) + real_tracer = provider.get_tracer("test") + + ed = bigquery_agent_analytics_plugin.EventData() + + with real_tracer.start_as_current_span("invocation") as parent_span: + with real_tracer.start_as_current_span("agent") as agent_span: + ambient_ctx = agent_span.get_span_context() + expected_trace = format(ambient_ctx.trace_id, "032x") + + # Plugin stack has spans — these should win for span/parent. + with ( + mock.patch.object( + bigquery_agent_analytics_plugin.TraceManager, + "get_current_span_and_parent", + return_value=("plugin-span", "plugin-parent"), + ), + mock.patch.object( + bigquery_agent_analytics_plugin.TraceManager, + "get_trace_id", + return_value="plugin-trace", + ), + ): + trace_id, span_id, parent_id = self._resolve(ed, callback_context) + + # trace_id comes from ambient OTel. + assert trace_id == expected_trace + # span_id and parent_span_id come from plugin stack. + assert span_id == "plugin-span" + assert parent_id == "plugin-parent" + provider.shutdown() + + def test_ambient_fallback_when_no_plugin_stack(self, callback_context): + """Ambient OTel provides span_id/parent when plugin stack is empty.""" from opentelemetry.sdk.trace import TracerProvider as SdkProvider from opentelemetry.sdk.trace.export import SimpleSpanProcessor from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter @@ -3278,7 +3319,20 @@ def test_ambient_otel_span_takes_priority(self, callback_context): expected_span = format(ambient_ctx.span_id, "016x") expected_parent = format(parent_span.get_span_context().span_id, "016x") - trace_id, span_id, parent_id = self._resolve(ed, callback_context) + # Plugin stack returns None — ambient is the fallback. + with ( + mock.patch.object( + bigquery_agent_analytics_plugin.TraceManager, + "get_current_span_and_parent", + return_value=(None, None), + ), + mock.patch.object( + bigquery_agent_analytics_plugin.TraceManager, + "get_trace_id", + return_value=None, + ), + ): + trace_id, span_id, parent_id = self._resolve(ed, callback_context) assert trace_id == expected_trace assert span_id == expected_span @@ -3309,8 +3363,8 @@ def test_override_beats_ambient(self, callback_context): assert parent_id == "forced-parent" provider.shutdown() - def test_ambient_root_span_no_self_parent(self, callback_context): - """Ambient root span (no parent) must not produce self-parent.""" + def test_plugin_stack_wins_over_ambient_root_span(self, callback_context): + """Plugin stack span is used even when ambient root span exists.""" from opentelemetry.sdk.trace import TracerProvider as SdkProvider from opentelemetry.sdk.trace.export import SimpleSpanProcessor from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter @@ -3319,7 +3373,7 @@ def test_ambient_root_span_no_self_parent(self, callback_context): provider.add_span_processor(SimpleSpanProcessor(InMemorySpanExporter())) real_tracer = provider.get_tracer("test") - # Seed the plugin stack with a span so there's a stale parent. + # Seed the plugin stack with a span. bigquery_agent_analytics_plugin._span_records_ctx.set(None) with mock.patch.object( bigquery_agent_analytics_plugin, "tracer", real_tracer @@ -3328,29 +3382,68 @@ def test_ambient_root_span_no_self_parent(self, callback_context): callback_context, "plugin-child" ) + # Capture the plugin span_id that was pushed. + plugin_span_id, _ = ( + bigquery_agent_analytics_plugin.TraceManager.get_current_span_and_parent() + ) + ed = bigquery_agent_analytics_plugin.EventData() # Single root ambient span — no parent. with real_tracer.start_as_current_span("root_invocation") as root: trace_id, span_id, parent_id = self._resolve(ed, callback_context) - root_span_id = format(root.get_span_context().span_id, "016x") + ambient_trace = format(root.get_span_context().trace_id, "032x") - # span_id should be the ambient root's span_id - assert span_id == root_span_id - # parent must be None — not the stale plugin parent, not self + # trace_id comes from ambient. + assert trace_id == ambient_trace + # span_id comes from plugin stack, not ambient. + assert span_id == plugin_span_id + # parent is None — only one span in plugin stack. assert parent_id is None - assert span_id != parent_id # Cleanup bigquery_agent_analytics_plugin.TraceManager.pop_span() provider.shutdown() - def test_ambient_span_used_for_completed_event(self, callback_context): - """Completed event with overrides should use ambient when present. + def test_ambient_root_fallback_no_self_parent(self, callback_context): + """Ambient root span fallback must not produce self-parent.""" + from opentelemetry.sdk.trace import TracerProvider as SdkProvider + from opentelemetry.sdk.trace.export import SimpleSpanProcessor + from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + + provider = SdkProvider() + provider.add_span_processor(SimpleSpanProcessor(InMemorySpanExporter())) + real_tracer = provider.get_tracer("test") + + ed = bigquery_agent_analytics_plugin.EventData() - When an ambient OTel span is valid, passing None overrides lets - _resolve_ids Layer 2 pick the ambient span — matching the - STARTING event's span_id. + # Plugin stack empty — ambient provides the fallback. + with ( + mock.patch.object( + bigquery_agent_analytics_plugin.TraceManager, + "get_current_span_and_parent", + return_value=(None, None), + ), + mock.patch.object( + bigquery_agent_analytics_plugin.TraceManager, + "get_trace_id", + return_value=None, + ), + ): + with real_tracer.start_as_current_span("root") as root: + trace_id, span_id, parent_id = self._resolve(ed, callback_context) + root_span_id = format(root.get_span_context().span_id, "016x") + + assert span_id == root_span_id + assert parent_id is None + provider.shutdown() + + def test_plugin_stack_pairs_starting_completed(self, callback_context): + """STARTING/COMPLETED pairing uses plugin stack, not ambient. + + Post-pop callbacks now always pass explicit overrides from the + plugin stack. The plugin stack span_id is used for both events + regardless of ambient OTel state. """ from opentelemetry.sdk.trace import TracerProvider as SdkProvider from opentelemetry.sdk.trace.export import SimpleSpanProcessor @@ -3360,23 +3453,33 @@ def test_ambient_span_used_for_completed_event(self, callback_context): provider.add_span_processor(SimpleSpanProcessor(InMemorySpanExporter())) real_tracer = provider.get_tracer("test") - with real_tracer.start_as_current_span("invoke_agent") as agent_span: - expected_span = format(agent_span.get_span_context().span_id, "016x") - - # Simulate STARTING: no overrides → ambient Layer 2 wins. - ed_starting = bigquery_agent_analytics_plugin.EventData() - _, span_starting, _ = self._resolve(ed_starting, callback_context) + with real_tracer.start_as_current_span("invoke_agent"): + # Simulate STARTING: plugin stack provides span_id. + with ( + mock.patch.object( + bigquery_agent_analytics_plugin.TraceManager, + "get_current_span_and_parent", + return_value=("plugin-agent", "plugin-inv"), + ), + mock.patch.object( + bigquery_agent_analytics_plugin.TraceManager, + "get_trace_id", + return_value="plugin-trace", + ), + ): + ed_starting = bigquery_agent_analytics_plugin.EventData() + _, span_starting, _ = self._resolve(ed_starting, callback_context) - # Simulate COMPLETED: None overrides (ambient check passed). + # Simulate COMPLETED: explicit override from popped span. ed_completed = bigquery_agent_analytics_plugin.EventData( - span_id_override=None, - parent_span_id_override=None, + span_id_override="plugin-agent", + parent_span_id_override="plugin-inv", latency_ms=42, ) _, span_completed, _ = self._resolve(ed_completed, callback_context) - assert span_starting == expected_span - assert span_completed == expected_span + assert span_starting == "plugin-agent" + assert span_completed == "plugin-agent" assert span_starting == span_completed provider.shutdown() @@ -6894,3 +6997,139 @@ def _fake_run_coroutine_threadsafe(coro, loop): mock_rcts.assert_called() call_args = mock_rcts.call_args assert call_args[0][1] is other_loop + + +# ============================================================== +# TEST CLASS: A2A_INTERACTION event logging via on_event_callback +# ============================================================== +class TestA2AInteractionLogging: + """Tests for A2A interaction event emission via on_event_callback. + + When a RemoteA2aAgent processes a response, it attaches A2A + metadata (``a2a:task_id``, ``a2a:context_id``, ``a2a:request``, + ``a2a:response``) to the event's ``custom_metadata``. The + plugin's ``on_event_callback`` should detect events carrying + ``a2a:request`` or ``a2a:response`` and log an + ``A2A_INTERACTION`` event so the remote agent's response and + cross-reference IDs are visible in BigQuery. + """ + + @pytest.mark.asyncio + async def test_a2a_interaction_logged_for_response_metadata( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + dummy_arrow_schema, + ): + """Event with a2a:response in custom_metadata emits A2A_INTERACTION.""" + a2a_meta = { + "a2a:task_id": "task-abc", + "a2a:context_id": "ctx-123", + "a2a:response": {"status": "completed", "text": "result"}, + } + event = event_lib.Event( + author="remote_agent", + custom_metadata=a2a_meta, + ) + + bigquery_agent_analytics_plugin.TraceManager.push_span(invocation_context) + result = await bq_plugin_inst.on_event_callback( + invocation_context=invocation_context, event=event + ) + assert result is None + + await asyncio.sleep(0.05) + rows = await _get_captured_rows_async(mock_write_client, dummy_arrow_schema) + event_types = [r["event_type"] for r in rows] + assert "A2A_INTERACTION" in event_types + + a2a_row = next(r for r in rows if r["event_type"] == "A2A_INTERACTION") + attributes = json.loads(a2a_row["attributes"]) + assert "a2a_metadata" in attributes + assert attributes["a2a_metadata"]["a2a:task_id"] == "task-abc" + assert attributes["a2a_metadata"]["a2a:context_id"] == "ctx-123" + + # Content should contain the a2a:response payload. + content = json.loads(a2a_row["content"]) + assert content["status"] == "completed" + + @pytest.mark.asyncio + async def test_a2a_interaction_logged_for_request_metadata( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + dummy_arrow_schema, + ): + """Event with a2a:request (no a2a:response) emits A2A_INTERACTION.""" + a2a_meta = { + "a2a:task_id": "task-xyz", + "a2a:request": {"message": "hello"}, + } + event = event_lib.Event( + author="remote_agent", + custom_metadata=a2a_meta, + ) + + bigquery_agent_analytics_plugin.TraceManager.push_span(invocation_context) + result = await bq_plugin_inst.on_event_callback( + invocation_context=invocation_context, event=event + ) + assert result is None + + await asyncio.sleep(0.05) + rows = await _get_captured_rows_async(mock_write_client, dummy_arrow_schema) + event_types = [r["event_type"] for r in rows] + assert "A2A_INTERACTION" in event_types + + a2a_row = next(r for r in rows if r["event_type"] == "A2A_INTERACTION") + attributes = json.loads(a2a_row["attributes"]) + assert attributes["a2a_metadata"]["a2a:request"] == {"message": "hello"} + # No a2a:response → content should be None. + assert a2a_row["content"] is None + + @pytest.mark.asyncio + async def test_no_a2a_interaction_for_irrelevant_metadata( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + ): + """Events with only a2a:task_id (no request/response) are skipped.""" + a2a_meta = { + "a2a:task_id": "task-only", + "a2a:context_id": "ctx-only", + } + event = event_lib.Event( + author="remote_agent", + custom_metadata=a2a_meta, + ) + + result = await bq_plugin_inst.on_event_callback( + invocation_context=invocation_context, event=event + ) + assert result is None + + await asyncio.sleep(0.05) + # No events logged — a2a:task_id alone is not a meaningful + # interaction payload. + assert mock_write_client.append_rows.call_count == 0 + + @pytest.mark.asyncio + async def test_no_a2a_interaction_for_no_metadata( + self, + bq_plugin_inst, + mock_write_client, + invocation_context, + ): + """Events without custom_metadata produce no A2A_INTERACTION.""" + event = event_lib.Event(author="regular_agent") + + result = await bq_plugin_inst.on_event_callback( + invocation_context=invocation_context, event=event + ) + assert result is None + + await asyncio.sleep(0.05) + assert mock_write_client.append_rows.call_count == 0 From 2a78dec8a10e2e03a64b5c233b44be73e6adc28b Mon Sep 17 00:00:00 2001 From: Haiyuan Cao Date: Tue, 14 Apr 2026 11:50:55 -0700 Subject: [PATCH 4/4] fix: add A2A_INTERACTION to _EVENT_VIEW_DEFS view registry Without this entry the plugin creates the raw BQ rows but never generates a typed per-event view for A2A_INTERACTION, so users consuming the plugin through its generated views would not see the new A2A rows or extracted fields. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../bigquery_agent_analytics_plugin.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py index f371694059..01b45074b7 100644 --- a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py +++ b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py @@ -1875,6 +1875,25 @@ def _get_events_schema() -> list[bigquery.SchemaField]: "JSON_VALUE(content, '$.tool') AS tool_name", "JSON_QUERY(content, '$.args') AS tool_args", ], + "A2A_INTERACTION": [ + "content AS response_content", + ( + "JSON_VALUE(attributes," + " '$.a2a_metadata.\"a2a:task_id\"') AS a2a_task_id" + ), + ( + "JSON_VALUE(attributes," + " '$.a2a_metadata.\"a2a:context_id\"') AS a2a_context_id" + ), + ( + "JSON_QUERY(attributes," + " '$.a2a_metadata.\"a2a:request\"') AS a2a_request" + ), + ( + "JSON_QUERY(attributes," + " '$.a2a_metadata.\"a2a:response\"') AS a2a_response" + ), + ], } _VIEW_SQL_TEMPLATE = """\