Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions crates/core/tests/unit/observability/openinference_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::api::scope::{event, pop_scope, push_scope};
use crate::api::tool::ToolAttributes;
use crate::codec::response::{AnnotatedLlmResponse, Usage};
use crate::json::Json;
use crate::observability::atif::{AtifAgentInfo, AtifExporter, AtifStepExtra};
use opentelemetry_sdk::trace::InMemorySpanExporterBuilder;
use serde_json::json;
use std::collections::HashMap;
Expand Down Expand Up @@ -805,6 +806,92 @@ fn preserves_parent_child_relationships() {
assert!(!child.parent_span_is_remote);
}

#[test]
fn atif_lineage_correlates_with_openinference_span_attributes() {
let (provider, exporter) = make_provider();
let mut processor =
OpenInferenceEventProcessor::new(provider.clone(), "test-scope".to_string());

let agent_uuid = Uuid::now_v7();
let llm_uuid = Uuid::now_v7();
let atif_exporter = AtifExporter::new(
agent_uuid.to_string(),
AtifAgentInfo {
name: "test-agent".to_string(),
version: "1.0.0".to_string(),
model_name: None,
tool_definitions: None,
extra: None,
},
);
let atif_subscriber = atif_exporter.subscriber();

let events = vec![
make_start_event(agent_uuid, None, "agent", ScopeType::Agent, None),
make_start_event(
llm_uuid,
Some(agent_uuid),
"model-call",
ScopeType::Llm,
Some(json!({"messages": [{"role": "user", "content": "hello"}]})),
),
make_end_event(
llm_uuid,
Some(agent_uuid),
"model-call",
ScopeType::Llm,
Some(json!({"content": "hi", "role": "assistant"})),
),
make_end_event(agent_uuid, None, "agent", ScopeType::Agent, None),
];

for event in &events {
processor.process(event);
atif_subscriber(event);
}
processor.force_flush().unwrap();

let spans = exporter.get_finished_spans().unwrap();
let agent_span = spans
.iter()
.find(|span| span.name.as_ref() == "agent")
.unwrap();
let llm_span = spans
.iter()
.find(|span| span.name.as_ref() == "model-call")
.unwrap();
let agent_attributes = attr_map(&agent_span.attributes);
let llm_attributes = attr_map(&llm_span.attributes);

assert_eq!(
agent_attributes.get("nemo_flow.uuid"),
Some(&agent_uuid.to_string())
);
assert_eq!(
llm_attributes.get("nemo_flow.uuid"),
Some(&llm_uuid.to_string())
);
assert_eq!(
llm_attributes.get("nemo_flow.parent_uuid"),
Some(&agent_uuid.to_string())
);

let trajectory = atif_exporter.export();
assert_eq!(trajectory.session_id, agent_uuid.to_string());
let agent_step = trajectory
.steps
.iter()
.find(|step| step.source == "agent")
.unwrap();
let extra: AtifStepExtra = serde_json::from_value(agent_step.extra.clone().unwrap()).unwrap();

Comment thread
willkill07 marked this conversation as resolved.
assert_eq!(
llm_attributes.get("nemo_flow.uuid"),
Some(&extra.ancestry.function_id)
);
assert_eq!(extra.ancestry.parent_id, Some(trajectory.session_id));
}

