From c3fee35fa8d4967a37a625c96de652fad6da0ee2 Mon Sep 17 00:00:00 2001 From: Will Killian Date: Mon, 18 May 2026 19:49:41 -0400 Subject: [PATCH 1/2] test: add ATIF trace correlation coverage Signed-off-by: Will Killian --- .../unit/observability/openinference_tests.rs | 87 +++++++++++++++++++ .../tests/unit/observability/otel_tests.rs | 86 ++++++++++++++++++ 2 files changed, 173 insertions(+) diff --git a/crates/core/tests/unit/observability/openinference_tests.rs b/crates/core/tests/unit/observability/openinference_tests.rs index 75552228..08e8f15b 100644 --- a/crates/core/tests/unit/observability/openinference_tests.rs +++ b/crates/core/tests/unit/observability/openinference_tests.rs @@ -15,6 +15,7 @@ use crate::api::scope::{event, pop_scope, push_scope}; use crate::api::tool::ToolAttributes; use crate::codec::response::{AnnotatedLlmResponse, Usage}; use crate::json::Json; +use crate::observability::atif::{AtifAgentInfo, AtifExporter, AtifStepExtra}; use opentelemetry_sdk::trace::InMemorySpanExporterBuilder; use serde_json::json; use std::collections::HashMap; @@ -805,6 +806,92 @@ fn preserves_parent_child_relationships() { assert!(!child.parent_span_is_remote); } +#[test] +fn atif_lineage_correlates_with_openinference_span_attributes() { + let (provider, exporter) = make_provider(); + let mut processor = + OpenInferenceEventProcessor::new(provider.clone(), "test-scope".to_string()); + + let agent_uuid = Uuid::now_v7(); + let llm_uuid = Uuid::now_v7(); + let atif_exporter = AtifExporter::new( + agent_uuid.to_string(), + AtifAgentInfo { + name: "test-agent".to_string(), + version: "1.0.0".to_string(), + model_name: None, + tool_definitions: None, + extra: None, + }, + ); + let atif_subscriber = atif_exporter.subscriber(); + + let events = vec![ + make_start_event(agent_uuid, None, "agent", ScopeType::Agent, None), + make_start_event( + llm_uuid, + Some(agent_uuid), + "model-call", + ScopeType::Llm, + Some(json!({"messages": [{"role": "user", "content": "hello"}]})), + ), + make_end_event( + llm_uuid, + Some(agent_uuid), + "model-call", + ScopeType::Llm, + Some(json!({"content": "hi", "role": "assistant"})), + ), + make_end_event(agent_uuid, None, "agent", ScopeType::Agent, None), + ]; + + for event in &events { + processor.process(event); + atif_subscriber(event); + } + processor.force_flush().unwrap(); + + let spans = exporter.get_finished_spans().unwrap(); + let agent_span = spans + .iter() + .find(|span| span.name.as_ref() == "agent") + .unwrap(); + let llm_span = spans + .iter() + .find(|span| span.name.as_ref() == "model-call") + .unwrap(); + let agent_attributes = attr_map(&agent_span.attributes); + let llm_attributes = attr_map(&llm_span.attributes); + + assert_eq!( + agent_attributes.get("nemo_flow.uuid"), + Some(&agent_uuid.to_string()) + ); + assert_eq!( + llm_attributes.get("nemo_flow.uuid"), + Some(&llm_uuid.to_string()) + ); + assert_eq!( + llm_attributes.get("nemo_flow.parent_uuid"), + Some(&agent_uuid.to_string()) + ); + + let trajectory = atif_exporter.export(); + assert_eq!(trajectory.session_id, agent_uuid.to_string()); + let agent_step = trajectory + .steps + .iter() + .find(|step| step.source == "agent") + .unwrap(); + let extra: AtifStepExtra = serde_json::from_value(agent_step.extra.clone().unwrap()).unwrap(); + + assert_eq!( + llm_attributes.get("nemo_flow.uuid"), + Some(&extra.ancestry.function_id) + ); + assert_eq!(extra.ancestry.parent_id, Some(trajectory.session_id)); +} + #[test] fn orphan_marks_become_zero_duration_spans() { let (provider, exporter) = make_provider(); diff --git a/crates/core/tests/unit/observability/otel_tests.rs b/crates/core/tests/unit/observability/otel_tests.rs index 4fe643a2..169307ae 100644 --- a/crates/core/tests/unit/observability/otel_tests.rs +++ b/crates/core/tests/unit/observability/otel_tests.rs @@ -14,6 +14,7 @@ use crate::api::scope::ScopeType; use crate::api::scope::{event, pop_scope, push_scope}; use crate::api::tool::ToolAttributes; use crate::json::Json; +use crate::observability::atif::{AtifAgentInfo, AtifExporter, AtifStepExtra}; use opentelemetry_sdk::trace::InMemorySpanExporterBuilder; use serde_json::json; use std::collections::HashMap; @@ -551,6 +552,91 @@ fn preserves_parent_child_relationships() { assert!(!child.parent_span_is_remote); } +#[test] +fn atif_lineage_correlates_with_otel_span_attributes() { + let (provider, exporter) = make_provider(); + let mut processor = OtelEventProcessor::new(provider.clone(), "test-scope".to_string()); + + let agent_uuid = Uuid::now_v7(); + let llm_uuid = Uuid::now_v7(); + let atif_exporter = AtifExporter::new( + agent_uuid.to_string(), + AtifAgentInfo { + name: "test-agent".to_string(), + version: "1.0.0".to_string(), + model_name: None, + tool_definitions: None, + extra: None, + }, + ); + let atif_subscriber = atif_exporter.subscriber(); + + let events = vec![ + make_start_event(agent_uuid, None, "agent", ScopeType::Agent, None), + make_start_event( + llm_uuid, + Some(agent_uuid), + "model-call", + ScopeType::Llm, + Some(json!({"messages": [{"role": "user", "content": "hello"}]})), + ), + make_end_event( + llm_uuid, + Some(agent_uuid), + "model-call", + ScopeType::Llm, + Some(json!({"content": "hi", "role": "assistant"})), + ), + make_end_event(agent_uuid, None, "agent", ScopeType::Agent, None), + ]; + + for event in &events { + processor.process(event); + atif_subscriber(event); + } + processor.force_flush().unwrap(); + + let spans = exporter.get_finished_spans().unwrap(); + let agent_span = spans + .iter() + .find(|span| span.name.as_ref() == "agent") + .unwrap(); + let llm_span = spans + .iter() + .find(|span| span.name.as_ref() == "model-call") + .unwrap(); + let agent_attributes = attr_map(&agent_span.attributes); + let llm_attributes = attr_map(&llm_span.attributes); + + assert_eq!( + agent_attributes.get("nemo_flow.uuid"), + Some(&agent_uuid.to_string()) + ); + assert_eq!( + llm_attributes.get("nemo_flow.uuid"), + Some(&llm_uuid.to_string()) + ); + assert_eq!( + llm_attributes.get("nemo_flow.parent_uuid"), + Some(&agent_uuid.to_string()) + ); + + let trajectory = atif_exporter.export(); + assert_eq!(trajectory.session_id, agent_uuid.to_string()); + let agent_step = trajectory + .steps + .iter() + .find(|step| step.source == "agent") + .unwrap(); + let extra: AtifStepExtra = serde_json::from_value(agent_step.extra.clone().unwrap()).unwrap(); + + assert_eq!( + llm_attributes.get("nemo_flow.uuid"), + Some(&extra.ancestry.function_id) + ); + assert_eq!(extra.ancestry.parent_id, Some(trajectory.session_id)); +} + #[test] fn orphan_marks_become_zero_duration_spans() { let (provider, exporter) = make_provider(); From 5cb555f06251ae15486596d8f50948309fa01aaa Mon Sep 17 00:00:00 2001 From: Will Killian Date: Mon, 18 May 2026 19:53:38 -0400 Subject: [PATCH 2/2] docs: document ATIF trace correlation Signed-off-by: Will Killian --- docs/plugins/observability/about.md | 14 ++++++++++++++ docs/plugins/observability/atif.md | 6 ++++++ docs/plugins/observability/openinference.md | 10 ++++++++-- docs/plugins/observability/opentelemetry.md | 6 ++++++ 4 files changed, 34 insertions(+), 2 deletions(-) diff --git a/docs/plugins/observability/about.md b/docs/plugins/observability/about.md index 2bbbd659..8215c22a 100644 --- a/docs/plugins/observability/about.md +++ b/docs/plugins/observability/about.md @@ -60,6 +60,20 @@ Choose the exporter based on the downstream system: Start with local event inspection before production export. Add sanitize guardrails before exporters receive sensitive payloads. +## Correlating Trajectories And Traces + +When ATIF and trace exporters observe the same NeMo Flow events, they share +NeMo Flow UUIDs for cross-format joins. Plugin-managed ATIF uses the top-level +agent scope UUID as the trajectory `session_id`. ATIF step lineage stores the +event UUID as `step.extra.ancestry.function_id` and the parent UUID as +`step.extra.ancestry.parent_id`. + +OpenTelemetry and OpenInference spans carry the same values as +`nemo_flow.uuid` and `nemo_flow.parent_uuid` span attributes. Mark events use +`nemo_flow.mark.uuid` and `nemo_flow.mark.parent_uuid`. Native backend +`trace_id` and `span_id` values are still generated by the tracing backend and +are not written into ATIF. + ## Pages - [Observability Configuration](configuration.md) documents the whole plugin diff --git a/docs/plugins/observability/atif.md b/docs/plugins/observability/atif.md index c9bad9f0..fba2e0b2 100644 --- a/docs/plugins/observability/atif.md +++ b/docs/plugins/observability/atif.md @@ -60,6 +60,12 @@ The plugin writes each trajectory when the top-level agent scope closes. If the plugin is cleared while an agent is still open, teardown flushes the partial trajectory. +To correlate ATIF with OpenTelemetry or OpenInference traces from the same run, +join on NeMo Flow UUIDs. The plugin-managed ATIF `session_id` is the top-level +agent scope UUID. Each step's `extra.ancestry.function_id` is the event UUID, +and `extra.ancestry.parent_id` is the parent event UUID. Trace spans expose the +same values as `nemo_flow.uuid` and `nemo_flow.parent_uuid` attributes. + ## Plugin Configuration Use plugin configuration when the application should let NeMo Flow own the ATIF diff --git a/docs/plugins/observability/openinference.md b/docs/plugins/observability/openinference.md index 0d574aaa..43e38906 100644 --- a/docs/plugins/observability/openinference.md +++ b/docs/plugins/observability/openinference.md @@ -66,8 +66,14 @@ OpenInference uses the same OTLP section shape as ## Expected Output The backend should show OpenInference-oriented spans for scopes, tools, and LLM -calls from the same `root_uuid`. LLM usage metadata appears as token counters -when provider responses include usage information. +calls grouped by root scope. LLM usage metadata appears as token counters when +provider responses include usage information. + +Each lifecycle span includes `nemo_flow.uuid` and `nemo_flow.parent_uuid` +attributes. These values match ATIF `step.extra.ancestry.function_id` and +`step.extra.ancestry.parent_id` for the same events. For plugin-managed ATIF, +the root agent span's `nemo_flow.uuid` also matches the ATIF `session_id`. +Backend-native `trace_id` and `span_id` values are not written into ATIF. Redact sensitive event payloads with sanitize guardrails before production export. diff --git a/docs/plugins/observability/opentelemetry.md b/docs/plugins/observability/opentelemetry.md index 97f948fc..53791e00 100644 --- a/docs/plugins/observability/opentelemetry.md +++ b/docs/plugins/observability/opentelemetry.md @@ -65,6 +65,12 @@ The collector should receive OTLP trace export requests. The tracing backend should show spans for NeMo Flow scopes, tools, LLM calls, and marks grouped by root scope. +Each lifecycle span includes `nemo_flow.uuid` and `nemo_flow.parent_uuid` +attributes. These values match ATIF `step.extra.ancestry.function_id` and +`step.extra.ancestry.parent_id` for the same events. For plugin-managed ATIF, +the root agent span's `nemo_flow.uuid` also matches the ATIF `session_id`. +Backend-native `trace_id` and `span_id` values are not written into ATIF. + Register the plugin before the first instrumented request, use stable service identity fields, keep credentials outside source code, and flush during graceful shutdown.