From 77741ccb5a4239ad510a3fb316d2c76682f0435d Mon Sep 17 00:00:00 2001 From: Will Killian Date: Mon, 18 May 2026 21:00:53 -0400 Subject: [PATCH 1/3] feat: enable adaptive plugin for CLI and OpenClaw Signed-off-by: Will Killian --- Cargo.lock | 1 + crates/cli/Cargo.toml | 1 + crates/cli/src/gateway.rs | 102 ++++++++++++----- crates/cli/src/plugins/config_io.rs | 4 + crates/cli/src/server.rs | 4 + crates/cli/tests/coverage/gateway_tests.rs | 83 +++++++++++++- crates/cli/tests/coverage/plugins_tests.rs | 66 ++++++++++- crates/cli/tests/coverage/server_tests.rs | 46 ++++++++ docs/integrations/openclaw-plugin.md | 43 ++++++-- integrations/openclaw/README.md | 38 +++++-- integrations/openclaw/src/modules.ts | 13 ++- integrations/openclaw/test/config.test.ts | 103 ++++++++++++++++++ integrations/openclaw/test/live-smoke.test.ts | 17 +++ 13 files changed, 475 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1a867aaf..5b8ef7b9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1253,6 +1253,7 @@ dependencies = [ "http", "http-body-util", "nemo-flow", + "nemo-flow-adaptive", "reqwest", "rustls", "serde", diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index 4316643a..0b6c9e5d 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -22,6 +22,7 @@ workspace = true [dependencies] nemo-flow = { workspace = true, features = ["openinference"] } +nemo-flow-adaptive = { workspace = true, features = ["redis-backend"] } async-stream = "0.3" axum = "0.8" bytes = "1" diff --git a/crates/cli/src/gateway.rs b/crates/cli/src/gateway.rs index b03ade05..d2eb4600 100644 --- a/crates/cli/src/gateway.rs +++ b/crates/cli/src/gateway.rs @@ -6,7 +6,7 @@ use std::sync::{Arc, Mutex}; use async_stream::stream; use axum::body::{Body, Bytes}; use axum::extract::State; -use axum::http::{HeaderMap, HeaderName, Method, Request, Response, StatusCode}; +use axum::http::{HeaderMap, HeaderName, HeaderValue, Method, Request, Response, StatusCode}; use futures_util::StreamExt; use nemo_flow::api::llm::{ LlmCallExecuteParams, LlmRequest, LlmStreamCallExecuteParams, llm_call_execute, @@ -297,7 +297,7 @@ fn build_buffered_func( let body_bytes = prepared.body_bytes.clone(); let headers = prepared.headers.clone(); let route = prepared.provider; - Arc::new(move |_request| { + Arc::new(move |request| { let http = http.clone(); let method = method.clone(); let url = url.clone(); @@ -307,17 +307,24 @@ fn build_buffered_func( let upstream_error = upstream_error.clone(); let response_bytes = response_bytes.clone(); Box::pin(async move { - let response = - match forward_upstream_request(&http, &method, &url, &body_bytes, &headers, route) - .await - { - Ok(response) => response, - Err(error) => { - let message = error.to_string(); - *upstream_error.lock().expect("upstream error lock poisoned") = Some(error); - return Err(FlowError::Internal(message)); - } - }; + let response = match forward_upstream_request( + &http, + &method, + &url, + &body_bytes, + &headers, + Some(&request), + route, + ) + .await + { + Ok(response) => response, + Err(error) => { + let message = error.to_string(); + *upstream_error.lock().expect("upstream error lock poisoned") = Some(error); + return Err(FlowError::Internal(message)); + } + }; let status = response.status(); let response_headers = response_headers(response.headers()); let bytes = match response.bytes().await { @@ -431,7 +438,7 @@ fn build_streaming_func( let body_bytes = prepared.body_bytes.clone(); let headers = prepared.headers.clone(); let route = prepared.provider; - Arc::new(move |_request| { + Arc::new(move |request| { let http = http.clone(); let method = method.clone(); let url = url.clone(); @@ -440,17 +447,24 @@ fn build_streaming_func( let upstream_info = upstream_info.clone(); let upstream_error = upstream_error.clone(); Box::pin(async move { - let response = - match forward_upstream_request(&http, &method, &url, &body_bytes, &headers, route) - .await - { - Ok(response) => response, - Err(error) => { - let message = error.to_string(); - *upstream_error.lock().expect("upstream error lock poisoned") = Some(error); - return Err(FlowError::Internal(message)); - } - }; + let response = match forward_upstream_request( + &http, + &method, + &url, + &body_bytes, + &headers, + Some(&request), + route, + ) + .await + { + Ok(response) => response, + Err(error) => { + let message = error.to_string(); + *upstream_error.lock().expect("upstream error lock poisoned") = Some(error); + return Err(FlowError::Internal(message)); + } + }; let status = response.status(); let response_headers = response_headers(response.headers()); *upstream_info.lock().expect("upstream info lock poisoned") = @@ -554,8 +568,10 @@ async fn forward_upstream_request( url: &str, body_bytes: &Bytes, headers: &HeaderMap, + effective_request: Option<&LlmRequest>, route: ProviderRoute, ) -> Result { + let (body_bytes, headers) = effective_upstream_request(body_bytes, headers, effective_request); // Only strip the inbound JWT when we actually have a replacement key to inject. Without one // the upstream just receives no auth and 401s, which is no better than letting it reject the // JWT itself — and stripping silently can break setups that point the gateway at an upstream @@ -566,7 +582,7 @@ async fn forward_upstream_request( .ok() .filter(|v| !v.trim().is_empty()) .is_some(); - let sanitized = strip_chatgpt_oauth_for_openai_route(headers, route, has_openai_env); + let sanitized = strip_chatgpt_oauth_for_openai_route(&headers, route, has_openai_env); let mut upstream = http.request(method.clone(), url).body(body_bytes.clone()); for (name, value) in &sanitized { if should_forward_request_header(name) { @@ -577,6 +593,39 @@ async fn forward_upstream_request( upstream.send().await } +fn effective_upstream_request( + body_bytes: &Bytes, + headers: &HeaderMap, + effective_request: Option<&LlmRequest>, +) -> (Bytes, HeaderMap) { + let Some(request) = effective_request else { + return (body_bytes.clone(), headers.clone()); + }; + + let body_bytes = serde_json::to_vec(&request.content) + .map(Bytes::from) + .unwrap_or_else(|_| body_bytes.clone()); + let mut headers = headers.clone(); + for (name, value) in &request.headers { + let Ok(name) = HeaderName::from_bytes(name.as_bytes()) else { + continue; + }; + let Some(value) = json_header_value(value) else { + continue; + }; + headers.insert(name, value); + } + (body_bytes, headers) +} + +fn json_header_value(value: &Value) -> Option { + let rendered = match value { + Value::String(value) => value.clone(), + value => serde_json::to_string(value).ok()?, + }; + HeaderValue::from_str(&rendered).ok() +} + // Builds the upstream URL for the ChatGPT backend. OpenAI API bases commonly include `/v1`, while // the ChatGPT backend base is // `chatgpt.com/backend-api/codex` (no `/v1`). Both append `/responses` to their base, so the @@ -718,6 +767,7 @@ async fn passthrough_streaming( &prepared.upstream_url, &prepared.body_bytes, &prepared.headers, + None, prepared.provider, ) .await?; diff --git a/crates/cli/src/plugins/config_io.rs b/crates/cli/src/plugins/config_io.rs index ad31143a..a759dde7 100644 --- a/crates/cli/src/plugins/config_io.rs +++ b/crates/cli/src/plugins/config_io.rs @@ -7,6 +7,7 @@ use std::path::{Path, PathBuf}; use console::style; use nemo_flow::plugin::{ConfigPolicy, PluginConfig, validate_plugin_config}; +use nemo_flow_adaptive::plugin_component::register_adaptive_component; use serde_json::{Map, Value}; use crate::config::{ @@ -115,6 +116,9 @@ pub(super) fn print_preview(config: &PluginConfig) -> Result<(), CliError> { } pub(super) fn validate_config(config: &PluginConfig) -> Result<(), CliError> { + register_adaptive_component().map_err(|error| { + CliError::Config(format!("adaptive plugin registration failed: {error}")) + })?; let report = validate_plugin_config(config); if report.has_errors() { let messages = report diff --git a/crates/cli/src/server.rs b/crates/cli/src/server.rs index 9e937152..b942ba18 100644 --- a/crates/cli/src/server.rs +++ b/crates/cli/src/server.rs @@ -8,6 +8,7 @@ use axum::http::HeaderMap; use axum::routing::{get, post}; use axum::{Json, Router}; use nemo_flow::plugin::{PluginConfig, clear_plugin_configuration, initialize_plugins}; +use nemo_flow_adaptive::plugin_component::register_adaptive_component; use reqwest::Client; use serde_json::Value; use tokio::net::TcpListener; @@ -152,6 +153,9 @@ impl PluginActivation { let Some(config) = config else { return Ok(Self { active: false }); }; + register_adaptive_component().map_err(|error| { + CliError::Config(format!("adaptive plugin registration failed: {error}")) + })?; let plugin_config: PluginConfig = serde_json::from_value(config) .map_err(|error| CliError::Config(format!("invalid plugin config: {error}")))?; initialize_plugins(plugin_config) diff --git a/crates/cli/tests/coverage/gateway_tests.rs b/crates/cli/tests/coverage/gateway_tests.rs index 6f46c382..b1c3651e 100644 --- a/crates/cli/tests/coverage/gateway_tests.rs +++ b/crates/cli/tests/coverage/gateway_tests.rs @@ -7,7 +7,7 @@ use crate::server::AppState; use crate::session::SessionManager; use axum::body::Body; use axum::extract::State; -use axum::http::{HeaderMap, HeaderValue, Method, Request, StatusCode}; +use axum::http::{HeaderMap, HeaderValue, Method, Request, StatusCode, header}; use http_body_util::BodyExt; use reqwest::Client; @@ -140,6 +140,87 @@ fn openai_upstream_url_accepts_origin_or_v1_base() { ); } +#[test] +fn effective_upstream_request_overlays_runtime_body_and_headers() { + let original_body = Bytes::from_static(br#"{"model":"original"}"#); + let mut original_headers = HeaderMap::new(); + original_headers.insert( + header::AUTHORIZATION, + HeaderValue::from_static("Bearer original"), + ); + let request = LlmRequest { + headers: Map::from_iter([ + ("x-runtime".to_string(), json!("enabled")), + ("x-runtime-json".to_string(), json!({ "enabled": true })), + ]), + content: json!({ + "model": "rewritten", + "nvext": { "agent_hints": { "priority": 1 } } + }), + }; + + let (body, headers) = + effective_upstream_request(&original_body, &original_headers, Some(&request)); + let body: Value = serde_json::from_slice(&body).unwrap(); + + assert_eq!(body["model"], json!("rewritten")); + assert_eq!(body["nvext"]["agent_hints"]["priority"], json!(1)); + assert_eq!( + headers.get(header::AUTHORIZATION).unwrap(), + "Bearer original" + ); + assert_eq!(headers.get("x-runtime").unwrap(), "enabled"); + assert_eq!( + headers.get("x-runtime-json").unwrap(), + r#"{"enabled":true}"# + ); +} + +#[test] +fn effective_upstream_request_returns_original_without_runtime_request() { + let original_body = Bytes::from_static(br#"{"model":"original"}"#); + let mut original_headers = HeaderMap::new(); + original_headers.insert( + header::AUTHORIZATION, + HeaderValue::from_static("Bearer original"), + ); + original_headers.insert("x-request-id", HeaderValue::from_static("request-1")); + + let (body, headers) = effective_upstream_request(&original_body, &original_headers, None); + + assert_eq!(body, original_body); + assert_eq!( + headers.get(header::AUTHORIZATION).unwrap(), + "Bearer original" + ); + assert_eq!(headers.get("x-request-id").unwrap(), "request-1"); +} + +#[test] +fn effective_upstream_request_skips_invalid_runtime_headers() { + let original_body = Bytes::from_static(br#"{"model":"original"}"#); + let mut original_headers = HeaderMap::new(); + original_headers.insert("x-original", HeaderValue::from_static("kept")); + let request = LlmRequest { + headers: Map::from_iter([ + ("bad header".to_string(), json!("skip")), + ("x-invalid-value".to_string(), json!("line\nbreak")), + ("x-good".to_string(), json!("ok")), + ]), + content: json!({ "model": "rewritten" }), + }; + + let (body, headers) = + effective_upstream_request(&original_body, &original_headers, Some(&request)); + let body: Value = serde_json::from_slice(&body).unwrap(); + + assert_eq!(body["model"], json!("rewritten")); + assert_eq!(headers.get("x-original").unwrap(), "kept"); + assert_eq!(headers.get("x-good").unwrap(), "ok"); + assert!(headers.get("x-invalid-value").is_none()); + assert!(headers.keys().all(|name| name.as_str() != "bad header")); +} + #[test] fn gateway_session_id_prefers_headers_and_has_fallbacks() { let mut headers = HeaderMap::new(); diff --git a/crates/cli/tests/coverage/plugins_tests.rs b/crates/cli/tests/coverage/plugins_tests.rs index 674fcd4e..6ada096e 100644 --- a/crates/cli/tests/coverage/plugins_tests.rs +++ b/crates/cli/tests/coverage/plugins_tests.rs @@ -5,6 +5,32 @@ use super::*; use crate::config::{global_plugin_config_path, project_plugin_config_path}; use nemo_flow::observability::plugin_component::OBSERVABILITY_PLUGIN_KIND; use nemo_flow::plugin::{ConfigPolicy, PluginComponentSpec, PluginConfig}; +use nemo_flow_adaptive::plugin_component::ADAPTIVE_PLUGIN_KIND; + +fn adaptive_component_config(agent_id: &str) -> serde_json::Map { + json!({ + "version": 1, + "agent_id": agent_id, + "state": { + "backend": { + "kind": "in_memory", + "config": {} + } + }, + "telemetry": { + "learners": ["tool_parallelism"] + }, + "adaptive_hints": { + "priority": 100, + "break_chain": false, + "inject_header": true, + "inject_body_path": "nvext.agent_hints" + } + }) + .as_object() + .unwrap() + .clone() +} #[test] fn target_scope_defaults_to_user_and_rejects_conflicts() { @@ -262,15 +288,39 @@ fn write_plugin_config_prunes_defaults_and_round_trips() { let path = temp.path().join("plugins.toml"); let mut config = PluginConfig::default(); ensure_observability_component(&mut config).unwrap(); + config.components.push(PluginComponentSpec { + kind: ADAPTIVE_PLUGIN_KIND.to_string(), + enabled: true, + config: adaptive_component_config("cli-roundtrip"), + }); write_plugin_config(&path, &config).unwrap(); let rendered = std::fs::read_to_string(&path).unwrap(); assert!(rendered.contains("kind = \"observability\"")); + assert!(rendered.contains("kind = \"adaptive\"")); assert!(!rendered.contains("enabled = true")); let round_tripped = read_plugin_config(&path).unwrap(); - assert_eq!(round_tripped.components.len(), 1); + assert_eq!(round_tripped.components.len(), 2); assert_eq!(round_tripped.components[0].kind, OBSERVABILITY_PLUGIN_KIND); + let adaptive = round_tripped + .components + .iter() + .find(|component| component.kind == ADAPTIVE_PLUGIN_KIND) + .unwrap(); + assert_eq!( + adaptive.config.get("agent_id"), + Some(&json!("cli-roundtrip")) + ); + let adaptive_hints = adaptive + .config + .get("adaptive_hints") + .and_then(Value::as_object) + .unwrap(); + assert_eq!( + adaptive_hints.get("inject_body_path"), + Some(&json!("nvext.agent_hints")) + ); } #[test] @@ -322,6 +372,20 @@ fn validate_config_reports_plugin_diagnostics() { assert!(error.contains("ATOF mode"), "error was: {error}"); } +#[test] +fn validate_config_accepts_adaptive_component() { + let config = PluginConfig { + components: vec![PluginComponentSpec { + kind: ADAPTIVE_PLUGIN_KIND.to_string(), + enabled: true, + config: adaptive_component_config("cli-validation"), + }], + ..PluginConfig::default() + }; + + validate_config(&config).unwrap(); +} + #[test] fn display_helpers_render_scalars_json_and_defaults() { assert_eq!(display_value(&json!("logs")), "logs"); diff --git a/crates/cli/tests/coverage/server_tests.rs b/crates/cli/tests/coverage/server_tests.rs index a2a8c142..3fd4a57c 100644 --- a/crates/cli/tests/coverage/server_tests.rs +++ b/crates/cli/tests/coverage/server_tests.rs @@ -358,6 +358,48 @@ async fn serve_listener_activates_any_registered_plugin_kind() { let _ = deregister_plugin(GENERIC_TEST_PLUGIN_KIND); } +#[tokio::test] +async fn serve_listener_activates_adaptive_plugin_config() { + let _guard = PLUGIN_TEST_LOCK.lock().await; + let _ = nemo_flow::plugin::clear_plugin_configuration(); + + let mut config = test_config(); + config.plugin_config = Some(json!({ + "version": 1, + "components": [ + { + "kind": "adaptive", + "enabled": true, + "config": { + "version": 1, + "agent_id": "cli-test", + "state": { + "backend": { + "kind": "in_memory", + "config": {} + } + } + } + } + ] + })); + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let address = listener.local_addr().unwrap(); + let url = format!("http://{address}"); + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let handle = + tokio::spawn(async move { serve_listener(listener, config, Some(shutdown_rx)).await }); + + wait_for_gateway(&url).await; + let report = nemo_flow::plugin::active_plugin_report().unwrap(); + assert!(report.diagnostics.is_empty()); + + shutdown_tx.send(()).unwrap(); + handle.await.unwrap().unwrap(); + assert!(nemo_flow::plugin::active_plugin_report().is_none()); +} + #[tokio::test] async fn serve_listener_rejects_invalid_plugin_config() { let _guard = PLUGIN_TEST_LOCK.lock().await; @@ -751,9 +793,13 @@ async fn spawn_upstream(streaming: bool) -> TestServer { let payload: Value = serde_json::from_slice(&body).unwrap(); Json(json!({ "model": payload["model"], + "input": payload["input"], "authorization": headers .get(header::AUTHORIZATION) .and_then(|value| value.to_str().ok()), + "x_test_intercept": headers + .get("x-test-intercept") + .and_then(|value| value.to_str().ok()), "connection": headers .get(header::CONNECTION) .and_then(|value| value.to_str().ok()) diff --git a/docs/integrations/openclaw-plugin.md b/docs/integrations/openclaw-plugin.md index 2548ba42..32de9a48 100644 --- a/docs/integrations/openclaw-plugin.md +++ b/docs/integrations/openclaw-plugin.md @@ -9,13 +9,14 @@ Use the OpenClaw plugin when OpenClaw owns the agent, tool, and LLM lifecycle that needs NeMo Flow observability. The plugin observes supported OpenClaw plugin hooks and converts them into NeMo Flow sessions, LLM spans, tool spans, and marks that the generic NeMo Flow observability component can export as -Agent Trajectory Interchange Format (ATIF) JSON, OpenTelemetry spans, and -OpenInference/Phoenix spans. +Agent Trajectory Interchange Format (ATIF) JSON, OpenTelemetry spans, +OpenInference/Phoenix spans, and adaptive telemetry inputs. -This public OpenClaw plugin provides observability support only. It does not -add NeMo Flow security middleware or adaptive optimization behavior to OpenClaw -execution. For middleware-backed behavior, use the patch-based OpenClaw -integration from the NeMo Flow repository. +This public OpenClaw plugin uses OpenClaw public hooks. It can initialize +generic NeMo Flow plugin components such as `observability` and `adaptive`, but +hook-backed mode does not rewrite OpenClaw tool execution, provider routing, or +model requests. For middleware-backed behavior that changes execution, use the +patch-based OpenClaw integration from the NeMo Flow repository. Use this guide to install the plugin, enable it in OpenClaw, configure telemetry outputs, verify exported traces, and understand current LLM replay fidelity. @@ -103,6 +104,23 @@ access, and place the OpenClaw plugin configuration under "service_name": "openclaw-nemo-flow" } } + }, + { + "kind": "adaptive", + "enabled": true, + "config": { + "version": 1, + "agent_id": "openclaw", + "state": { + "backend": { + "kind": "in_memory", + "config": {} + } + }, + "telemetry": { + "learners": ["tool_parallelism"] + } + } } ] }, @@ -138,7 +156,8 @@ do not use, or set their `enabled` fields to `false`. - `config.enabled` disables or enables the NeMo Flow OpenClaw wrapper without removing the plugin entry. `config.backend` currently supports only `hooks`. - `config.plugins` is the generic NeMo Flow plugin configuration document. Use - this object to configure built-in components such as `observability`. + this object to configure built-in components such as `observability` and + `adaptive`. - `config.plugins.components[].config.atif` writes ATIF trajectory JSON files. Set `output_directory` to the directory where OpenClaw should write files. - `config.plugins.components[].config.opentelemetry` sends generic OTLP spans to @@ -146,6 +165,10 @@ do not use, or set their `enabled` fields to `false`. - `config.plugins.components[].config.openinference` sends OpenInference OTLP spans to Phoenix or another OpenInference-compatible collector when `enabled` is `true`. +- `config.plugins.components[]` entries with `kind: "adaptive"` initialize the + Adaptive plugin. In hook-backed OpenClaw mode, adaptive telemetry can consume + replayed NeMo Flow events, while request-rewrite features such as adaptive + hints require a managed execution path. - `config.capture` controls prompt, response, tool argument, and tool result capture. Tool arguments and tool results are stripped by default because they often contain user data, local paths, tokens, or large payloads. @@ -223,11 +246,11 @@ reason when present. ## Runtime Mapping -The plugin maps supported OpenClaw hook events into NeMo Flow telemetry without -changing OpenClaw execution behavior. +The plugin maps supported OpenClaw hook events into NeMo Flow telemetry and +adaptive inputs without changing OpenClaw execution behavior. It does not change OpenClaw tool execution, provider routing, policy decisions, -or adaptive behavior. +or provider request payloads. | OpenClaw hook | NeMo Flow behavior | | --- | --- | diff --git a/integrations/openclaw/README.md b/integrations/openclaw/README.md index 7ca9166c..f31ebe9e 100644 --- a/integrations/openclaw/README.md +++ b/integrations/openclaw/README.md @@ -9,12 +9,13 @@ SPDX-License-Identifier: Apache-2.0 OpenClaw. It converts supported OpenClaw hook events into NeMo Flow sessions, LLM spans, tool spans, and lifecycle marks that the generic NeMo Flow observability component can export as ATIF JSON, OpenTelemetry spans, and -OpenInference/Phoenix spans. +OpenInference/Phoenix spans. The same generic plugin config path can initialize +Adaptive components for hook-backed telemetry learning. -This public OpenClaw plugin package provides observability support only. It -does not add NeMo Flow security middleware or adaptive optimization behavior to -OpenClaw execution. For middleware-backed behavior, use the patch-based -OpenClaw integration from the NeMo Flow repository. +This public OpenClaw plugin package uses OpenClaw public hooks. It does not +rewrite OpenClaw tool execution, provider routing, policy decisions, or model +requests. For middleware-backed behavior that changes execution, use the +patch-based OpenClaw integration from the NeMo Flow repository. ## Why Use It? @@ -30,6 +31,7 @@ OpenClaw integration from the NeMo Flow repository. - OpenClaw plugin ID `nemo-flow`. - Generic NeMo Flow plugin initialization through `config.plugins`. - ATIF JSON export through the built-in `observability` component. +- Adaptive plugin initialization through `config.plugins`. - Optional OpenTelemetry OTLP export. - Optional OpenInference/Phoenix OTLP export. - Bounded LLM replay correlation across supported OpenClaw hooks. @@ -98,6 +100,23 @@ OpenClaw plugin configuration under `plugins.entries["nemo-flow"].config`: "service_name": "openclaw-nemo-flow" } } + }, + { + "kind": "adaptive", + "enabled": true, + "config": { + "version": 1, + "agent_id": "openclaw", + "state": { + "backend": { + "kind": "in_memory", + "config": {} + } + }, + "telemetry": { + "learners": ["tool_parallelism"] + } + } } ] }, @@ -132,7 +151,8 @@ do not use, or set their `enabled` fields to `false`. - `config.enabled` disables or enables the NeMo Flow OpenClaw wrapper without removing the plugin entry. `config.backend` currently supports only `hooks`. - `config.plugins` is the generic NeMo Flow plugin configuration document. Use - this object to configure built-in components such as `observability`. + this object to configure built-in components such as `observability` and + `adaptive`. - `config.plugins.components[].config.atif` writes ATIF trajectory JSON files. Set `output_directory` to the directory where OpenClaw should write files. - `config.plugins.components[].config.opentelemetry` sends generic OTLP spans to @@ -140,6 +160,10 @@ do not use, or set their `enabled` fields to `false`. - `config.plugins.components[].config.openinference` sends OpenInference OTLP spans to Phoenix or another OpenInference-compatible collector when `enabled` is `true`. +- `config.plugins.components[]` entries with `kind: "adaptive"` initialize the + Adaptive plugin. In hook-backed OpenClaw mode, adaptive telemetry can consume + replayed NeMo Flow events, while request-rewrite features such as adaptive + hints require a managed execution path. - `config.capture` controls prompt, response, tool argument, and tool result capture. Tool arguments and tool results are stripped by default because they often contain user data, local paths, tokens, or large payloads. @@ -184,7 +208,7 @@ The plugin maps supported OpenClaw hook events into NeMo Flow telemetry without changing OpenClaw execution behavior. It does not change OpenClaw tool execution, provider routing, policy decisions, -or adaptive behavior. +or provider request payloads. Current OpenClaw public hooks expose request, response, message-write, and provider timing details through separate event streams. The plugin correlates diff --git a/integrations/openclaw/src/modules.ts b/integrations/openclaw/src/modules.ts index e9a1170a..5d5c2c59 100644 --- a/integrations/openclaw/src/modules.ts +++ b/integrations/openclaw/src/modules.ts @@ -8,6 +8,7 @@ * when the native binding is unavailable, then degrade only at runtime start. */ import type * as NemoFlowRuntime from "nemo-flow-node"; +import type * as NemoFlowAdaptive from "nemo-flow-node/adaptive"; import type * as NemoFlowPluginHost from "nemo-flow-node/plugin"; type NemoFlowRuntimeKeys = @@ -24,6 +25,7 @@ type NemoFlowRuntimeKeys = | "toolCallEnd"; type NemoFlowPluginHostKeys = "defaultConfig" | "validate" | "initialize" | "clear"; +type NemoFlowAdaptiveKeys = "ADAPTIVE_PLUGIN_KIND" | "ComponentSpec"; export type ConfigDiagnostic = NemoFlowPluginHost.ConfigDiagnostic; export type ConfigReport = NemoFlowPluginHost.ConfigReport; @@ -44,22 +46,31 @@ export type NemoFlowRuntimeModule = Omit; +/** + * @internal Adaptive helper subset loaded so the package verifies the built-in + * adaptive plugin path is available alongside the generic plugin host. + */ +export type NemoFlowAdaptiveModule = Pick; + export type NemoFlowModules = { nf: NemoFlowRuntimeModule; pluginHost: NemoFlowPluginHostModule; + adaptive: NemoFlowAdaptiveModule; }; export type NemoFlowModuleLoader = () => Promise; /** Load the runtime and plugin-host modules used by the OpenClaw integration. */ export const defaultNemoFlowModuleLoader: NemoFlowModuleLoader = async () => { - const [nf, pluginHost] = await Promise.all([ + const [nf, pluginHost, adaptive] = await Promise.all([ import("nemo-flow-node"), import("nemo-flow-node/plugin"), + import("nemo-flow-node/adaptive"), ]); return { nf: nf as NemoFlowRuntimeModule, pluginHost: pluginHost as NemoFlowPluginHostModule, + adaptive: adaptive as NemoFlowAdaptiveModule, }; }; diff --git a/integrations/openclaw/test/config.test.ts b/integrations/openclaw/test/config.test.ts index 1817f108..75cdd997 100644 --- a/integrations/openclaw/test/config.test.ts +++ b/integrations/openclaw/test/config.test.ts @@ -69,6 +69,41 @@ describe("nemo-flow OpenClaw plugin shell", () => { assert.deepEqual(config.plugins, pluginConfig); }); + it("keeps adaptive components in the generic plugin config path", () => { + const pluginConfig = { + version: 1, + components: [ + { + kind: "adaptive", + enabled: true, + config: { + version: 1, + agent_id: "openclaw", + state: { + backend: { + kind: "in_memory", + config: {}, + }, + }, + telemetry: { + learners: ["tool_parallelism"], + }, + adaptive_hints: { + priority: 100, + break_chain: false, + inject_header: true, + inject_body_path: "nvext.agent_hints", + }, + }, + }, + ], + }; + + const config = parseConfig({ plugins: pluginConfig }); + + assert.deepEqual(config.plugins, pluginConfig); + }); + it("rejects unsupported backends and invalid correlation values", () => { assert.throws( () => parseConfig({ backend: "managed_execution" }), @@ -209,6 +244,23 @@ describe("nemo-flow OpenClaw plugin shell", () => { openinference: { enabled: true, endpoint: "http://phoenix.example" }, }, }, + { + kind: "adaptive", + enabled: true, + config: { + version: 1, + agent_id: "openclaw", + state: { + backend: { + kind: "in_memory", + config: {}, + }, + }, + telemetry: { + learners: ["tool_parallelism"], + }, + }, + }, ], }; const modules = createModules(); @@ -234,6 +286,46 @@ describe("nemo-flow OpenClaw plugin shell", () => { } }); + it("passes adaptive helper components through plugin host initialization", async () => { + const modules = createModules(); + const configuredPlugins = { + version: 1, + components: [ + modules.adaptive.ComponentSpec({ + version: 1, + agent_id: "openclaw-helper", + state: { + backend: { + kind: "in_memory", + config: {}, + }, + }, + adaptive_hints: { + priority: 100, + break_chain: false, + inject_header: true, + inject_body_path: "nvext.agent_hints", + }, + }), + ], + }; + const api = createApi({ pluginConfig: { plugins: configuredPlugins } }); + + registerPlugin(api, async () => modules); + const service = api.calls.services[0]; + assert.ok(service); + try { + await service.start({ stateDir: "/tmp/openclaw-state", config: {} as never, logger: api.logger }); + + assert.deepEqual(modules.pluginHost.calls.validate, [configuredPlugins]); + assert.deepEqual(modules.pluginHost.calls.initialize, [configuredPlugins]); + assert.equal(configuredPlugins.components[0]?.kind, modules.adaptive.ADAPTIVE_PLUGIN_KIND); + assert.equal(configuredPlugins.components[0]?.enabled, true); + } finally { + await service.stop?.({ stateDir: "/tmp/openclaw-state", config: {} as never, logger: api.logger }); + } + }); + it("continues hook-backed telemetry when plugin host validation fails", async () => { const modules = createModules({ validateDiagnostics: [{ level: "error", code: "bad_config", message: "invalid" }], @@ -658,6 +750,17 @@ function createModules(params: { const calls: TestPluginHost["calls"] = { validate: [], initialize: [], clear: 0 }; return { nf, + adaptive: { + ADAPTIVE_PLUGIN_KIND: "adaptive", + ComponentSpec: ( + config: Parameters[0], + options?: Parameters[1], + ) => ({ + kind: "adaptive", + enabled: options?.enabled ?? true, + config, + }), + }, pluginHost: { calls, defaultConfig: () => ({ version: 1, components: [] }), diff --git a/integrations/openclaw/test/live-smoke.test.ts b/integrations/openclaw/test/live-smoke.test.ts index 0cfbc0e8..0f343285 100644 --- a/integrations/openclaw/test/live-smoke.test.ts +++ b/integrations/openclaw/test/live-smoke.test.ts @@ -45,6 +45,23 @@ it( }, }, }, + { + kind: "adaptive", + enabled: true, + config: { + version: 1, + agent_id: "openclaw-live", + state: { + backend: { + kind: "in_memory", + config: {}, + }, + }, + telemetry: { + learners: ["tool_parallelism"], + }, + }, + }, ], }, }, From 850f4aec27e166c980546d80ebccced739176a0e Mon Sep 17 00:00:00 2001 From: Will Killian Date: Tue, 19 May 2026 20:43:17 -0400 Subject: [PATCH 2/3] Address coderabbit feedback Signed-off-by: Will Killian --- crates/cli/src/gateway.rs | 10 +++++++--- crates/cli/tests/coverage/gateway_tests.rs | 18 +++++++++++++++++ integrations/openclaw/test/config.test.ts | 23 +++++++++++----------- 3 files changed, 37 insertions(+), 14 deletions(-) diff --git a/crates/cli/src/gateway.rs b/crates/cli/src/gateway.rs index d2eb4600..4666e506 100644 --- a/crates/cli/src/gateway.rs +++ b/crates/cli/src/gateway.rs @@ -602,9 +602,13 @@ fn effective_upstream_request( return (body_bytes.clone(), headers.clone()); }; - let body_bytes = serde_json::to_vec(&request.content) - .map(Bytes::from) - .unwrap_or_else(|_| body_bytes.clone()); + let body_bytes = if request.content.is_null() { + body_bytes.clone() + } else { + serde_json::to_vec(&request.content) + .map(Bytes::from) + .unwrap_or_else(|_| body_bytes.clone()) + }; let mut headers = headers.clone(); for (name, value) in &request.headers { let Ok(name) = HeaderName::from_bytes(name.as_bytes()) else { diff --git a/crates/cli/tests/coverage/gateway_tests.rs b/crates/cli/tests/coverage/gateway_tests.rs index b1c3651e..19bd3b7a 100644 --- a/crates/cli/tests/coverage/gateway_tests.rs +++ b/crates/cli/tests/coverage/gateway_tests.rs @@ -196,6 +196,24 @@ fn effective_upstream_request_returns_original_without_runtime_request() { assert_eq!(headers.get("x-request-id").unwrap(), "request-1"); } +#[test] +fn effective_upstream_request_preserves_original_body_for_null_runtime_content() { + let original_body = Bytes::from_static(b"not-json-but-still-upstream-body"); + let mut original_headers = HeaderMap::new(); + original_headers.insert("x-original", HeaderValue::from_static("kept")); + let request = LlmRequest { + headers: Map::from_iter([("x-runtime".to_string(), json!("enabled"))]), + content: Value::Null, + }; + + let (body, headers) = + effective_upstream_request(&original_body, &original_headers, Some(&request)); + + assert_eq!(body, original_body); + assert_eq!(headers.get("x-original").unwrap(), "kept"); + assert_eq!(headers.get("x-runtime").unwrap(), "enabled"); +} + #[test] fn effective_upstream_request_skips_invalid_runtime_headers() { let original_body = Bytes::from_static(br#"{"model":"original"}"#); diff --git a/integrations/openclaw/test/config.test.ts b/integrations/openclaw/test/config.test.ts index 75cdd997..9d6682b1 100644 --- a/integrations/openclaw/test/config.test.ts +++ b/integrations/openclaw/test/config.test.ts @@ -748,19 +748,20 @@ function createModules(params: { } = {}): TestModules { const nf = createNemoFlowRuntime(); const calls: TestPluginHost["calls"] = { validate: [], initialize: [], clear: 0 }; + const adaptive: TestModules["adaptive"] = { + ADAPTIVE_PLUGIN_KIND: "adaptive", + ComponentSpec: ( + config: Parameters[0], + options?: Parameters[1], + ) => ({ + kind: adaptive.ADAPTIVE_PLUGIN_KIND, + enabled: options?.enabled ?? true, + config, + }), + }; return { nf, - adaptive: { - ADAPTIVE_PLUGIN_KIND: "adaptive", - ComponentSpec: ( - config: Parameters[0], - options?: Parameters[1], - ) => ({ - kind: "adaptive", - enabled: options?.enabled ?? true, - config, - }), - }, + adaptive, pluginHost: { calls, defaultConfig: () => ({ version: 1, components: [] }), From 827485305a0422f65dbb92077b5bdb545b69bcc3 Mon Sep 17 00:00:00 2001 From: Will Killian Date: Wed, 20 May 2026 15:36:20 -0400 Subject: [PATCH 3/3] fix: clarify gateway auth header handling Signed-off-by: Will Killian --- crates/cli/src/gateway.rs | 28 +++++++++++++--------- crates/cli/tests/coverage/gateway_tests.rs | 8 +++---- 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/crates/cli/src/gateway.rs b/crates/cli/src/gateway.rs index 03f23656..1f347a72 100644 --- a/crates/cli/src/gateway.rs +++ b/crates/cli/src/gateway.rs @@ -675,7 +675,7 @@ async fn forward_upstream_request( route: ProviderRoute, ) -> Result { let (body_bytes, headers) = effective_upstream_request(body_bytes, headers, effective_request); - let sanitized = gateway_forward_headers(&headers, route); + let sanitized = strip_replaceable_agent_auth_headers(&headers, route); let mut upstream = http.request(method.clone(), url).body(body_bytes.clone()); for (name, value) in &sanitized { if should_forward_request_header(name) { @@ -698,9 +698,15 @@ fn effective_upstream_request( let body_bytes = if request.content.is_null() { body_bytes.clone() } else { - serde_json::to_vec(&request.content) - .map(Bytes::from) - .unwrap_or_else(|_| body_bytes.clone()) + match serde_json::to_vec(&request.content) { + Ok(serialized) => Bytes::from(serialized), + Err(error) => { + eprintln!( + "nemo-flow CLI gateway: failed to serialize rewritten LLM request body; forwarding original request: {error}" + ); + return (body_bytes.clone(), headers.clone()); + } + } }; let mut headers = headers.clone(); for (name, value) in &request.headers { @@ -849,7 +855,7 @@ pub(crate) async fn models( .unwrap_or(parts.uri.path()); let upstream_url = gateway_upstream_url_override(provider, &parts.headers, path_and_query) .unwrap_or_else(|| provider.upstream_url(&state.config, path_and_query)); - let sanitized = gateway_forward_headers(&parts.headers, provider); + let sanitized = strip_replaceable_agent_auth_headers(&parts.headers, provider); let mut upstream = state.http.get(upstream_url); for (name, value) in &sanitized { if should_forward_request_header(name) { @@ -985,18 +991,18 @@ fn gateway_upstream_url_override_with_openai_key_state( ) } -// Lets alignment adapters normalize agent-native credentials before the gateway injects standard -// provider API keys. Whitespace-only env vars are treated as missing because forwarding an empty -// bearer value only replaces one authentication failure with another. -fn gateway_forward_headers(headers: &HeaderMap, route: ProviderRoute) -> HeaderMap { - gateway_forward_headers_with_openai_key_state( +// Lets alignment adapters strip agent-native credentials only when the gateway can replace them +// with standard provider API keys. Whitespace-only env vars are treated as missing because +// forwarding an empty bearer value only replaces one authentication failure with another. +fn strip_replaceable_agent_auth_headers(headers: &HeaderMap, route: ProviderRoute) -> HeaderMap { + strip_replaceable_agent_auth_headers_with_openai_key_state( headers, route, env_var_is_nonempty("OPENAI_API_KEY"), ) } -fn gateway_forward_headers_with_openai_key_state( +fn strip_replaceable_agent_auth_headers_with_openai_key_state( headers: &HeaderMap, route: ProviderRoute, has_openai_replacement_key: bool, diff --git a/crates/cli/tests/coverage/gateway_tests.rs b/crates/cli/tests/coverage/gateway_tests.rs index 3c5e7cf0..7c0f75c9 100644 --- a/crates/cli/tests/coverage/gateway_tests.rs +++ b/crates/cli/tests/coverage/gateway_tests.rs @@ -443,7 +443,7 @@ fn strips_chatgpt_plus_jwt_from_openai_route_inbound() { "authorization", HeaderValue::from_static("Bearer eyJhbGciOiJIUzI1NiJ9.deadbeef.signature"), ); - let sanitized = gateway_forward_headers_with_openai_key_state( + let sanitized = strip_replaceable_agent_auth_headers_with_openai_key_state( &inbound, ProviderRoute::OpenAiResponses, true, @@ -460,7 +460,7 @@ fn preserves_real_bearer_keys_on_openai_route() { "authorization", HeaderValue::from_static("Bearer sk-real-provider-key"), ); - let sanitized = gateway_forward_headers_with_openai_key_state( + let sanitized = strip_replaceable_agent_auth_headers_with_openai_key_state( &inbound, ProviderRoute::OpenAiResponses, true, @@ -481,7 +481,7 @@ fn does_not_touch_anthropic_route_authorization() { "authorization", HeaderValue::from_static("Bearer eyJ.anthropic.case"), ); - let sanitized = gateway_forward_headers_with_openai_key_state( + let sanitized = strip_replaceable_agent_auth_headers_with_openai_key_state( &inbound, ProviderRoute::AnthropicMessages, true, @@ -499,7 +499,7 @@ fn preserves_jwt_when_no_replacement_key_available() { "authorization", HeaderValue::from_static("Bearer eyJhbGciOiJIUzI1NiJ9.deadbeef.signature"), ); - let sanitized = gateway_forward_headers_with_openai_key_state( + let sanitized = strip_replaceable_agent_auth_headers_with_openai_key_state( &inbound, ProviderRoute::OpenAiResponses, false,