diff --git a/Cargo.lock b/Cargo.lock index 1a867aaf..5b8ef7b9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1253,6 +1253,7 @@ dependencies = [ "http", "http-body-util", "nemo-flow", + "nemo-flow-adaptive", "reqwest", "rustls", "serde", diff --git a/crates/adaptive/src/config.rs b/crates/adaptive/src/config.rs index 323446a4..77b74085 100644 --- a/crates/adaptive/src/config.rs +++ b/crates/adaptive/src/config.rs @@ -52,7 +52,7 @@ impl Default for AdaptiveConfig { } /// Shared state configuration consumed by adaptive features that need persistence. -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct StateConfig { /// Backend selection for adaptive state. pub backend: BackendSpec, @@ -68,6 +68,12 @@ pub struct BackendSpec { pub config: Map, } +impl Default for BackendSpec { + fn default() -> Self { + Self::in_memory() + } +} + impl BackendSpec { /// Creates an in-memory backend spec. pub fn in_memory() -> Self { @@ -209,6 +215,127 @@ fn default_acg_priority() -> i32 { 50 } +nemo_flow::editor_config! { + impl AdaptiveConfig { + agent_id => { label: "agent_id", kind: String, optional: true }, + state => { + label: "state", + kind: Section, + optional: true, + nested: StateConfig, + default: StateConfig, + }, + telemetry => { + label: "telemetry", + kind: Section, + optional: true, + nested: TelemetryComponentConfig, + default: TelemetryComponentConfig, + }, + adaptive_hints => { + label: "adaptive_hints", + kind: Section, + optional: true, + nested: AdaptiveHintsComponentConfig, + default: AdaptiveHintsComponentConfig, + }, + tool_parallelism => { + label: "tool_parallelism", + kind: Section, + optional: true, + nested: ToolParallelismComponentConfig, + default: ToolParallelismComponentConfig, + }, + acg => { + label: "acg", + kind: Section, + optional: true, + nested: AcgComponentConfig, + default: AcgComponentConfig, + }, + policy => { + label: "policy", + kind: Section, + nested: ConfigPolicy, + default: ConfigPolicy, + }, + } +} + +nemo_flow::editor_config! { + impl StateConfig { + backend => { + label: "backend", + kind: Section, + nested: BackendSpec, + default: BackendSpec, + }, + } +} + +nemo_flow::editor_config! { + impl BackendSpec { + kind => { label: "kind", kind: Enum, values: ["in_memory", "redis"] }, + config => { label: "config", kind: Json }, + } +} + +nemo_flow::editor_config! { + impl TelemetryComponentConfig { + subscriber_name => { label: "subscriber_name", kind: String, optional: true }, + learners => { label: "learners", kind: Json }, + } +} + +nemo_flow::editor_config! { + impl AdaptiveHintsComponentConfig { + priority => { label: "priority", kind: Integer }, + break_chain => { label: "break_chain", kind: Boolean }, + inject_header => { label: "inject_header", kind: Boolean }, + inject_body_path => { label: "inject_body_path", kind: String }, + } +} + +nemo_flow::editor_config! { + impl ToolParallelismComponentConfig { + priority => { label: "priority", kind: Integer }, + mode => { + label: "mode", + kind: Enum, + values: ["observe_only", "inject_hints", "schedule"], + }, + } +} + +nemo_flow::editor_config! { + impl AcgComponentConfig { + provider => { + label: "provider", + kind: Enum, + values: ["passthrough", "anthropic", "openai"], + }, + observation_window => { label: "observation_window", kind: Integer }, + priority => { label: "priority", kind: Integer }, + stability_thresholds => { + label: "stability_thresholds", + kind: Section, + nested: crate::acg::stability::StabilityThresholds, + default: crate::acg::stability::StabilityThresholds, + }, + } +} + +nemo_flow::editor_config! { + impl crate::acg::stability::StabilityThresholds { + stable_threshold => { label: "stable_threshold", kind: Float }, + semi_stable_threshold => { label: "semi_stable_threshold", kind: Float }, + min_observations_for_full_confidence => { + label: "min_observations_for_full_confidence", + kind: Integer, + }, + } +} + #[cfg(test)] #[path = "../tests/unit/config_tests.rs"] mod tests; diff --git a/crates/adaptive/tests/unit/config_tests.rs b/crates/adaptive/tests/unit/config_tests.rs index 1ea750ac..dabe2e3a 100644 --- a/crates/adaptive/tests/unit/config_tests.rs +++ b/crates/adaptive/tests/unit/config_tests.rs @@ -4,6 +4,7 @@ //! Unit tests for config in the NeMo Flow adaptive crate. use super::*; +use nemo_flow::config_editor::{EditorConfig, EditorFieldKind}; use serde_json::json; #[test] @@ -71,3 +72,50 @@ fn test_component_configs_deserialize_with_default_helpers() { assert_eq!(tool_parallelism.priority, 100); assert_eq!(tool_parallelism.mode, "observe_only"); } + +#[test] +fn test_adaptive_editor_schema_covers_canonical_options() { + let schema = AdaptiveConfig::editor_schema(); + let fields = schema + .fields + .iter() + .map(|field| field.name) + .collect::>(); + assert_eq!( + fields, + vec![ + "agent_id", + "state", + "telemetry", + "adaptive_hints", + "tool_parallelism", + "acg", + "policy", + ] + ); + + let state = schema.field("state").unwrap().schema().unwrap(); + let backend = state.field("backend").unwrap().schema().unwrap(); + assert_eq!(backend.field("kind").unwrap().kind, EditorFieldKind::Enum); + assert_eq!(backend.field("config").unwrap().kind, EditorFieldKind::Json); + + let telemetry = schema.field("telemetry").unwrap().schema().unwrap(); + assert_eq!( + telemetry.field("learners").unwrap().kind, + EditorFieldKind::Json + ); + + let acg = schema.field("acg").unwrap().schema().unwrap(); + let thresholds = acg.field("stability_thresholds").unwrap().schema().unwrap(); + assert_eq!( + thresholds.field("stable_threshold").unwrap().kind, + EditorFieldKind::Float + ); + assert_eq!( + thresholds + .field("min_observations_for_full_confidence") + .unwrap() + .kind, + EditorFieldKind::Integer + ); +} diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index 4316643a..0b6c9e5d 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -22,6 +22,7 @@ workspace = true [dependencies] nemo-flow = { workspace = true, features = ["openinference"] } +nemo-flow-adaptive = { workspace = true, features = ["redis-backend"] } async-stream = "0.3" axum = "0.8" bytes = "1" diff --git a/crates/cli/src/gateway.rs b/crates/cli/src/gateway.rs index b03ade05..4666e506 100644 --- a/crates/cli/src/gateway.rs +++ b/crates/cli/src/gateway.rs @@ -6,7 +6,7 @@ use std::sync::{Arc, Mutex}; use async_stream::stream; use axum::body::{Body, Bytes}; use axum::extract::State; -use axum::http::{HeaderMap, HeaderName, Method, Request, Response, StatusCode}; +use axum::http::{HeaderMap, HeaderName, HeaderValue, Method, Request, Response, StatusCode}; use futures_util::StreamExt; use nemo_flow::api::llm::{ LlmCallExecuteParams, LlmRequest, LlmStreamCallExecuteParams, llm_call_execute, @@ -297,7 +297,7 @@ fn build_buffered_func( let body_bytes = prepared.body_bytes.clone(); let headers = prepared.headers.clone(); let route = prepared.provider; - Arc::new(move |_request| { + Arc::new(move |request| { let http = http.clone(); let method = method.clone(); let url = url.clone(); @@ -307,17 +307,24 @@ fn build_buffered_func( let upstream_error = upstream_error.clone(); let response_bytes = response_bytes.clone(); Box::pin(async move { - let response = - match forward_upstream_request(&http, &method, &url, &body_bytes, &headers, route) - .await - { - Ok(response) => response, - Err(error) => { - let message = error.to_string(); - *upstream_error.lock().expect("upstream error lock poisoned") = Some(error); - return Err(FlowError::Internal(message)); - } - }; + let response = match forward_upstream_request( + &http, + &method, + &url, + &body_bytes, + &headers, + Some(&request), + route, + ) + .await + { + Ok(response) => response, + Err(error) => { + let message = error.to_string(); + *upstream_error.lock().expect("upstream error lock poisoned") = Some(error); + return Err(FlowError::Internal(message)); + } + }; let status = response.status(); let response_headers = response_headers(response.headers()); let bytes = match response.bytes().await { @@ -431,7 +438,7 @@ fn build_streaming_func( let body_bytes = prepared.body_bytes.clone(); let headers = prepared.headers.clone(); let route = prepared.provider; - Arc::new(move |_request| { + Arc::new(move |request| { let http = http.clone(); let method = method.clone(); let url = url.clone(); @@ -440,17 +447,24 @@ fn build_streaming_func( let upstream_info = upstream_info.clone(); let upstream_error = upstream_error.clone(); Box::pin(async move { - let response = - match forward_upstream_request(&http, &method, &url, &body_bytes, &headers, route) - .await - { - Ok(response) => response, - Err(error) => { - let message = error.to_string(); - *upstream_error.lock().expect("upstream error lock poisoned") = Some(error); - return Err(FlowError::Internal(message)); - } - }; + let response = match forward_upstream_request( + &http, + &method, + &url, + &body_bytes, + &headers, + Some(&request), + route, + ) + .await + { + Ok(response) => response, + Err(error) => { + let message = error.to_string(); + *upstream_error.lock().expect("upstream error lock poisoned") = Some(error); + return Err(FlowError::Internal(message)); + } + }; let status = response.status(); let response_headers = response_headers(response.headers()); *upstream_info.lock().expect("upstream info lock poisoned") = @@ -554,8 +568,10 @@ async fn forward_upstream_request( url: &str, body_bytes: &Bytes, headers: &HeaderMap, + effective_request: Option<&LlmRequest>, route: ProviderRoute, ) -> Result { + let (body_bytes, headers) = effective_upstream_request(body_bytes, headers, effective_request); // Only strip the inbound JWT when we actually have a replacement key to inject. Without one // the upstream just receives no auth and 401s, which is no better than letting it reject the // JWT itself — and stripping silently can break setups that point the gateway at an upstream @@ -566,7 +582,7 @@ async fn forward_upstream_request( .ok() .filter(|v| !v.trim().is_empty()) .is_some(); - let sanitized = strip_chatgpt_oauth_for_openai_route(headers, route, has_openai_env); + let sanitized = strip_chatgpt_oauth_for_openai_route(&headers, route, has_openai_env); let mut upstream = http.request(method.clone(), url).body(body_bytes.clone()); for (name, value) in &sanitized { if should_forward_request_header(name) { @@ -577,6 +593,43 @@ async fn forward_upstream_request( upstream.send().await } +fn effective_upstream_request( + body_bytes: &Bytes, + headers: &HeaderMap, + effective_request: Option<&LlmRequest>, +) -> (Bytes, HeaderMap) { + let Some(request) = effective_request else { + return (body_bytes.clone(), headers.clone()); + }; + + let body_bytes = if request.content.is_null() { + body_bytes.clone() + } else { + serde_json::to_vec(&request.content) + .map(Bytes::from) + .unwrap_or_else(|_| body_bytes.clone()) + }; + let mut headers = headers.clone(); + for (name, value) in &request.headers { + let Ok(name) = HeaderName::from_bytes(name.as_bytes()) else { + continue; + }; + let Some(value) = json_header_value(value) else { + continue; + }; + headers.insert(name, value); + } + (body_bytes, headers) +} + +fn json_header_value(value: &Value) -> Option { + let rendered = match value { + Value::String(value) => value.clone(), + value => serde_json::to_string(value).ok()?, + }; + HeaderValue::from_str(&rendered).ok() +} + // Builds the upstream URL for the ChatGPT backend. OpenAI API bases commonly include `/v1`, while // the ChatGPT backend base is // `chatgpt.com/backend-api/codex` (no `/v1`). Both append `/responses` to their base, so the @@ -718,6 +771,7 @@ async fn passthrough_streaming( &prepared.upstream_url, &prepared.body_bytes, &prepared.headers, + None, prepared.provider, ) .await?; diff --git a/crates/cli/src/plugins.rs b/crates/cli/src/plugins.rs index 1d8c6e11..94e6cdc9 100644 --- a/crates/cli/src/plugins.rs +++ b/crates/cli/src/plugins.rs @@ -15,6 +15,7 @@ use dialoguer::theme::ColorfulTheme; use dialoguer::{Input, Select}; use nemo_flow::config_editor::{EditorConfig, EditorFieldKind, EditorFieldSpec}; use nemo_flow::observability::plugin_component::ObservabilityConfig; +use nemo_flow_adaptive::AdaptiveConfig; use serde_json::{Value, json}; use crate::config::PluginsEditCommand; @@ -89,29 +90,41 @@ pub(crate) fn edit(command: PluginsEditCommand) -> Result<(), CliError> { let path = target_path(scope)?; let mut config = read_plugin_config(&path)?; ensure_observability_component(&mut config)?; + ensure_adaptive_component(&mut config)?; let mut observability = component_observability_config(&config)?; + let mut adaptive = component_adaptive_config(&config)?; let theme = ColorfulTheme::default(); crate::banner::print_intro(); - println!( - " Editing Observability plugin config at {}", - path.display() - ); + println!(" Editing plugin config at {}", path.display()); println!(" Tip: ↑/↓ or j/k to move, SPACE/ENTER to select, p to preview, s to save."); println!(); + let mut selected_index = 0; loop { let summary = observability_summary(&config, &observability); - let section_fields = ObservabilityConfig::editor_schema().fields; + let adaptive_summary = adaptive_summary(&config, &adaptive); + let observability_fields = ObservabilityConfig::editor_schema().fields; + let adaptive_fields = AdaptiveConfig::editor_schema().fields; let mut items = vec![MenuItem::new(format!( "Toggle Observability component [{}]", status_label(component_enabled(&config)) ))]; - items.extend(section_fields.iter().map(|section| { + items.extend(observability_fields.iter().map(|section| { MenuItem::new(configured_label( section_configured(&observability, *section), format!("Edit {}", section.label), )) })); + items.push(MenuItem::new(format!( + "Toggle Adaptive component [{}]", + status_label(adaptive_component_enabled(&config)) + ))); + items.extend(adaptive_fields.iter().map(|field| { + MenuItem::new(configured_label( + config_field_configured(&adaptive, *field).unwrap_or(false), + format!("Edit Adaptive {}", field.label), + )) + })); items.push(MenuItem::new(shortcut_label("Preview TOML", "p"))); items.push(MenuItem::new(shortcut_label( format!("Save to {}", path.display()), @@ -120,26 +133,52 @@ pub(crate) fn edit(command: PluginsEditCommand) -> Result<(), CliError> { items.push(MenuItem::new(shortcut_label("Cancel", "q"))); println!(); println!("Observability: {summary}"); - let preview_index = section_fields.len() + 1; - let save_index = section_fields.len() + 2; - let cancel_index = section_fields.len() + 3; - let selection = prompt_menu(&theme, "plugins.toml", &items, 0)?; + println!("Adaptive: {adaptive_summary}"); + let observability_start_index = 1; + let observability_end_index = observability_start_index + observability_fields.len(); + let adaptive_toggle_index = observability_end_index; + let adaptive_start_index = adaptive_toggle_index + 1; + let preview_index = adaptive_start_index + adaptive_fields.len(); + let save_index = preview_index + 1; + let cancel_index = preview_index + 2; + let selection = prompt_menu(&theme, "plugins.toml", &items, selected_index)?; + if let Some(selected) = menu_response_index(&selection) { + selected_index = selected; + } match selection { MenuResponse::Selected(0) => { let enabled = !component_enabled(&config); set_component_enabled(&mut config, enabled); } MenuResponse::Selected(selection) - if (1..=section_fields.len()).contains(&selection) => + if (observability_start_index..observability_end_index).contains(&selection) => { - edit_section(&theme, &mut observability, section_fields[selection - 1])? + edit_section( + &theme, + &mut observability, + observability_fields[selection - observability_start_index], + )? + } + MenuResponse::Selected(selection) if selection == adaptive_toggle_index => { + let enabled = !adaptive_component_enabled(&config); + set_adaptive_component_enabled(&mut config, enabled); + } + MenuResponse::Selected(selection) + if (adaptive_start_index..preview_index).contains(&selection) => + { + edit_config_field( + &theme, + &mut adaptive, + adaptive_fields[selection - adaptive_start_index], + )? } MenuResponse::Selected(selection) if selection == preview_index => { - let preview_config = config_with_observability(&config, &observability)?; + let preview_config = config_with_components(&config, &observability, &adaptive)?; print_preview(&preview_config)?; } MenuResponse::Selected(selection) if selection == save_index => { store_observability_config(&mut config, &observability)?; + store_adaptive_config(&mut config, &adaptive)?; validate_config(&config)?; write_plugin_config(&path, &config)?; print_save_success(&path); @@ -151,19 +190,34 @@ pub(crate) fn edit(command: PluginsEditCommand) -> Result<(), CliError> { )); } MenuResponse::Shortcut(MenuShortcut::Preview, _) => { - let preview_config = config_with_observability(&config, &observability)?; + let preview_config = config_with_components(&config, &observability, &adaptive)?; print_preview(&preview_config)?; } MenuResponse::Shortcut(MenuShortcut::Save, _) => { store_observability_config(&mut config, &observability)?; + store_adaptive_config(&mut config, &adaptive)?; validate_config(&config)?; write_plugin_config(&path, &config)?; print_save_success(&path); return Ok(()); } MenuResponse::Shortcut(MenuShortcut::Help, _) => print_editor_help(), - MenuResponse::Shortcut(MenuShortcut::Reset | MenuShortcut::Clear, _) => { - println!(" Select a section first, then use reset or clear on a field."); + MenuResponse::Shortcut(MenuShortcut::Reset | MenuShortcut::Clear, selected) => { + if (observability_start_index..observability_end_index).contains(&selected) { + reset_config_field( + &mut observability, + observability_fields[selected - observability_start_index], + )?; + } else if (adaptive_start_index..preview_index).contains(&selected) { + reset_config_field( + &mut adaptive, + adaptive_fields[selected - adaptive_start_index], + )?; + } else { + println!( + " Select an Observability or Adaptive field, or a section field, to reset or clear." + ); + } } MenuResponse::Cancel | MenuResponse::Selected(_) => { return Err(CliError::Config( @@ -174,6 +228,21 @@ pub(crate) fn edit(command: PluginsEditCommand) -> Result<(), CliError> { } } +fn menu_response_index(response: &MenuResponse) -> Option { + match response { + MenuResponse::Selected(index) + | MenuResponse::Shortcut( + MenuShortcut::Preview + | MenuShortcut::Save + | MenuShortcut::Help + | MenuShortcut::Reset + | MenuShortcut::Clear, + index, + ) => Some(*index), + MenuResponse::Cancel => None, + } +} + fn prompt_menu( theme: &ColorfulTheme, prompt: &str, @@ -338,19 +407,25 @@ fn ensure_tty() -> Result<(), CliError> { Ok(()) } -fn edit_section( +fn edit_section( theme: &ColorfulTheme, - config: &mut ObservabilityConfig, + config: &mut T, section: EditorFieldSpec, -) -> Result<(), CliError> { - ensure_section(config, section); +) -> Result<(), CliError> +where + T: SerializeConfig, +{ let fields = section .schema() .ok_or_else(|| CliError::Config(format!("{} is not an editable section", section.name)))? .fields; + let mut selected_index = 0; loop { let items = section_menu_items(config, section, fields)?; - let selection = prompt_menu(theme, section.name, &items, 0)?; + let selection = prompt_menu(theme, section.name, &items, selected_index)?; + if let Some(selected) = menu_response_index(&selection) { + selected_index = selected; + } let selection = match selection { MenuResponse::Selected(selection) => selection, MenuResponse::Shortcut(MenuShortcut::Help, _) => { @@ -380,11 +455,14 @@ fn edit_section( } } -fn section_menu_items( - config: &ObservabilityConfig, +fn section_menu_items( + config: &T, section: EditorFieldSpec, fields: &[EditorFieldSpec], -) -> Result, CliError> { +) -> Result, CliError> +where + T: serde::Serialize, +{ let mut items = Vec::new(); if section_has_enabled_toggle(section) { let enabled = section_enabled(config, section).unwrap_or(false); @@ -401,11 +479,14 @@ fn section_menu_items( Ok(items) } -fn section_field_menu_item( - config: &ObservabilityConfig, +fn section_field_menu_item( + config: &T, section: EditorFieldSpec, field: EditorFieldSpec, -) -> Result { +) -> Result +where + T: serde::Serialize, +{ let configured = section_field_configured(config, section, field)?; let value = section_field_value(config, section, field.name)? .map(|value| display_field_value(section, field, &value)) @@ -429,12 +510,15 @@ fn reset_section_index(section: EditorFieldSpec, fields: &[EditorFieldSpec]) -> usize::from(section_has_enabled_toggle(section)) + fields.len() } -fn reset_selected_item( - config: &mut ObservabilityConfig, +fn reset_selected_item( + config: &mut T, section: EditorFieldSpec, fields: &[EditorFieldSpec], selected: usize, -) -> Result<(), CliError> { +) -> Result<(), CliError> +where + T: SerializeConfig, +{ if reset_selected_field(config, section, fields, selected)? { return Ok(()); } @@ -444,13 +528,16 @@ fn reset_selected_item( Ok(()) } -fn edit_selected_section_item( +fn edit_selected_section_item( theme: &ColorfulTheme, - config: &mut ObservabilityConfig, + config: &mut T, section: EditorFieldSpec, fields: &[EditorFieldSpec], selection: usize, -) -> Result { +) -> Result +where + T: SerializeConfig, +{ if section_has_enabled_toggle(section) && selection == 0 { toggle_section(config, section); return Ok(true); @@ -467,12 +554,19 @@ fn edit_selected_section_item( Ok(false) } -fn edit_field( +fn edit_field( theme: &ColorfulTheme, - config: &mut ObservabilityConfig, + config: &mut T, section: EditorFieldSpec, field: &EditorFieldSpec, -) -> Result<(), CliError> { +) -> Result<(), CliError> +where + T: SerializeConfig, +{ + if field.kind == EditorFieldKind::Section { + edit_nested_section(theme, config, section, *field)?; + return Ok(()); + } let current = section_field_value(config, section, field.name)?; let actions = [ MenuItem::new("Set value"), @@ -514,6 +608,352 @@ fn edit_field( Ok(()) } +fn edit_config_field( + theme: &ColorfulTheme, + config: &mut T, + field: EditorFieldSpec, +) -> Result<(), CliError> +where + T: Default + SerializeConfig, +{ + if field.kind == EditorFieldKind::Section { + let mut value = config_field_value(config, field.name)? + .or_else(|| field.default_value()) + .unwrap_or_else(|| json!({})); + let schema = field.schema().ok_or_else(|| { + CliError::Config(format!("{} is not an editable section", field.name)) + })?; + if edit_value_section(theme, field.name, &mut value, schema, field.default_value())? { + set_struct_field(config, field.name, value)?; + } + return Ok(()); + } + + let current = config_field_value(config, field.name)?; + let actions = [ + MenuItem::new("Set value"), + MenuItem::new(shortcut_label( + "Reset to default/none", + "r, Backspace, Delete", + )), + MenuItem::new(shortcut_label("Back", "q")), + ]; + let action = prompt_menu( + theme, + &format!( + "{}, current {}", + field.name, + current + .as_ref() + .map(display_value) + .or_else(|| default_config_field_value::(field) + .map(|value| { format!("{} (default)", display_value(&value)) })) + .unwrap_or_else(|| "(default)".to_string()) + ), + &actions, + 0, + )?; + match action { + MenuResponse::Selected(0) => { + let value = prompt_value(theme, &field, current.as_ref())?; + set_struct_field(config, field.name, value)?; + } + MenuResponse::Selected(1) + | MenuResponse::Shortcut(MenuShortcut::Reset | MenuShortcut::Clear, _) => { + reset_config_field(config, field)? + } + MenuResponse::Shortcut(MenuShortcut::Help, _) => print_editor_help(), + MenuResponse::Shortcut(MenuShortcut::Preview | MenuShortcut::Save, _) => { + println!(" Preview and save are available from the main plugins.toml menu."); + } + _ => {} + } + Ok(()) +} + +fn edit_nested_section( + theme: &ColorfulTheme, + config: &mut T, + section: EditorFieldSpec, + field: EditorFieldSpec, +) -> Result<(), CliError> +where + T: SerializeConfig, +{ + let mut value = section_field_value(config, section, field.name)? + .or_else(|| field.default_value()) + .unwrap_or_else(|| json!({})); + let schema = field + .schema() + .ok_or_else(|| CliError::Config(format!("{} is not an editable section", field.name)))?; + if edit_value_section( + theme, + &format!("{}.{}", section.name, field.name), + &mut value, + schema, + field.default_value(), + )? { + set_section_field(config, section, field.name, value)?; + } + Ok(()) +} + +fn edit_value_section( + theme: &ColorfulTheme, + prompt: &str, + value: &mut Value, + schema: &nemo_flow::config_editor::EditorSchema, + default: Option, +) -> Result { + ensure_object(value); + let original = value.clone(); + let mut selected_index = 0; + loop { + let items = value_section_menu_items(value, schema, default.as_ref())?; + let selection = prompt_menu(theme, prompt, &items, selected_index)?; + if let Some(selected) = menu_response_index(&selection) { + selected_index = selected; + } + let selection = match selection { + MenuResponse::Selected(selection) => selection, + MenuResponse::Shortcut(MenuShortcut::Help, _) => { + print_editor_help(); + continue; + } + MenuResponse::Shortcut(MenuShortcut::Reset, selected) => { + reset_value_section_item(value, schema, default.as_ref(), selected); + continue; + } + MenuResponse::Shortcut(MenuShortcut::Clear, selected) => { + if clear_value_field(value, schema, selected) { + continue; + } + println!(" Select a field to clear."); + continue; + } + MenuResponse::Shortcut(MenuShortcut::Preview | MenuShortcut::Save, _) => { + println!(" Preview and save are available from the main plugins.toml menu."); + continue; + } + MenuResponse::Cancel => return Ok(*value != original), + }; + if !edit_selected_value_item(theme, prompt, value, schema, default.as_ref(), selection)? { + return Ok(*value != original); + } + } +} + +fn value_section_menu_items( + value: &Value, + schema: &nemo_flow::config_editor::EditorSchema, + default: Option<&Value>, +) -> Result, CliError> { + let mut items = schema + .fields + .iter() + .map(|field| value_field_menu_item(value, *field, default)) + .collect::, _>>()?; + items.push(MenuItem::new(shortcut_label("Reset section", "r"))); + items.push(MenuItem::new(shortcut_label("Back", "q"))); + Ok(items) +} + +fn value_field_menu_item( + value: &Value, + field: EditorFieldSpec, + default: Option<&Value>, +) -> Result { + let configured = value_field_configured(value, field, default); + let rendered = value_field_value(value, field.name) + .map(|value| display_value_with_default(&value, default_object_field_value(default, field))) + .or_else(|| { + default_object_field_value(default, field) + .map(|value| format!("{} (default)", display_value(&value))) + }) + .unwrap_or_else(|| "(default)".to_string()); + Ok(MenuItem::new(format!( + "{} = {}", + configured_label(configured, field.name), + rendered + ))) +} + +fn edit_selected_value_item( + theme: &ColorfulTheme, + prompt: &str, + value: &mut Value, + schema: &nemo_flow::config_editor::EditorSchema, + default: Option<&Value>, + selection: usize, +) -> Result { + if let Some(field) = schema.fields.get(selection) { + edit_value_field(theme, prompt, value, *field, default)?; + return Ok(true); + } + if selection == schema.fields.len() { + *value = default.cloned().unwrap_or_else(|| json!({})); + ensure_object(value); + return Ok(true); + } + Ok(false) +} + +fn edit_value_field( + theme: &ColorfulTheme, + prompt: &str, + value: &mut Value, + field: EditorFieldSpec, + default: Option<&Value>, +) -> Result<(), CliError> { + if field.kind == EditorFieldKind::Section { + let mut nested_value = value_field_value(value, field.name) + .or_else(|| field.default_value()) + .unwrap_or_else(|| json!({})); + let nested_schema = field.schema().ok_or_else(|| { + CliError::Config(format!("{} is not an editable section", field.name)) + })?; + if edit_value_section( + theme, + &format!("{prompt}.{}", field.name), + &mut nested_value, + nested_schema, + field.default_value(), + )? { + set_value_field(value, field.name, nested_value); + } + return Ok(()); + } + + let current = value_field_value(value, field.name); + let actions = [ + MenuItem::new("Set value"), + MenuItem::new(shortcut_label( + "Reset to default/none", + "r, Backspace, Delete", + )), + MenuItem::new(shortcut_label("Back", "q")), + ]; + let action = prompt_menu( + theme, + &format!( + "{prompt}.{}, current {}", + field.name, + current + .as_ref() + .map(|value| { + display_value_with_default(value, default_object_field_value(default, field)) + }) + .or_else(|| { + default_object_field_value(default, field) + .map(|value| format!("{} (default)", display_value(&value))) + }) + .unwrap_or_else(|| "(default)".to_string()) + ), + &actions, + 0, + )?; + match action { + MenuResponse::Selected(0) => { + let field_value = prompt_value(theme, &field, current.as_ref())?; + set_value_field(value, field.name, field_value); + } + MenuResponse::Selected(1) + | MenuResponse::Shortcut(MenuShortcut::Reset | MenuShortcut::Clear, _) => { + reset_value_field(value, field, default) + } + MenuResponse::Shortcut(MenuShortcut::Help, _) => print_editor_help(), + MenuResponse::Shortcut(MenuShortcut::Preview | MenuShortcut::Save, _) => { + println!(" Preview and save are available from the main plugins.toml menu."); + } + _ => {} + } + Ok(()) +} + +fn reset_value_section_item( + value: &mut Value, + schema: &nemo_flow::config_editor::EditorSchema, + default: Option<&Value>, + selected: usize, +) { + if let Some(field) = schema.fields.get(selected) { + reset_value_field(value, *field, default); + } else if selected == schema.fields.len() { + *value = default.cloned().unwrap_or_else(|| json!({})); + ensure_object(value); + } +} + +fn clear_value_field( + value: &mut Value, + schema: &nemo_flow::config_editor::EditorSchema, + selected: usize, +) -> bool { + let Some(field) = schema.fields.get(selected) else { + return false; + }; + remove_value_field(value, field.name); + true +} + +fn value_field_configured(value: &Value, field: EditorFieldSpec, default: Option<&Value>) -> bool { + let Some(current) = value_field_value(value, field.name) else { + return false; + }; + if field.optional { + return true; + } + default_object_field_value(default, field) + .as_ref() + .is_none_or(|default| default != ¤t) +} + +fn value_field_value(value: &Value, field: &str) -> Option { + value + .as_object() + .and_then(|object| object.get(field)) + .filter(|value| !value.is_null()) + .cloned() +} + +fn default_object_field_value(default: Option<&Value>, field: EditorFieldSpec) -> Option { + default + .and_then(Value::as_object) + .and_then(|object| object.get(field.name)) + .filter(|value| !value.is_null()) + .cloned() +} + +fn set_value_field(target: &mut Value, field: &str, field_value: Value) { + ensure_object(target).insert(field.to_string(), field_value); +} + +fn remove_value_field(target: &mut Value, field: &str) { + if let Some(object) = target.as_object_mut() { + object.remove(field); + } +} + +fn reset_value_field(value: &mut Value, field: EditorFieldSpec, default: Option<&Value>) { + if let Some(default) = default_object_field_value(default, field) { + set_value_field(value, field.name, default); + } else { + remove_value_field(value, field.name); + } +} + +fn display_value_with_default(value: &Value, default: Option) -> String { + if default.as_ref().is_some_and(|default| default == value) { + format!("{} (default)", display_value(value)) + } else { + display_value(value) + } +} + +trait SerializeConfig: serde::Serialize + serde::de::DeserializeOwned {} + +impl SerializeConfig for T where T: serde::Serialize + serde::de::DeserializeOwned {} + fn prompt_value( theme: &ColorfulTheme, field: &EditorFieldSpec, @@ -541,14 +981,23 @@ fn prompt_value( .with_initial_text(initial) .interact_text() .map_err(editor_error)?; - let parsed = value.trim().parse::().map_err(|error| { + let parsed = value.trim().parse::().map_err(|error| { CliError::Config(format!("{} must be an integer: {error}", field.name)) })?; Ok(json!(parsed)) } + EditorFieldKind::Float => { + let initial = current.map(display_value).unwrap_or_default(); + let value: String = Input::with_theme(theme) + .with_prompt(field.name) + .with_initial_text(initial) + .interact_text() + .map_err(editor_error)?; + parse_float_value(field, &value) + } EditorFieldKind::StringMap | EditorFieldKind::Json => { let initial = current.map(display_value).unwrap_or_else(|| { - if field.name == "tool_definitions" { + if matches!(field.name, "tool_definitions" | "learners") { "[]".to_string() } else { "{}".to_string() @@ -593,6 +1042,20 @@ fn prompt_value( } } +fn parse_float_value(field: &EditorFieldSpec, value: &str) -> Result { + let value = value.trim(); + let parsed = value + .parse::() + .map_err(|error| CliError::Config(format!("{} must be a number: {error}", field.name)))?; + if !parsed.is_finite() { + return Err(CliError::Config(format!( + "{} must be a finite number: {value}", + field.name + ))); + } + Ok(json!(parsed)) +} + fn editor_error(err: dialoguer::Error) -> CliError { match err { dialoguer::Error::IO(io_err) diff --git a/crates/cli/src/plugins/config_io.rs b/crates/cli/src/plugins/config_io.rs index ad31143a..a759dde7 100644 --- a/crates/cli/src/plugins/config_io.rs +++ b/crates/cli/src/plugins/config_io.rs @@ -7,6 +7,7 @@ use std::path::{Path, PathBuf}; use console::style; use nemo_flow::plugin::{ConfigPolicy, PluginConfig, validate_plugin_config}; +use nemo_flow_adaptive::plugin_component::register_adaptive_component; use serde_json::{Map, Value}; use crate::config::{ @@ -115,6 +116,9 @@ pub(super) fn print_preview(config: &PluginConfig) -> Result<(), CliError> { } pub(super) fn validate_config(config: &PluginConfig) -> Result<(), CliError> { + register_adaptive_component().map_err(|error| { + CliError::Config(format!("adaptive plugin registration failed: {error}")) + })?; let report = validate_plugin_config(config); if report.has_errors() { let messages = report diff --git a/crates/cli/src/plugins/editor_model.rs b/crates/cli/src/plugins/editor_model.rs index 0e526a35..01072442 100644 --- a/crates/cli/src/plugins/editor_model.rs +++ b/crates/cli/src/plugins/editor_model.rs @@ -1,11 +1,13 @@ // SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-License-Identifier: Apache-2.0 -//! Testable Observability plugin editor state helpers. +//! Testable plugin editor state helpers. use nemo_flow::config_editor::{EditorConfig, EditorFieldKind, EditorFieldSpec}; use nemo_flow::observability::plugin_component::{OBSERVABILITY_PLUGIN_KIND, ObservabilityConfig}; use nemo_flow::plugin::{PluginComponentSpec, PluginConfig}; +use nemo_flow_adaptive::AdaptiveConfig; +use nemo_flow_adaptive::plugin_component::ADAPTIVE_PLUGIN_KIND; use serde::Serialize; use serde::de::DeserializeOwned; use serde_json::{Map, Value, json}; @@ -29,6 +31,21 @@ pub(super) fn ensure_observability_component(config: &mut PluginConfig) -> Resul Ok(()) } +pub(super) fn ensure_adaptive_component(config: &mut PluginConfig) -> Result<(), CliError> { + if !config + .components + .iter() + .any(|component| component.kind == ADAPTIVE_PLUGIN_KIND) + { + config.components.push(PluginComponentSpec { + kind: ADAPTIVE_PLUGIN_KIND.to_string(), + enabled: false, + config: adaptive_config_map(&AdaptiveConfig::default())?, + }); + } + Ok(()) +} + pub(super) fn component_enabled(config: &PluginConfig) -> bool { observability_component(config) .map(|component| component.enabled) @@ -41,6 +58,18 @@ pub(super) fn set_component_enabled(config: &mut PluginConfig, enabled: bool) { } } +pub(super) fn adaptive_component_enabled(config: &PluginConfig) -> bool { + adaptive_component(config) + .map(|component| component.enabled) + .unwrap_or(false) +} + +pub(super) fn set_adaptive_component_enabled(config: &mut PluginConfig, enabled: bool) { + if let Some(component) = adaptive_component_mut(config) { + component.enabled = enabled; + } +} + pub(super) fn component_observability_config( config: &PluginConfig, ) -> Result { @@ -51,12 +80,22 @@ pub(super) fn component_observability_config( .ok_or_else(|| CliError::Config("observability plugin component is missing".into())) } -pub(super) fn config_with_observability( +pub(super) fn component_adaptive_config(config: &PluginConfig) -> Result { + adaptive_component(config) + .map(|component| serde_json::from_value(Value::Object(component.config.clone()))) + .transpose() + .map_err(|error| CliError::Config(format!("invalid adaptive plugin config: {error}")))? + .ok_or_else(|| CliError::Config("adaptive plugin component is missing".into())) +} + +pub(super) fn config_with_components( config: &PluginConfig, observability: &ObservabilityConfig, + adaptive: &AdaptiveConfig, ) -> Result { let mut config = config.clone(); store_observability_config(&mut config, observability)?; + store_adaptive_config(&mut config, adaptive)?; Ok(config) } @@ -73,7 +112,20 @@ pub(super) fn store_observability_config( Ok(()) } -pub(super) fn ensure_section(config: &mut ObservabilityConfig, section: EditorFieldSpec) { +pub(super) fn store_adaptive_config( + config: &mut PluginConfig, + adaptive: &AdaptiveConfig, +) -> Result<(), CliError> { + if let Some(component) = adaptive_component_mut(config) { + merge_adaptive_editor_config(&mut component.config, adaptive_config_map(adaptive)?); + } + Ok(()) +} + +pub(super) fn ensure_section(config: &mut T, section: EditorFieldSpec) +where + T: Serialize + DeserializeOwned, +{ if let Ok(Some(Value::Object(_))) = section_value(config, section) { return; } @@ -83,23 +135,32 @@ pub(super) fn ensure_section(config: &mut ObservabilityConfig, section: EditorFi let _ = set_struct_field(config, section.name, default); } -pub(super) fn toggle_section(config: &mut ObservabilityConfig, section: EditorFieldSpec) { +pub(super) fn toggle_section(config: &mut T, section: EditorFieldSpec) +where + T: Serialize + DeserializeOwned, +{ ensure_section(config, section); let enabled = section_enabled(config, section).unwrap_or(false); let _ = set_section_field(config, section, "enabled", json!(!enabled)); } -pub(super) fn reset_section(config: &mut ObservabilityConfig, section: EditorFieldSpec) { +pub(super) fn reset_section(config: &mut T, section: EditorFieldSpec) +where + T: Serialize + DeserializeOwned, +{ let value = section.default_value().unwrap_or_else(|| json!({})); let _ = set_struct_field(config, section.name, value); } -pub(super) fn reset_selected_field( - config: &mut ObservabilityConfig, +pub(super) fn reset_selected_field( + config: &mut T, section: EditorFieldSpec, fields: &[EditorFieldSpec], selected: usize, -) -> Result { +) -> Result +where + T: Serialize + DeserializeOwned, +{ let offset = usize::from(section_has_enabled_toggle(section)); let Some(index) = selected.checked_sub(offset) else { return Ok(false); @@ -119,10 +180,10 @@ pub(super) fn section_has_enabled_toggle(section: EditorFieldSpec) -> bool { .is_some_and(|field| field.kind == EditorFieldKind::Boolean) } -pub(super) fn section_enabled( - config: &ObservabilityConfig, - section: EditorFieldSpec, -) -> Option { +pub(super) fn section_enabled(config: &T, section: EditorFieldSpec) -> Option +where + T: Serialize, +{ section_value(config, section) .ok() .flatten() @@ -130,7 +191,10 @@ pub(super) fn section_enabled( .and_then(|enabled| enabled.as_bool()) } -pub(super) fn section_configured(config: &ObservabilityConfig, section: EditorFieldSpec) -> bool { +pub(super) fn section_configured(config: &T, section: EditorFieldSpec) -> bool +where + T: Serialize, +{ let Ok(Some(value)) = section_value(config, section) else { return false; }; @@ -143,11 +207,14 @@ pub(super) fn section_configured(config: &ObservabilityConfig, section: EditorFi .is_none_or(|default| default != &value) } -pub(super) fn section_field_configured( - config: &ObservabilityConfig, +pub(super) fn section_field_configured( + config: &T, section: EditorFieldSpec, field: EditorFieldSpec, -) -> Result { +) -> Result +where + T: Serialize, +{ let Some(value) = section_field_value(config, section, field.name)? else { return Ok(false); }; @@ -159,20 +226,26 @@ pub(super) fn section_field_configured( .is_none_or(|default| default != &value)) } -pub(super) fn section_field_value( - config: &ObservabilityConfig, +pub(super) fn section_field_value( + config: &T, section: EditorFieldSpec, field: &str, -) -> Result, CliError> { +) -> Result, CliError> +where + T: Serialize, +{ Ok(section_value(config, section)? .and_then(|section| section.as_object().cloned()) .and_then(|section| section.get(field).cloned())) } -pub(super) fn section_value( - config: &ObservabilityConfig, +pub(super) fn section_value( + config: &T, section: EditorFieldSpec, -) -> Result, CliError> { +) -> Result, CliError> +where + T: Serialize, +{ let value = serde_json::to_value(config).map_err(serde_error)?; Ok(value .as_object() @@ -181,12 +254,15 @@ pub(super) fn section_value( .cloned()) } -pub(super) fn set_section_field( - config: &mut ObservabilityConfig, +pub(super) fn set_section_field( + config: &mut T, section: EditorFieldSpec, field: &str, value: Value, -) -> Result<(), CliError> { +) -> Result<(), CliError> +where + T: Serialize + DeserializeOwned, +{ ensure_section(config, section); let mut object = serde_json::to_value(&*config).map_err(serde_error)?; let config_object = ensure_object(&mut object); @@ -198,11 +274,14 @@ pub(super) fn set_section_field( Ok(()) } -pub(super) fn remove_section_field( - config: &mut ObservabilityConfig, +pub(super) fn remove_section_field( + config: &mut T, section: EditorFieldSpec, field: &str, -) -> Result<(), CliError> { +) -> Result<(), CliError> +where + T: Serialize + DeserializeOwned, +{ let mut object = serde_json::to_value(&*config).map_err(serde_error)?; if let Some(section_object) = object .as_object_mut() @@ -225,6 +304,69 @@ where Ok(()) } +pub(super) fn remove_struct_field(target: &mut T, field: &str) -> Result<(), CliError> +where + T: Serialize + DeserializeOwned, +{ + let mut object = serde_json::to_value(&*target).map_err(serde_error)?; + if let Some(object) = object.as_object_mut() { + object.remove(field); + } + *target = serde_json::from_value(object).map_err(serde_error)?; + Ok(()) +} + +pub(super) fn config_field_value(config: &T, field: &str) -> Result, CliError> +where + T: Serialize, +{ + let value = serde_json::to_value(config).map_err(serde_error)?; + Ok(value + .as_object() + .and_then(|config| config.get(field)) + .filter(|value| !value.is_null()) + .cloned()) +} + +pub(super) fn config_field_configured( + config: &T, + field: EditorFieldSpec, +) -> Result +where + T: Default + Serialize, +{ + let Some(value) = config_field_value(config, field.name)? else { + return Ok(false); + }; + if field.optional { + return Ok(true); + } + Ok(default_config_field_value::(field) + .as_ref() + .is_none_or(|default| default != &value)) +} + +pub(super) fn reset_config_field(config: &mut T, field: EditorFieldSpec) -> Result<(), CliError> +where + T: Default + Serialize + DeserializeOwned, +{ + if let Some(default) = default_config_field_value::(field) { + set_struct_field(config, field.name, default) + } else { + remove_struct_field(config, field.name) + } +} + +pub(super) fn default_config_field_value(field: EditorFieldSpec) -> Option +where + T: Default + Serialize, +{ + serde_json::to_value(T::default()) + .ok() + .and_then(|value| value.as_object().cloned()) + .and_then(|config| config.get(field.name).cloned()) +} + pub(super) fn observability_component(config: &PluginConfig) -> Option<&PluginComponentSpec> { config .components @@ -241,6 +383,22 @@ pub(super) fn observability_component_mut( .find(|component| component.kind == OBSERVABILITY_PLUGIN_KIND) } +pub(super) fn adaptive_component(config: &PluginConfig) -> Option<&PluginComponentSpec> { + config + .components + .iter() + .find(|component| component.kind == ADAPTIVE_PLUGIN_KIND) +} + +pub(super) fn adaptive_component_mut( + config: &mut PluginConfig, +) -> Option<&mut PluginComponentSpec> { + config + .components + .iter_mut() + .find(|component| component.kind == ADAPTIVE_PLUGIN_KIND) +} + pub(super) fn ensure_object(value: &mut Value) -> &mut Map { if !value.is_object() { *value = json!({}); @@ -260,6 +418,21 @@ pub(super) fn observability_config_map( } } +pub(super) fn adaptive_config_map(config: &AdaptiveConfig) -> Result, CliError> { + let value = serde_json::to_value(config).map_err(serde_error)?; + match value { + Value::Object(mut map) => { + if map.get("version") == Some(&json!(1)) { + map.remove("version"); + } + Ok(map) + } + _ => Err(CliError::Config( + "adaptive config must serialize to an object".into(), + )), + } +} + pub(super) fn merge_observability_editor_config( existing: &mut Map, edited: Map, @@ -272,6 +445,21 @@ pub(super) fn merge_observability_editor_config( ); } +pub(super) fn merge_adaptive_editor_config( + existing: &mut Map, + edited: Map, +) { + if existing.get("version") == Some(&json!(1)) { + existing.remove("version"); + } + merge_known_editor_object( + existing, + edited, + &nested_editor_keys(AdaptiveConfig::editor_schema()), + AdaptiveConfig::editor_schema(), + ); +} + pub(super) fn merge_known_editor_object( existing: &mut Map, edited: Map, @@ -383,3 +571,26 @@ pub(super) fn observability_summary( } ) } + +pub(super) fn adaptive_summary(config: &PluginConfig, adaptive: &AdaptiveConfig) -> String { + let configured_fields = AdaptiveConfig::editor_schema() + .fields + .iter() + .filter(|field| field.name != POLICY_SECTION) + .filter(|field| config_field_configured(adaptive, **field).unwrap_or(false)) + .map(|field| field.label) + .collect::>(); + format!( + "component {}, fields {}", + if adaptive_component_enabled(config) { + "enabled" + } else { + "disabled" + }, + if configured_fields.is_empty() { + "none".into() + } else { + configured_fields.join(", ") + } + ) +} diff --git a/crates/cli/src/server.rs b/crates/cli/src/server.rs index 9e937152..b942ba18 100644 --- a/crates/cli/src/server.rs +++ b/crates/cli/src/server.rs @@ -8,6 +8,7 @@ use axum::http::HeaderMap; use axum::routing::{get, post}; use axum::{Json, Router}; use nemo_flow::plugin::{PluginConfig, clear_plugin_configuration, initialize_plugins}; +use nemo_flow_adaptive::plugin_component::register_adaptive_component; use reqwest::Client; use serde_json::Value; use tokio::net::TcpListener; @@ -152,6 +153,9 @@ impl PluginActivation { let Some(config) = config else { return Ok(Self { active: false }); }; + register_adaptive_component().map_err(|error| { + CliError::Config(format!("adaptive plugin registration failed: {error}")) + })?; let plugin_config: PluginConfig = serde_json::from_value(config) .map_err(|error| CliError::Config(format!("invalid plugin config: {error}")))?; initialize_plugins(plugin_config) diff --git a/crates/cli/src/setup.rs b/crates/cli/src/setup.rs index 7f9038b4..2d127c29 100644 --- a/crates/cli/src/setup.rs +++ b/crates/cli/src/setup.rs @@ -133,7 +133,7 @@ pub(crate) async fn run(agent_hint: Option) -> Result<(), CliError> for path in &written { println!(" {}", path.display()); } - println!(" Configure observability with `nemo-flow plugins edit`."); + println!(" Configure plugins with `nemo-flow plugins edit`."); println!(); Ok(()) } diff --git a/crates/cli/tests/coverage/gateway_tests.rs b/crates/cli/tests/coverage/gateway_tests.rs index 6f46c382..19bd3b7a 100644 --- a/crates/cli/tests/coverage/gateway_tests.rs +++ b/crates/cli/tests/coverage/gateway_tests.rs @@ -7,7 +7,7 @@ use crate::server::AppState; use crate::session::SessionManager; use axum::body::Body; use axum::extract::State; -use axum::http::{HeaderMap, HeaderValue, Method, Request, StatusCode}; +use axum::http::{HeaderMap, HeaderValue, Method, Request, StatusCode, header}; use http_body_util::BodyExt; use reqwest::Client; @@ -140,6 +140,105 @@ fn openai_upstream_url_accepts_origin_or_v1_base() { ); } +#[test] +fn effective_upstream_request_overlays_runtime_body_and_headers() { + let original_body = Bytes::from_static(br#"{"model":"original"}"#); + let mut original_headers = HeaderMap::new(); + original_headers.insert( + header::AUTHORIZATION, + HeaderValue::from_static("Bearer original"), + ); + let request = LlmRequest { + headers: Map::from_iter([ + ("x-runtime".to_string(), json!("enabled")), + ("x-runtime-json".to_string(), json!({ "enabled": true })), + ]), + content: json!({ + "model": "rewritten", + "nvext": { "agent_hints": { "priority": 1 } } + }), + }; + + let (body, headers) = + effective_upstream_request(&original_body, &original_headers, Some(&request)); + let body: Value = serde_json::from_slice(&body).unwrap(); + + assert_eq!(body["model"], json!("rewritten")); + assert_eq!(body["nvext"]["agent_hints"]["priority"], json!(1)); + assert_eq!( + headers.get(header::AUTHORIZATION).unwrap(), + "Bearer original" + ); + assert_eq!(headers.get("x-runtime").unwrap(), "enabled"); + assert_eq!( + headers.get("x-runtime-json").unwrap(), + r#"{"enabled":true}"# + ); +} + +#[test] +fn effective_upstream_request_returns_original_without_runtime_request() { + let original_body = Bytes::from_static(br#"{"model":"original"}"#); + let mut original_headers = HeaderMap::new(); + original_headers.insert( + header::AUTHORIZATION, + HeaderValue::from_static("Bearer original"), + ); + original_headers.insert("x-request-id", HeaderValue::from_static("request-1")); + + let (body, headers) = effective_upstream_request(&original_body, &original_headers, None); + + assert_eq!(body, original_body); + assert_eq!( + headers.get(header::AUTHORIZATION).unwrap(), + "Bearer original" + ); + assert_eq!(headers.get("x-request-id").unwrap(), "request-1"); +} + +#[test] +fn effective_upstream_request_preserves_original_body_for_null_runtime_content() { + let original_body = Bytes::from_static(b"not-json-but-still-upstream-body"); + let mut original_headers = HeaderMap::new(); + original_headers.insert("x-original", HeaderValue::from_static("kept")); + let request = LlmRequest { + headers: Map::from_iter([("x-runtime".to_string(), json!("enabled"))]), + content: Value::Null, + }; + + let (body, headers) = + effective_upstream_request(&original_body, &original_headers, Some(&request)); + + assert_eq!(body, original_body); + assert_eq!(headers.get("x-original").unwrap(), "kept"); + assert_eq!(headers.get("x-runtime").unwrap(), "enabled"); +} + +#[test] +fn effective_upstream_request_skips_invalid_runtime_headers() { + let original_body = Bytes::from_static(br#"{"model":"original"}"#); + let mut original_headers = HeaderMap::new(); + original_headers.insert("x-original", HeaderValue::from_static("kept")); + let request = LlmRequest { + headers: Map::from_iter([ + ("bad header".to_string(), json!("skip")), + ("x-invalid-value".to_string(), json!("line\nbreak")), + ("x-good".to_string(), json!("ok")), + ]), + content: json!({ "model": "rewritten" }), + }; + + let (body, headers) = + effective_upstream_request(&original_body, &original_headers, Some(&request)); + let body: Value = serde_json::from_slice(&body).unwrap(); + + assert_eq!(body["model"], json!("rewritten")); + assert_eq!(headers.get("x-original").unwrap(), "kept"); + assert_eq!(headers.get("x-good").unwrap(), "ok"); + assert!(headers.get("x-invalid-value").is_none()); + assert!(headers.keys().all(|name| name.as_str() != "bad header")); +} + #[test] fn gateway_session_id_prefers_headers_and_has_fallbacks() { let mut headers = HeaderMap::new(); diff --git a/crates/cli/tests/coverage/plugins_tests.rs b/crates/cli/tests/coverage/plugins_tests.rs index 674fcd4e..0f34ea80 100644 --- a/crates/cli/tests/coverage/plugins_tests.rs +++ b/crates/cli/tests/coverage/plugins_tests.rs @@ -5,6 +5,32 @@ use super::*; use crate::config::{global_plugin_config_path, project_plugin_config_path}; use nemo_flow::observability::plugin_component::OBSERVABILITY_PLUGIN_KIND; use nemo_flow::plugin::{ConfigPolicy, PluginComponentSpec, PluginConfig}; +use nemo_flow_adaptive::AdaptiveConfig; +use nemo_flow_adaptive::plugin_component::ADAPTIVE_PLUGIN_KIND; + +fn adaptive_component_config(agent_id: &str) -> serde_json::Map { + json!({ + "agent_id": agent_id, + "state": { + "backend": { + "kind": "in_memory", + "config": {} + } + }, + "telemetry": { + "learners": ["tool_parallelism"] + }, + "adaptive_hints": { + "priority": 100, + "break_chain": false, + "inject_header": true, + "inject_body_path": "nvext.agent_hints" + } + }) + .as_object() + .unwrap() + .clone() +} #[test] fn target_scope_defaults_to_user_and_rejects_conflicts() { @@ -59,6 +85,34 @@ fn typed_editor_model_contains_observability_sections() { ); } +#[test] +fn typed_editor_model_contains_adaptive_options() { + let schema = AdaptiveConfig::editor_schema(); + assert!(!schema.fields.iter().any(|field| field.name == "version")); + assert!(schema.fields.iter().any(|field| field.name == "agent_id")); + + let state = schema.field("state").unwrap().schema().unwrap(); + let backend = state.field("backend").unwrap().schema().unwrap(); + assert_eq!( + backend.field("kind").unwrap().enum_values, + &["in_memory", "redis"] + ); + assert_eq!(backend.field("config").unwrap().kind, EditorFieldKind::Json); + + let telemetry = schema.field("telemetry").unwrap().schema().unwrap(); + assert_eq!( + telemetry.field("learners").unwrap().kind, + EditorFieldKind::Json + ); + + let acg = schema.field("acg").unwrap().schema().unwrap(); + let thresholds = acg.field("stability_thresholds").unwrap().schema().unwrap(); + assert_eq!( + thresholds.field("stable_threshold").unwrap().kind, + EditorFieldKind::Float + ); +} + #[test] fn plugin_menu_uses_setup_theme_markers() { let theme = ColorfulTheme::default(); @@ -77,6 +131,16 @@ fn plugin_menu_uses_setup_theme_markers() { assert!(!rendered.contains("> First")); } +#[test] +fn menu_response_index_tracks_selected_and_shortcut_positions() { + assert_eq!(menu_response_index(&MenuResponse::Selected(3)), Some(3)); + assert_eq!( + menu_response_index(&MenuResponse::Shortcut(MenuShortcut::Reset, 4)), + Some(4) + ); + assert_eq!(menu_response_index(&MenuResponse::Cancel), None); +} + #[test] fn plugin_menu_marks_configured_sections_and_fields() { let mut observability = ObservabilityConfig::default(); @@ -110,6 +174,19 @@ fn editor_model_renders_valid_observability_plugin_config() { validate_config(&config).unwrap(); } +#[test] +fn editor_model_adds_disabled_adaptive_component() { + let mut config = PluginConfig::default(); + + ensure_adaptive_component(&mut config).unwrap(); + + let component = adaptive_component(&config).unwrap(); + assert_eq!(component.kind, ADAPTIVE_PLUGIN_KIND); + assert!(!component.enabled); + assert!(!component.config.contains_key("version")); + assert!(component.config.contains_key("policy")); +} + #[test] fn typed_editor_serializes_explicit_observability_overrides() { let mut observability = ObservabilityConfig::default(); @@ -193,6 +270,176 @@ fn editor_save_preserves_unknown_observability_fields() { assert!(!atof_config.contains_key("output_directory")); } +#[test] +fn editor_save_preserves_unknown_adaptive_fields_and_all_sections() { + let mut config = PluginConfig { + components: vec![PluginComponentSpec { + kind: ADAPTIVE_PLUGIN_KIND.to_string(), + enabled: true, + config: json!({ + "version": 1, + "future_top_level": "preserve", + "state": { + "future_state": "preserve", + "backend": { + "kind": "in_memory", + "config": {}, + "future_backend": "preserve" + } + } + }) + .as_object() + .unwrap() + .clone(), + }], + ..PluginConfig::default() + }; + let mut adaptive = component_adaptive_config(&config).unwrap(); + let schema = AdaptiveConfig::editor_schema(); + let state = schema.field("state").unwrap(); + let telemetry = schema.field("telemetry").unwrap(); + let adaptive_hints = schema.field("adaptive_hints").unwrap(); + let tool_parallelism = schema.field("tool_parallelism").unwrap(); + let acg = schema.field("acg").unwrap(); + + set_struct_field(&mut adaptive, "agent_id", json!("planner")).unwrap(); + set_section_field( + &mut adaptive, + state, + "backend", + json!({ + "kind": "redis", + "config": { + "url": "redis://127.0.0.1/", + "key_prefix": "adaptive:" + } + }), + ) + .unwrap(); + set_section_field( + &mut adaptive, + telemetry, + "learners", + json!(["tool_parallelism", "acg"]), + ) + .unwrap(); + set_section_field( + &mut adaptive, + telemetry, + "subscriber_name", + json!("adaptive"), + ) + .unwrap(); + set_section_field( + &mut adaptive, + adaptive_hints, + "inject_body_path", + json!("nvext.agent_hints"), + ) + .unwrap(); + set_section_field( + &mut adaptive, + tool_parallelism, + "mode", + json!("inject_hints"), + ) + .unwrap(); + set_section_field(&mut adaptive, acg, "provider", json!("anthropic")).unwrap(); + set_section_field( + &mut adaptive, + acg, + "stability_thresholds", + json!({ + "stable_threshold": 0.9, + "semi_stable_threshold": 0.4, + "min_observations_for_full_confidence": 10 + }), + ) + .unwrap(); + + store_adaptive_config(&mut config, &adaptive).unwrap(); + + let component = adaptive_component(&config).unwrap(); + assert!(!component.config.contains_key("version")); + assert_eq!( + component.config.get("future_top_level"), + Some(&json!("preserve")) + ); + let state = component.config["state"].as_object().unwrap(); + assert_eq!(state.get("future_state"), Some(&json!("preserve"))); + let backend = state["backend"].as_object().unwrap(); + assert_eq!(backend.get("kind"), Some(&json!("redis"))); + assert_eq!(backend.get("future_backend"), Some(&json!("preserve"))); + assert_eq!(backend["config"]["key_prefix"], json!("adaptive:")); + assert_eq!( + component.config["telemetry"]["learners"], + json!(["tool_parallelism", "acg"]) + ); + assert_eq!( + component.config["adaptive_hints"]["inject_body_path"], + json!("nvext.agent_hints") + ); + assert_eq!( + component.config["tool_parallelism"]["mode"], + json!("inject_hints") + ); + assert_eq!( + component.config["acg"]["stability_thresholds"]["stable_threshold"], + json!(0.9) + ); +} + +#[test] +fn adaptive_config_field_reset_handles_optional_and_default_fields() { + let mut adaptive = AdaptiveConfig { + agent_id: Some("planner".into()), + acg: Some(Default::default()), + ..AdaptiveConfig::default() + }; + let schema = AdaptiveConfig::editor_schema(); + + reset_config_field(&mut adaptive, schema.field("agent_id").unwrap()).unwrap(); + reset_config_field(&mut adaptive, schema.field("acg").unwrap()).unwrap(); + + assert!(adaptive.agent_id.is_none()); + assert!(adaptive.acg.is_none()); +} + +#[test] +fn observability_config_field_reset_clears_optional_section() { + let mut observability = ObservabilityConfig::default(); + let atof = ObservabilityConfig::editor_schema().field("atof").unwrap(); + toggle_section(&mut observability, atof); + + reset_config_field(&mut observability, atof).unwrap(); + + assert!(observability.atof.is_none()); +} + +#[test] +fn adaptive_summary_tracks_component_and_configured_fields() { + let mut config = PluginConfig::default(); + ensure_adaptive_component(&mut config).unwrap(); + let mut adaptive = component_adaptive_config(&config).unwrap(); + + assert_eq!( + adaptive_summary(&config, &adaptive), + "component disabled, fields none" + ); + + set_adaptive_component_enabled(&mut config, true); + set_struct_field(&mut adaptive, "agent_id", json!("planner")).unwrap(); + let adaptive_hints = AdaptiveConfig::editor_schema() + .field("adaptive_hints") + .unwrap(); + set_section_field(&mut adaptive, adaptive_hints, "inject_header", json!(true)).unwrap(); + + assert_eq!( + adaptive_summary(&config, &adaptive), + "component enabled, fields agent_id, adaptive_hints" + ); +} + #[test] fn component_enablement_and_summary_track_config_state() { let mut config = PluginConfig::default(); @@ -262,15 +509,39 @@ fn write_plugin_config_prunes_defaults_and_round_trips() { let path = temp.path().join("plugins.toml"); let mut config = PluginConfig::default(); ensure_observability_component(&mut config).unwrap(); + config.components.push(PluginComponentSpec { + kind: ADAPTIVE_PLUGIN_KIND.to_string(), + enabled: true, + config: adaptive_component_config("cli-roundtrip"), + }); write_plugin_config(&path, &config).unwrap(); let rendered = std::fs::read_to_string(&path).unwrap(); assert!(rendered.contains("kind = \"observability\"")); + assert!(rendered.contains("kind = \"adaptive\"")); assert!(!rendered.contains("enabled = true")); let round_tripped = read_plugin_config(&path).unwrap(); - assert_eq!(round_tripped.components.len(), 1); + assert_eq!(round_tripped.components.len(), 2); assert_eq!(round_tripped.components[0].kind, OBSERVABILITY_PLUGIN_KIND); + let adaptive = round_tripped + .components + .iter() + .find(|component| component.kind == ADAPTIVE_PLUGIN_KIND) + .unwrap(); + assert_eq!( + adaptive.config.get("agent_id"), + Some(&json!("cli-roundtrip")) + ); + let adaptive_hints = adaptive + .config + .get("adaptive_hints") + .and_then(Value::as_object) + .unwrap(); + assert_eq!( + adaptive_hints.get("inject_body_path"), + Some(&json!("nvext.agent_hints")) + ); } #[test] @@ -322,6 +593,20 @@ fn validate_config_reports_plugin_diagnostics() { assert!(error.contains("ATOF mode"), "error was: {error}"); } +#[test] +fn validate_config_accepts_adaptive_component() { + let config = PluginConfig { + components: vec![PluginComponentSpec { + kind: ADAPTIVE_PLUGIN_KIND.to_string(), + enabled: true, + config: adaptive_component_config("cli-validation"), + }], + ..PluginConfig::default() + }; + + validate_config(&config).unwrap(); +} + #[test] fn display_helpers_render_scalars_json_and_defaults() { assert_eq!(display_value(&json!("logs")), "logs"); @@ -341,6 +626,30 @@ fn display_helpers_render_scalars_json_and_defaults() { ); } +#[test] +fn parse_float_value_rejects_non_finite_numbers() { + let field = EditorFieldSpec { + name: "stable_threshold", + label: "Stable threshold", + kind: EditorFieldKind::Float, + enum_values: &[], + optional: false, + nested_schema: None, + nested_default: None, + }; + + assert_eq!(parse_float_value(&field, "0.75").unwrap(), json!(0.75)); + + for value in ["inf", "-inf", "NaN"] { + let error = parse_float_value(&field, value).unwrap_err().to_string(); + assert!( + error.contains("stable_threshold must be a finite number"), + "error was: {error}" + ); + assert!(error.contains(value), "error was: {error}"); + } +} + #[test] fn target_path_resolves_project_and_global_without_user_env() { let cwd = std::env::current_dir().unwrap(); diff --git a/crates/cli/tests/coverage/server_tests.rs b/crates/cli/tests/coverage/server_tests.rs index a2a8c142..3fd4a57c 100644 --- a/crates/cli/tests/coverage/server_tests.rs +++ b/crates/cli/tests/coverage/server_tests.rs @@ -358,6 +358,48 @@ async fn serve_listener_activates_any_registered_plugin_kind() { let _ = deregister_plugin(GENERIC_TEST_PLUGIN_KIND); } +#[tokio::test] +async fn serve_listener_activates_adaptive_plugin_config() { + let _guard = PLUGIN_TEST_LOCK.lock().await; + let _ = nemo_flow::plugin::clear_plugin_configuration(); + + let mut config = test_config(); + config.plugin_config = Some(json!({ + "version": 1, + "components": [ + { + "kind": "adaptive", + "enabled": true, + "config": { + "version": 1, + "agent_id": "cli-test", + "state": { + "backend": { + "kind": "in_memory", + "config": {} + } + } + } + } + ] + })); + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let address = listener.local_addr().unwrap(); + let url = format!("http://{address}"); + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let handle = + tokio::spawn(async move { serve_listener(listener, config, Some(shutdown_rx)).await }); + + wait_for_gateway(&url).await; + let report = nemo_flow::plugin::active_plugin_report().unwrap(); + assert!(report.diagnostics.is_empty()); + + shutdown_tx.send(()).unwrap(); + handle.await.unwrap().unwrap(); + assert!(nemo_flow::plugin::active_plugin_report().is_none()); +} + #[tokio::test] async fn serve_listener_rejects_invalid_plugin_config() { let _guard = PLUGIN_TEST_LOCK.lock().await; @@ -751,9 +793,13 @@ async fn spawn_upstream(streaming: bool) -> TestServer { let payload: Value = serde_json::from_slice(&body).unwrap(); Json(json!({ "model": payload["model"], + "input": payload["input"], "authorization": headers .get(header::AUTHORIZATION) .and_then(|value| value.to_str().ok()), + "x_test_intercept": headers + .get("x-test-intercept") + .and_then(|value| value.to_str().ok()), "connection": headers .get(header::CONNECTION) .and_then(|value| value.to_str().ok()) diff --git a/crates/core/src/config_editor.rs b/crates/core/src/config_editor.rs index 4094d5fc..5ef8ecc7 100644 --- a/crates/core/src/config_editor.rs +++ b/crates/core/src/config_editor.rs @@ -18,6 +18,8 @@ pub enum EditorFieldKind { String, /// Integer value. Integer, + /// Floating-point number value. + Float, /// String enum with a fixed set of allowed values. Enum, /// Object with string keys and string values. @@ -132,6 +134,7 @@ macro_rules! editor_config { (@kind Boolean) => { $crate::config_editor::EditorFieldKind::Boolean }; (@kind String) => { $crate::config_editor::EditorFieldKind::String }; (@kind Integer) => { $crate::config_editor::EditorFieldKind::Integer }; + (@kind Float) => { $crate::config_editor::EditorFieldKind::Float }; (@kind Enum) => { $crate::config_editor::EditorFieldKind::Enum }; (@kind StringMap) => { $crate::config_editor::EditorFieldKind::StringMap }; (@kind Json) => { $crate::config_editor::EditorFieldKind::Json }; diff --git a/docs/build-plugins/plugin-configuration-files.md b/docs/build-plugins/plugin-configuration-files.md index 18504c88..65a02c5c 100644 --- a/docs/build-plugins/plugin-configuration-files.md +++ b/docs/build-plugins/plugin-configuration-files.md @@ -97,7 +97,7 @@ without process-level plugin activation. ## Editing Files -Use the interactive editor for Observability plugin configuration: +Use the interactive editor for Observability and Adaptive plugin configuration: ```bash nemo-flow plugins edit @@ -194,9 +194,9 @@ Tables inside component config merge recursively. ## Explicit Defaults And Overrides -The editor writes explicit defaults for edited Observability sections. This is -intentional. In a layered config model, omitting a field means "inherit a lower -precedence value"; it does not mean "delete that value." +The editor writes explicit defaults for edited Observability and Adaptive +sections. This is intentional. In a layered config model, omitting a field means +"inherit a lower precedence value"; it does not mean "delete that value." For example, this user file disables ATOF even if a project file enables it: diff --git a/docs/integrations/openclaw-plugin.md b/docs/integrations/openclaw-plugin.md index 2548ba42..32de9a48 100644 --- a/docs/integrations/openclaw-plugin.md +++ b/docs/integrations/openclaw-plugin.md @@ -9,13 +9,14 @@ Use the OpenClaw plugin when OpenClaw owns the agent, tool, and LLM lifecycle that needs NeMo Flow observability. The plugin observes supported OpenClaw plugin hooks and converts them into NeMo Flow sessions, LLM spans, tool spans, and marks that the generic NeMo Flow observability component can export as -Agent Trajectory Interchange Format (ATIF) JSON, OpenTelemetry spans, and -OpenInference/Phoenix spans. +Agent Trajectory Interchange Format (ATIF) JSON, OpenTelemetry spans, +OpenInference/Phoenix spans, and adaptive telemetry inputs. -This public OpenClaw plugin provides observability support only. It does not -add NeMo Flow security middleware or adaptive optimization behavior to OpenClaw -execution. For middleware-backed behavior, use the patch-based OpenClaw -integration from the NeMo Flow repository. +This public OpenClaw plugin uses OpenClaw public hooks. It can initialize +generic NeMo Flow plugin components such as `observability` and `adaptive`, but +hook-backed mode does not rewrite OpenClaw tool execution, provider routing, or +model requests. For middleware-backed behavior that changes execution, use the +patch-based OpenClaw integration from the NeMo Flow repository. Use this guide to install the plugin, enable it in OpenClaw, configure telemetry outputs, verify exported traces, and understand current LLM replay fidelity. @@ -103,6 +104,23 @@ access, and place the OpenClaw plugin configuration under "service_name": "openclaw-nemo-flow" } } + }, + { + "kind": "adaptive", + "enabled": true, + "config": { + "version": 1, + "agent_id": "openclaw", + "state": { + "backend": { + "kind": "in_memory", + "config": {} + } + }, + "telemetry": { + "learners": ["tool_parallelism"] + } + } } ] }, @@ -138,7 +156,8 @@ do not use, or set their `enabled` fields to `false`. - `config.enabled` disables or enables the NeMo Flow OpenClaw wrapper without removing the plugin entry. `config.backend` currently supports only `hooks`. - `config.plugins` is the generic NeMo Flow plugin configuration document. Use - this object to configure built-in components such as `observability`. + this object to configure built-in components such as `observability` and + `adaptive`. - `config.plugins.components[].config.atif` writes ATIF trajectory JSON files. Set `output_directory` to the directory where OpenClaw should write files. - `config.plugins.components[].config.opentelemetry` sends generic OTLP spans to @@ -146,6 +165,10 @@ do not use, or set their `enabled` fields to `false`. - `config.plugins.components[].config.openinference` sends OpenInference OTLP spans to Phoenix or another OpenInference-compatible collector when `enabled` is `true`. +- `config.plugins.components[]` entries with `kind: "adaptive"` initialize the + Adaptive plugin. In hook-backed OpenClaw mode, adaptive telemetry can consume + replayed NeMo Flow events, while request-rewrite features such as adaptive + hints require a managed execution path. - `config.capture` controls prompt, response, tool argument, and tool result capture. Tool arguments and tool results are stripped by default because they often contain user data, local paths, tokens, or large payloads. @@ -223,11 +246,11 @@ reason when present. ## Runtime Mapping -The plugin maps supported OpenClaw hook events into NeMo Flow telemetry without -changing OpenClaw execution behavior. +The plugin maps supported OpenClaw hook events into NeMo Flow telemetry and +adaptive inputs without changing OpenClaw execution behavior. It does not change OpenClaw tool execution, provider routing, policy decisions, -or adaptive behavior. +or provider request payloads. | OpenClaw hook | NeMo Flow behavior | | --- | --- | diff --git a/integrations/openclaw/README.md b/integrations/openclaw/README.md index 7ca9166c..f31ebe9e 100644 --- a/integrations/openclaw/README.md +++ b/integrations/openclaw/README.md @@ -9,12 +9,13 @@ SPDX-License-Identifier: Apache-2.0 OpenClaw. It converts supported OpenClaw hook events into NeMo Flow sessions, LLM spans, tool spans, and lifecycle marks that the generic NeMo Flow observability component can export as ATIF JSON, OpenTelemetry spans, and -OpenInference/Phoenix spans. +OpenInference/Phoenix spans. The same generic plugin config path can initialize +Adaptive components for hook-backed telemetry learning. -This public OpenClaw plugin package provides observability support only. It -does not add NeMo Flow security middleware or adaptive optimization behavior to -OpenClaw execution. For middleware-backed behavior, use the patch-based -OpenClaw integration from the NeMo Flow repository. +This public OpenClaw plugin package uses OpenClaw public hooks. It does not +rewrite OpenClaw tool execution, provider routing, policy decisions, or model +requests. For middleware-backed behavior that changes execution, use the +patch-based OpenClaw integration from the NeMo Flow repository. ## Why Use It? @@ -30,6 +31,7 @@ OpenClaw integration from the NeMo Flow repository. - OpenClaw plugin ID `nemo-flow`. - Generic NeMo Flow plugin initialization through `config.plugins`. - ATIF JSON export through the built-in `observability` component. +- Adaptive plugin initialization through `config.plugins`. - Optional OpenTelemetry OTLP export. - Optional OpenInference/Phoenix OTLP export. - Bounded LLM replay correlation across supported OpenClaw hooks. @@ -98,6 +100,23 @@ OpenClaw plugin configuration under `plugins.entries["nemo-flow"].config`: "service_name": "openclaw-nemo-flow" } } + }, + { + "kind": "adaptive", + "enabled": true, + "config": { + "version": 1, + "agent_id": "openclaw", + "state": { + "backend": { + "kind": "in_memory", + "config": {} + } + }, + "telemetry": { + "learners": ["tool_parallelism"] + } + } } ] }, @@ -132,7 +151,8 @@ do not use, or set their `enabled` fields to `false`. - `config.enabled` disables or enables the NeMo Flow OpenClaw wrapper without removing the plugin entry. `config.backend` currently supports only `hooks`. - `config.plugins` is the generic NeMo Flow plugin configuration document. Use - this object to configure built-in components such as `observability`. + this object to configure built-in components such as `observability` and + `adaptive`. - `config.plugins.components[].config.atif` writes ATIF trajectory JSON files. Set `output_directory` to the directory where OpenClaw should write files. - `config.plugins.components[].config.opentelemetry` sends generic OTLP spans to @@ -140,6 +160,10 @@ do not use, or set their `enabled` fields to `false`. - `config.plugins.components[].config.openinference` sends OpenInference OTLP spans to Phoenix or another OpenInference-compatible collector when `enabled` is `true`. +- `config.plugins.components[]` entries with `kind: "adaptive"` initialize the + Adaptive plugin. In hook-backed OpenClaw mode, adaptive telemetry can consume + replayed NeMo Flow events, while request-rewrite features such as adaptive + hints require a managed execution path. - `config.capture` controls prompt, response, tool argument, and tool result capture. Tool arguments and tool results are stripped by default because they often contain user data, local paths, tokens, or large payloads. @@ -184,7 +208,7 @@ The plugin maps supported OpenClaw hook events into NeMo Flow telemetry without changing OpenClaw execution behavior. It does not change OpenClaw tool execution, provider routing, policy decisions, -or adaptive behavior. +or provider request payloads. Current OpenClaw public hooks expose request, response, message-write, and provider timing details through separate event streams. The plugin correlates diff --git a/integrations/openclaw/src/modules.ts b/integrations/openclaw/src/modules.ts index e9a1170a..5d5c2c59 100644 --- a/integrations/openclaw/src/modules.ts +++ b/integrations/openclaw/src/modules.ts @@ -8,6 +8,7 @@ * when the native binding is unavailable, then degrade only at runtime start. */ import type * as NemoFlowRuntime from "nemo-flow-node"; +import type * as NemoFlowAdaptive from "nemo-flow-node/adaptive"; import type * as NemoFlowPluginHost from "nemo-flow-node/plugin"; type NemoFlowRuntimeKeys = @@ -24,6 +25,7 @@ type NemoFlowRuntimeKeys = | "toolCallEnd"; type NemoFlowPluginHostKeys = "defaultConfig" | "validate" | "initialize" | "clear"; +type NemoFlowAdaptiveKeys = "ADAPTIVE_PLUGIN_KIND" | "ComponentSpec"; export type ConfigDiagnostic = NemoFlowPluginHost.ConfigDiagnostic; export type ConfigReport = NemoFlowPluginHost.ConfigReport; @@ -44,22 +46,31 @@ export type NemoFlowRuntimeModule = Omit; +/** + * @internal Adaptive helper subset loaded so the package verifies the built-in + * adaptive plugin path is available alongside the generic plugin host. + */ +export type NemoFlowAdaptiveModule = Pick; + export type NemoFlowModules = { nf: NemoFlowRuntimeModule; pluginHost: NemoFlowPluginHostModule; + adaptive: NemoFlowAdaptiveModule; }; export type NemoFlowModuleLoader = () => Promise; /** Load the runtime and plugin-host modules used by the OpenClaw integration. */ export const defaultNemoFlowModuleLoader: NemoFlowModuleLoader = async () => { - const [nf, pluginHost] = await Promise.all([ + const [nf, pluginHost, adaptive] = await Promise.all([ import("nemo-flow-node"), import("nemo-flow-node/plugin"), + import("nemo-flow-node/adaptive"), ]); return { nf: nf as NemoFlowRuntimeModule, pluginHost: pluginHost as NemoFlowPluginHostModule, + adaptive: adaptive as NemoFlowAdaptiveModule, }; }; diff --git a/integrations/openclaw/test/config.test.ts b/integrations/openclaw/test/config.test.ts index 1817f108..9d6682b1 100644 --- a/integrations/openclaw/test/config.test.ts +++ b/integrations/openclaw/test/config.test.ts @@ -69,6 +69,41 @@ describe("nemo-flow OpenClaw plugin shell", () => { assert.deepEqual(config.plugins, pluginConfig); }); + it("keeps adaptive components in the generic plugin config path", () => { + const pluginConfig = { + version: 1, + components: [ + { + kind: "adaptive", + enabled: true, + config: { + version: 1, + agent_id: "openclaw", + state: { + backend: { + kind: "in_memory", + config: {}, + }, + }, + telemetry: { + learners: ["tool_parallelism"], + }, + adaptive_hints: { + priority: 100, + break_chain: false, + inject_header: true, + inject_body_path: "nvext.agent_hints", + }, + }, + }, + ], + }; + + const config = parseConfig({ plugins: pluginConfig }); + + assert.deepEqual(config.plugins, pluginConfig); + }); + it("rejects unsupported backends and invalid correlation values", () => { assert.throws( () => parseConfig({ backend: "managed_execution" }), @@ -209,6 +244,23 @@ describe("nemo-flow OpenClaw plugin shell", () => { openinference: { enabled: true, endpoint: "http://phoenix.example" }, }, }, + { + kind: "adaptive", + enabled: true, + config: { + version: 1, + agent_id: "openclaw", + state: { + backend: { + kind: "in_memory", + config: {}, + }, + }, + telemetry: { + learners: ["tool_parallelism"], + }, + }, + }, ], }; const modules = createModules(); @@ -234,6 +286,46 @@ describe("nemo-flow OpenClaw plugin shell", () => { } }); + it("passes adaptive helper components through plugin host initialization", async () => { + const modules = createModules(); + const configuredPlugins = { + version: 1, + components: [ + modules.adaptive.ComponentSpec({ + version: 1, + agent_id: "openclaw-helper", + state: { + backend: { + kind: "in_memory", + config: {}, + }, + }, + adaptive_hints: { + priority: 100, + break_chain: false, + inject_header: true, + inject_body_path: "nvext.agent_hints", + }, + }), + ], + }; + const api = createApi({ pluginConfig: { plugins: configuredPlugins } }); + + registerPlugin(api, async () => modules); + const service = api.calls.services[0]; + assert.ok(service); + try { + await service.start({ stateDir: "/tmp/openclaw-state", config: {} as never, logger: api.logger }); + + assert.deepEqual(modules.pluginHost.calls.validate, [configuredPlugins]); + assert.deepEqual(modules.pluginHost.calls.initialize, [configuredPlugins]); + assert.equal(configuredPlugins.components[0]?.kind, modules.adaptive.ADAPTIVE_PLUGIN_KIND); + assert.equal(configuredPlugins.components[0]?.enabled, true); + } finally { + await service.stop?.({ stateDir: "/tmp/openclaw-state", config: {} as never, logger: api.logger }); + } + }); + it("continues hook-backed telemetry when plugin host validation fails", async () => { const modules = createModules({ validateDiagnostics: [{ level: "error", code: "bad_config", message: "invalid" }], @@ -656,8 +748,20 @@ function createModules(params: { } = {}): TestModules { const nf = createNemoFlowRuntime(); const calls: TestPluginHost["calls"] = { validate: [], initialize: [], clear: 0 }; + const adaptive: TestModules["adaptive"] = { + ADAPTIVE_PLUGIN_KIND: "adaptive", + ComponentSpec: ( + config: Parameters[0], + options?: Parameters[1], + ) => ({ + kind: adaptive.ADAPTIVE_PLUGIN_KIND, + enabled: options?.enabled ?? true, + config, + }), + }; return { nf, + adaptive, pluginHost: { calls, defaultConfig: () => ({ version: 1, components: [] }), diff --git a/integrations/openclaw/test/live-smoke.test.ts b/integrations/openclaw/test/live-smoke.test.ts index 0cfbc0e8..0f343285 100644 --- a/integrations/openclaw/test/live-smoke.test.ts +++ b/integrations/openclaw/test/live-smoke.test.ts @@ -45,6 +45,23 @@ it( }, }, }, + { + kind: "adaptive", + enabled: true, + config: { + version: 1, + agent_id: "openclaw-live", + state: { + backend: { + kind: "in_memory", + config: {}, + }, + }, + telemetry: { + learners: ["tool_parallelism"], + }, + }, + }, ], }, },