Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 167 additions & 59 deletions src/google/adk/plugins/bigquery_agent_analytics_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,16 +168,57 @@ def _format_content(
return " | ".join(parts), truncated


def _get_tool_origin(tool: "BaseTool") -> str:
def _find_transfer_target(agent, agent_name: str):
"""Find a transfer target agent by name in the accessible agent tree.

Searches the current agent's sub-agents, parent, and peer agents
to locate the transfer target.

Args:
agent: The current agent executing the transfer.
agent_name: The name of the transfer target to find.

Returns:
The matching agent object, or None if not found.
"""
for sub in getattr(agent, "sub_agents", []):
if sub.name == agent_name:
return sub
parent = getattr(agent, "parent_agent", None)
if parent is not None and parent.name == agent_name:
return parent
if parent is not None:
for peer in getattr(parent, "sub_agents", []):
if peer.name == agent_name and peer.name != agent.name:
return peer
return None


def _get_tool_origin(
tool: "BaseTool",
tool_args: Optional[dict[str, Any]] = None,
tool_context: Optional["ToolContext"] = None,
) -> str:
"""Returns the provenance category of a tool.

Uses lazy imports to avoid circular dependencies.

For ``TransferToAgentTool`` the classification is **call-level**: when
*tool_args* and *tool_context* are supplied the selected
``agent_name`` is resolved against the agent tree so that transfers
to a ``RemoteA2aAgent`` are labelled ``TRANSFER_A2A`` rather than
the generic ``TRANSFER_AGENT``.

Args:
tool: The tool instance.
tool_args: Optional tool arguments, used for call-level
classification of TransferToAgentTool.
tool_context: Optional tool context, used to access the agent
tree for TransferToAgentTool classification.

Returns:
One of LOCAL, MCP, A2A, SUB_AGENT, TRANSFER_AGENT, or UNKNOWN.
One of LOCAL, MCP, A2A, SUB_AGENT, TRANSFER_AGENT,
TRANSFER_A2A, or UNKNOWN.
"""
# Import lazily to avoid circular dependencies.
# pylint: disable=g-import-not-at-top
Expand All @@ -199,6 +240,15 @@ def _get_tool_origin(tool: "BaseTool") -> str:
if McpTool is not None and isinstance(tool, McpTool):
return "MCP"
if isinstance(tool, TransferToAgentTool):
if RemoteA2aAgent is not None and tool_args and tool_context:
agent_name = tool_args.get("agent_name")
if agent_name:
target = _find_transfer_target(
tool_context._invocation_context.agent,
agent_name,
)
if target is not None and isinstance(target, RemoteA2aAgent):
return "TRANSFER_A2A"
return "TRANSFER_AGENT"
if isinstance(tool, AgentTool):
if RemoteA2aAgent is not None and isinstance(tool.agent, RemoteA2aAgent):
Expand Down Expand Up @@ -1825,6 +1875,25 @@ def _get_events_schema() -> list[bigquery.SchemaField]:
"JSON_VALUE(content, '$.tool') AS tool_name",
"JSON_QUERY(content, '$.args') AS tool_args",
],
"A2A_INTERACTION": [
"content AS response_content",
(
"JSON_VALUE(attributes,"
" '$.a2a_metadata.\"a2a:task_id\"') AS a2a_task_id"
),
(
"JSON_VALUE(attributes,"
" '$.a2a_metadata.\"a2a:context_id\"') AS a2a_context_id"
),
(
"JSON_QUERY(attributes,"
" '$.a2a_metadata.\"a2a:request\"') AS a2a_request"
),
(
"JSON_QUERY(attributes,"
" '$.a2a_metadata.\"a2a:response\"') AS a2a_response"
),
],
}

_VIEW_SQL_TEMPLATE = """\
Expand Down Expand Up @@ -2552,39 +2621,59 @@ def _resolve_ids(
) -> tuple[Optional[str], Optional[str], Optional[str]]:
"""Resolves trace_id, span_id, and parent_span_id for a log row.

Resolution rules:

* **trace_id** — ambient OTel trace wins (the plugin stack already
shares the ambient trace when initialised from an ambient span,
so in practice they agree).
* **span_id / parent_span_id** — the plugin's internal span stack
(``TraceManager``) is the preferred source. Ambient OTel spans
are only used as a fallback when the plugin stack has no span.
This ensures every ``parent_span_id`` in BigQuery references a
``span_id`` that is also logged to BigQuery, producing a
self-consistent execution tree.
* **Explicit overrides** (``EventData``) always win last — they
are set by post-pop callbacks that have already captured the
correct plugin-stack values before the pop.