#[test]
fn orphan_marks_become_zero_duration_spans() {
let (provider, exporter) = make_provider();
Expand Down
86 changes: 86 additions & 0 deletions crates/core/tests/unit/observability/otel_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::api::scope::ScopeType;
use crate::api::scope::{event, pop_scope, push_scope};
use crate::api::tool::ToolAttributes;
use crate::json::Json;
use crate::observability::atif::{AtifAgentInfo, AtifExporter, AtifStepExtra};
use opentelemetry_sdk::trace::InMemorySpanExporterBuilder;
use serde_json::json;
use std::collections::HashMap;
Expand Down Expand Up @@ -551,6 +552,91 @@ fn preserves_parent_child_relationships() {
assert!(!child.parent_span_is_remote);
}

#[test]
fn atif_lineage_correlates_with_otel_span_attributes() {
let (provider, exporter) = make_provider();
let mut processor = OtelEventProcessor::new(provider.clone(), "test-scope".to_string());

let agent_uuid = Uuid::now_v7();
let llm_uuid = Uuid::now_v7();
let atif_exporter = AtifExporter::new(
agent_uuid.to_string(),
AtifAgentInfo {
name: "test-agent".to_string(),
version: "1.0.0".to_string(),
model_name: None,
tool_definitions: None,
extra: None,
},
);
let atif_subscriber = atif_exporter.subscriber();

let events = vec![
make_start_event(agent_uuid, None, "agent", ScopeType::Agent, None),
make_start_event(
llm_uuid,
Some(agent_uuid),
"model-call",
ScopeType::Llm,
Some(json!({"messages": [{"role": "user", "content": "hello"}]})),
),
make_end_event(
llm_uuid,
Some(agent_uuid),
"model-call",
ScopeType::Llm,
Some(json!({"content": "hi", "role": "assistant"})),
),
make_end_event(agent_uuid, None, "agent", ScopeType::Agent, None),
];

for event in &events {
processor.process(event);
atif_subscriber(event);
}
processor.force_flush().unwrap();

let spans = exporter.get_finished_spans().unwrap();
let agent_span = spans
.iter()
.find(|span| span.name.as_ref() == "agent")
.unwrap();
let llm_span = spans
.iter()
.find(|span| span.name.as_ref() == "model-call")
.unwrap();
let agent_attributes = attr_map(&agent_span.attributes);
let llm_attributes = attr_map(&llm_span.attributes);

assert_eq!(
agent_attributes.get("nemo_flow.uuid"),
Some(&agent_uuid.to_string())
);
assert_eq!(
llm_attributes.get("nemo_flow.uuid"),
Some(&llm_uuid.to_string())
);
assert_eq!(
llm_attributes.get("nemo_flow.parent_uuid"),
Some(&agent_uuid.to_string())
);

let trajectory = atif_exporter.export();
assert_eq!(trajectory.session_id, agent_uuid.to_string());
let agent_step = trajectory
.steps
.iter()
.find(|step| step.source == "agent")
.unwrap();
let extra: AtifStepExtra = serde_json::from_value(agent_step.extra.clone().unwrap()).unwrap();

Comment thread
willkill07 marked this conversation as resolved.
assert_eq!(
llm_attributes.get("nemo_flow.uuid"),
Some(&extra.ancestry.function_id)
);
assert_eq!(extra.ancestry.parent_id, Some(trajectory.session_id));
}

#[test]
fn orphan_marks_become_zero_duration_spans() {
let (provider, exporter) = make_provider();
Expand Down
14 changes: 14 additions & 0 deletions docs/plugins/observability/about.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,20 @@ Choose the exporter based on the downstream system:
Start with local event inspection before production export. Add sanitize
guardrails before exporters receive sensitive payloads.

## Correlating Trajectories And Traces

When ATIF and trace exporters observe the same NeMo Flow events, they share
NeMo Flow UUIDs for cross-format joins. Plugin-managed ATIF uses the top-level
agent scope UUID as the trajectory `session_id`. ATIF step lineage stores the
event UUID as `step.extra.ancestry.function_id` and the parent UUID as
`step.extra.ancestry.parent_id`.

OpenTelemetry and OpenInference spans carry the same values as
`nemo_flow.uuid` and `nemo_flow.parent_uuid` span attributes. Mark events use
`nemo_flow.mark.uuid` and `nemo_flow.mark.parent_uuid`. Native backend
`trace_id` and `span_id` values are still generated by the tracing backend and
are not written into ATIF.

## Pages

- [Observability Configuration](configuration.md) documents the whole plugin
Expand Down
6 changes: 6 additions & 0 deletions docs/plugins/observability/atif.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ The plugin writes each trajectory when the top-level agent scope closes. If the
plugin is cleared while an agent is still open, teardown flushes the partial
trajectory.

To correlate ATIF with OpenTelemetry or OpenInference traces from the same run,
join on NeMo Flow UUIDs. The plugin-managed ATIF `session_id` is the top-level
agent scope UUID. Each step's `extra.ancestry.function_id` is the event UUID,
and `extra.ancestry.parent_id` is the parent event UUID. Trace spans expose the
same values as `nemo_flow.uuid` and `nemo_flow.parent_uuid` attributes.

## Plugin Configuration

Use plugin configuration when the application should let NeMo Flow own the ATIF
Expand Down
10 changes: 8 additions & 2 deletions docs/plugins/observability/openinference.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,14 @@ OpenInference uses the same OTLP section shape as
## Expected Output

The backend should show OpenInference-oriented spans for scopes, tools, and LLM
calls from the same `root_uuid`. LLM usage metadata appears as token counters
when provider responses include usage information.
calls grouped by root scope. LLM usage metadata appears as token counters when
provider responses include usage information.

Each lifecycle span includes `nemo_flow.uuid` and `nemo_flow.parent_uuid`
attributes. These values match ATIF `step.extra.ancestry.function_id` and
`step.extra.ancestry.parent_id` for the same events. For plugin-managed ATIF,
the root agent span's `nemo_flow.uuid` also matches the ATIF `session_id`.
Backend-native `trace_id` and `span_id` values are not written into ATIF.

Redact sensitive event payloads with sanitize guardrails before production
export.
Expand Down
6 changes: 6 additions & 0 deletions docs/plugins/observability/opentelemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ The collector should receive OTLP trace export requests. The tracing backend
should show spans for NeMo Flow scopes, tools, LLM calls, and marks grouped by
root scope.

Each lifecycle span includes `nemo_flow.uuid` and `nemo_flow.parent_uuid`
attributes. These values match ATIF `step.extra.ancestry.function_id` and
`step.extra.ancestry.parent_id` for the same events. For plugin-managed ATIF,
the root agent span's `nemo_flow.uuid` also matches the ATIF `session_id`.
Backend-native `trace_id` and `span_id` values are not written into ATIF.

Register the plugin before the first instrumented request, use stable service
identity fields, keep credentials outside source code, and flush during
graceful shutdown.
Expand Down
Loading