Priority order (highest first):
1. Explicit ``EventData`` overrides (needed for post-pop callbacks).
2. Ambient OTel span (the framework's ``start_as_current_span``).
When present this aligns BQ rows with Cloud Trace / o11y.
3. Plugin's internal span stack (``TraceManager``).
1. Explicit ``EventData`` overrides.
2. Plugin's internal span stack (``TraceManager``) for
``span_id`` / ``parent_span_id``.
3. Ambient OTel span — always used for ``trace_id``; used for
``span_id`` / ``parent_span_id`` only when the plugin stack
has no span.
4. ``invocation_id`` fallback for trace_id.

Returns:
(trace_id, span_id, parent_span_id)
"""
# --- Layer 3: plugin stack baseline ---
# --- Plugin stack: span_id / parent_span_id baseline ---
trace_id = TraceManager.get_trace_id(callback_context)
plugin_span_id, plugin_parent_span_id = (
TraceManager.get_current_span_and_parent()
)
span_id = plugin_span_id
parent_span_id = plugin_parent_span_id

# --- Layer 2: ambient OTel span ---
# --- Ambient OTel: trace_id always; span fallback only ---
ambient = trace.get_current_span()
ambient_ctx = ambient.get_span_context()
if ambient_ctx.is_valid:
trace_id = format(ambient_ctx.trace_id, "032x")
span_id = format(ambient_ctx.span_id, "016x")
# Reset parent — stale plugin-stack parent must not leak through
# when the ambient span is a root (no parent).
parent_span_id = None
# SDK spans expose .parent; non-recording spans do not.
parent_ctx = getattr(ambient, "parent", None)
if parent_ctx is not None and parent_ctx.span_id:
parent_span_id = format(parent_ctx.span_id, "016x")

# --- Layer 1: explicit EventData overrides ---
# Only use ambient span IDs when the plugin stack has no span.
# Framework-internal spans (execute_tool, call_llm, etc.) are
# never written to BQ, so deriving parent_span_id from them
# creates phantom references. The plugin stack guarantees
# that both span_id and parent_span_id reference BQ rows.
if span_id is None:
span_id = format(ambient_ctx.span_id, "016x")
parent_span_id = None
parent_ctx = getattr(ambient, "parent", None)
if parent_ctx is not None and parent_ctx.span_id:
parent_span_id = format(parent_ctx.span_id, "016x")

# --- Explicit EventData overrides (post-pop callbacks) ---
if event_data.trace_id_override is not None:
trace_id = event_data.trace_id_override
if event_data.span_id_override is not None:
Expand Down Expand Up @@ -2813,13 +2902,18 @@ async def on_event_callback(
invocation_context: InvocationContext,
event: "Event",
) -> None:
"""Logs state changes and HITL events from the event stream.
"""Logs state changes, HITL events, and A2A interactions.

- Checks each event for a non-empty state_delta and logs it as a
STATE_DELTA event.
- Detects synthetic ``adk_request_*`` function calls (HITL pause
events) and their corresponding function responses (HITL
completions) and emits dedicated HITL event types.
- Detects events carrying A2A interaction metadata
(``a2a:request`` / ``a2a:response`` in ``custom_metadata``)
and logs them as ``A2A_INTERACTION`` events so the remote
agent's response and cross-reference IDs (``a2a:task_id``,
``a2a:context_id``) are visible in BigQuery.

The HITL detection must happen here (not in tool callbacks) because
``adk_request_credential``, ``adk_request_confirmation``, and
Expand Down Expand Up @@ -2883,6 +2977,45 @@ async def on_event_callback(
is_truncated=is_truncated,
)

# --- A2A interaction logging ---
# RemoteA2aAgent attaches cross-reference metadata to events:
# a2a:task_id, a2a:context_id — correlation keys
# a2a:request, a2a:response — full interaction payload
# Log an A2A_INTERACTION event when meaningful payload is present
# so the supervisor's BQ trace contains the remote agent's
# response and cross-reference IDs for JOINs.
meta = getattr(event, "custom_metadata", None)
if meta and (
meta.get("a2a:request") is not None
or meta.get("a2a:response") is not None
):
a2a_keys = {k: v for k, v in meta.items() if k.startswith("a2a:")}
a2a_truncated, is_truncated = _recursive_smart_truncate(
a2a_keys, self.config.max_content_length
)
# Use the a2a:response as the event content when available,
# so the remote agent's answer is visible in the content
# column.
response_payload = a2a_keys.get("a2a:response")
content_dict = None
content_truncated = False
if response_payload is not None:
content_dict, content_truncated = _recursive_smart_truncate(
response_payload,
self.config.max_content_length,
)
await self._log_event(
"A2A_INTERACTION",
callback_ctx,
raw_content=content_dict,
is_truncated=is_truncated or content_truncated,
event_data=EventData(
extra_attributes={
"a2a_metadata": a2a_truncated,
},
),
)

return None

async def on_state_change_callback(
Expand Down Expand Up @@ -2940,19 +3073,14 @@ async def after_run_callback(
span_id, duration = TraceManager.pop_span()
parent_span_id = TraceManager.get_current_span_id()

# Only override span IDs when no ambient OTel span exists.
# When ambient exists, _resolve_ids Layer 2 uses the framework's
# span IDs, keeping STARTING/COMPLETED pairs consistent.
has_ambient = trace.get_current_span().get_span_context().is_valid

await self._log_event(
"INVOCATION_COMPLETED",
callback_ctx,
event_data=EventData(
trace_id_override=trace_id,
latency_ms=duration,
span_id_override=None if has_ambient else span_id,
parent_span_id_override=None if has_ambient else parent_span_id,
span_id_override=span_id,
parent_span_id_override=parent_span_id,
),
)
finally:
Expand Down Expand Up @@ -2995,18 +3123,13 @@ async def after_agent_callback(
span_id, duration = TraceManager.pop_span()
parent_span_id, _ = TraceManager.get_current_span_and_parent()

# Only override span IDs when no ambient OTel span exists.
# When ambient exists, _resolve_ids Layer 2 uses the framework's
# span IDs, keeping STARTING/COMPLETED pairs consistent.
has_ambient = trace.get_current_span().get_span_context().is_valid

await self._log_event(
"AGENT_COMPLETED",
callback_context,
event_data=EventData(
latency_ms=duration,
span_id_override=None if has_ambient else span_id,
parent_span_id_override=None if has_ambient else parent_span_id,
span_id_override=span_id,
parent_span_id_override=parent_span_id,
),
)

Expand Down Expand Up @@ -3156,12 +3279,6 @@ async def after_model_callback(
# Otherwise log_event will fetch current stack (which is parent).
span_id = popped_span_id or span_id

# Only override span IDs when no ambient OTel span exists.
# When ambient exists, _resolve_ids Layer 2 uses the framework's
# span IDs, keeping LLM_REQUEST/LLM_RESPONSE pairs consistent.
has_ambient = trace.get_current_span().get_span_context().is_valid
use_override = is_popped and not has_ambient

await self._log_event(
"LLM_RESPONSE",
callback_context,
Expand All @@ -3172,8 +3289,8 @@ async def after_model_callback(
time_to_first_token_ms=tfft,
model_version=llm_response.model_version,
usage_metadata=llm_response.usage_metadata,
span_id_override=span_id if use_override else None,
parent_span_id_override=parent_span_id if use_override else None,
span_id_override=span_id if is_popped else None,
parent_span_id_override=(parent_span_id if is_popped else None),
),
)

Expand All @@ -3195,18 +3312,15 @@ async def on_model_error_callback(
span_id, duration = TraceManager.pop_span()
parent_span_id, _ = TraceManager.get_current_span_and_parent()

# Only override span IDs when no ambient OTel span exists.
has_ambient = trace.get_current_span().get_span_context().is_valid

await self._log_event(
"LLM_ERROR",
callback_context,
event_data=EventData(
status="ERROR",
error_message=str(error),
latency_ms=duration,
span_id_override=None if has_ambient else span_id,
parent_span_id_override=None if has_ambient else parent_span_id,
span_id_override=span_id,
parent_span_id_override=parent_span_id,
),
)

Expand All @@ -3228,7 +3342,7 @@ async def before_tool_callback(
args_truncated, is_truncated = _recursive_smart_truncate(
tool_args, self.config.max_content_length
)
tool_origin = _get_tool_origin(tool)
tool_origin = _get_tool_origin(tool, tool_args, tool_context)
content_dict = {
"tool": tool.name,
"args": args_truncated,
Expand Down Expand Up @@ -3262,7 +3376,7 @@ async def after_tool_callback(
resp_truncated, is_truncated = _recursive_smart_truncate(
result, self.config.max_content_length
)
tool_origin = _get_tool_origin(tool)
tool_origin = _get_tool_origin(tool, tool_args, tool_context)
content_dict = {
"tool": tool.name,
"result": resp_truncated,
Expand All @@ -3271,13 +3385,10 @@ async def after_tool_callback(
span_id, duration = TraceManager.pop_span()
parent_span_id, _ = TraceManager.get_current_span_and_parent()

# Only override span IDs when no ambient OTel span exists.
has_ambient = trace.get_current_span().get_span_context().is_valid

event_data = EventData(
latency_ms=duration,
span_id_override=None if has_ambient else span_id,
parent_span_id_override=None if has_ambient else parent_span_id,
span_id_override=span_id,
parent_span_id_override=parent_span_id,
)
await self._log_event(
"TOOL_COMPLETED",
Expand Down Expand Up @@ -3307,7 +3418,7 @@ async def on_tool_error_callback(
args_truncated, is_truncated = _recursive_smart_truncate(
tool_args, self.config.max_content_length
)
tool_origin = _get_tool_origin(tool)
tool_origin = _get_tool_origin(tool, tool_args, tool_context)
content_dict = {
"tool": tool.name,
"args": args_truncated,
Expand All @@ -3316,9 +3427,6 @@ async def on_tool_error_callback(
span_id, duration = TraceManager.pop_span()
parent_span_id, _ = TraceManager.get_current_span_and_parent()

# Only override span IDs when no ambient OTel span exists.
has_ambient = trace.get_current_span().get_span_context().is_valid

await self._log_event(
"TOOL_ERROR",
tool_context,
Expand All @@ -3328,7 +3436,7 @@ async def on_tool_error_callback(
status="ERROR",
error_message=str(error),
latency_ms=duration,
span_id_override=None if has_ambient else span_id,
parent_span_id_override=None if has_ambient else parent_span_id,
span_id_override=span_id,
parent_span_id_override=parent_span_id,
),
)
Loading