From 9d8c05b80cfe5c5b05feab01dfe72bc9bfc0a04a Mon Sep 17 00:00:00 2001 From: Enrico Piovesan Date: Fri, 3 Jul 2026 11:31:02 -0600 Subject: [PATCH 1/3] test: integration tests for ThreadPoolExecutor through TraverseRuntime (spec 047) 8 tests covering: drop-in replacement, router routing, concurrent requests, trace isolation, capability_id matching, error propagation, WASM rejection. Closes #507 Co-Authored-By: Claude Sonnet 4.6 --- .../tests/thread_pool_integration.rs | 305 ++++++++++++++++++ 1 file changed, 305 insertions(+) create mode 100644 crates/traverse-runtime/tests/thread_pool_integration.rs diff --git a/crates/traverse-runtime/tests/thread_pool_integration.rs b/crates/traverse-runtime/tests/thread_pool_integration.rs new file mode 100644 index 00000000..9ad648d1 --- /dev/null +++ b/crates/traverse-runtime/tests/thread_pool_integration.rs @@ -0,0 +1,305 @@ +//! Integration tests: ThreadPoolExecutor through the TraverseRuntime stack. +//! +//! Governed by spec `047-thread-pool-executor`. + +use std::sync::{Arc, Mutex}; + +use serde_json::{Value, json}; +use traverse_contracts::{ + BinaryFormat, CapabilityContract, Condition, Entrypoint, EntrypointKind, Execution, + ExecutionConstraints, ExecutionTarget, FilesystemAccess, HostApiAccess, Lifecycle, + NetworkAccess, Owner, Provenance, ProvenanceSource, SchemaContainer, ServiceType, SideEffect, + SideEffectKind, +}; +use traverse_runtime::{ + events::{EventCatalog, InProcessBroker}, + executor::{ + ArtifactType, CapabilityExecutor, ExecutorCapability, ExecutorError, NativeExecutor, + ThreadPoolExecutor, ThreadPoolExecutorConfig, + }, + placement::{PlacementConstraintEvaluator, RuntimeSnapshot}, + router::{CapabilityExecutorRegistry, PlacementRouter, RouterRequest}, + trace::TraceStore, +}; + +const TEST_SPEC: &str = "047-thread-pool-executor@1.0.0"; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +fn echo_handler(input: &Value) -> Result { + Ok(input.clone()) +} + +fn error_handler(_input: &Value) -> Result { + Err("deliberate error".to_string()) +} + +fn pool_executor( + capacity: usize, + handler: impl Fn(&Value) -> Result + Send + Sync + 'static, +) -> ThreadPoolExecutor { + ThreadPoolExecutor::new( + ThreadPoolExecutorConfig { capacity }, + Box::new(NativeExecutor::new(handler)), + ) + .unwrap_or_else(|e| panic!("ThreadPoolExecutor construction failed: {e}")) +} + +fn test_contract() -> CapabilityContract { + CapabilityContract { + kind: "capability_contract".to_string(), + schema_version: "1.0.0".to_string(), + id: "pool.integration.subject".to_string(), + namespace: "pool.integration".to_string(), + name: "subject".to_string(), + version: "0.1.0".to_string(), + lifecycle: Lifecycle::Draft, + owner: Owner { + team: "traverse-core".to_string(), + contact: "test@example.com".to_string(), + }, + summary: "Thread pool integration test subject.".to_string(), + description: "Used only in thread pool integration tests.".to_string(), + inputs: SchemaContainer { schema: json!({ "type": "object" }) }, + outputs: SchemaContainer { schema: json!({ "type": "object" }) }, + preconditions: vec![Condition { + id: "always-met".to_string(), + description: "No preconditions.".to_string(), + }], + postconditions: vec![Condition { + id: "always-met".to_string(), + description: "No postconditions.".to_string(), + }], + side_effects: vec![SideEffect { + kind: SideEffectKind::MemoryOnly, + description: "No durable side effects.".to_string(), + }], + emits: Vec::new(), + consumes: Vec::new(), + permissions: Vec::new(), + execution: Execution { + binary_format: BinaryFormat::Wasm, + entrypoint: Entrypoint { + kind: EntrypointKind::WasiCommand, + command: "pool.integration.subject".to_string(), + }, + preferred_targets: vec![ExecutionTarget::Local], + constraints: ExecutionConstraints { + host_api_access: HostApiAccess::None, + network_access: NetworkAccess::Forbidden, + filesystem_access: FilesystemAccess::None, + }, + }, + policies: Vec::new(), + dependencies: Vec::new(), + provenance: Provenance { + source: ProvenanceSource::Greenfield, + author: "traverse-core".to_string(), + created_at: "2026-07-03T00:00:00Z".to_string(), + spec_ref: Some(TEST_SPEC.to_string()), + adr_refs: Vec::new(), + exception_refs: Vec::new(), + }, + evidence: Vec::new(), + service_type: ServiceType::Stateless, + permitted_targets: vec![ExecutionTarget::Local], + event_trigger: None, + connector_requirements: Vec::new(), + state_schema: None, + } +} + +fn executor_cap(artifact_type: ArtifactType) -> ExecutorCapability { + ExecutorCapability { + capability_id: "pool.integration.subject".to_string(), + artifact_type, + wasm_binary_path: None, + wasm_checksum: None, + host_abi_version: None, + } +} + +fn idle_snapshot() -> RuntimeSnapshot { + RuntimeSnapshot { + target_loads: [(ExecutionTarget::Local, 0.0)].into_iter().collect(), + } +} + +fn build_router( + executor: Box, +) -> (PlacementRouter, Arc>) { + let catalog = Arc::new(EventCatalog::new()); + let broker = Arc::new( + InProcessBroker::new(Arc::clone(&catalog)) + .unwrap_or_else(|e| panic!("broker init failed: {e}")), + ); + let trace_store = Arc::new(Mutex::new(TraceStore::new())); + let mut registry = CapabilityExecutorRegistry::new(); + registry.insert(ArtifactType::Native, executor); + let router = PlacementRouter::new( + PlacementConstraintEvaluator, + registry, + Arc::clone(&trace_store), + broker, + ); + (router, trace_store) +} + +fn make_request(input: Value) -> RouterRequest { + RouterRequest { + capability_id: "pool.integration.subject".to_string(), + artifact_type: ArtifactType::Native, + contract: test_contract(), + target_hint: Some(ExecutionTarget::Local), + runtime_snapshot: idle_snapshot(), + input, + executor_capability: executor_cap(ArtifactType::Native), + emitted_events: Vec::new(), + } +} + +// --------------------------------------------------------------------------- +// Drop-in replacement +// --------------------------------------------------------------------------- + +#[test] +fn native_executor_and_thread_pool_produce_identical_output() { + let input = json!({ "key": "value" }); + let cap = executor_cap(ArtifactType::Native); + + let native = NativeExecutor::new(echo_handler); + let native_result = native.execute(&cap, &input); + + let pool = pool_executor(2, echo_handler); + let pool_result = pool.execute(&cap, &input); + + assert_eq!(native_result.ok(), pool_result.ok()); +} + +#[test] +fn thread_pool_executor_satisfies_capability_executor_trait_object() { + let executor: Box = Box::new(pool_executor(2, echo_handler)); + let cap = executor_cap(ArtifactType::Native); + let result = executor.execute(&cap, &json!({})); + assert!(result.is_ok(), "trait object execute failed: {result:?}"); +} + +// --------------------------------------------------------------------------- +// Router integration +// --------------------------------------------------------------------------- + +#[test] +fn router_routes_to_thread_pool_executor() { + let (router, _) = build_router(Box::new(pool_executor(2, echo_handler))); + let input = json!({ "x": 1 }); + let result = router.execute(make_request(input.clone())); + assert!(result.is_ok(), "router execute failed: {result:?}"); + assert_eq!(result.ok().map(|r| r.output), Some(input)); +} + +#[test] +fn router_concurrent_requests_to_same_capability() { + let (router, _) = build_router(Box::new(pool_executor(8, echo_handler))); + let router = Arc::new(router); + let errors: Arc>> = Arc::new(Mutex::new(vec![])); + + std::thread::scope(|s| { + for i in 0_u32..8 { + let router = Arc::clone(&router); + let errors = Arc::clone(&errors); + s.spawn(move || { + match router.execute(make_request(json!({ "i": i }))) { + Ok(resp) => { + if resp.output != json!({ "i": i }) { + errors + .lock() + .unwrap_or_else(|e| e.into_inner()) + .push(format!("thread {i}: wrong output {:?}", resp.output)); + } + } + Err(e) => { + errors + .lock() + .unwrap_or_else(|e2| e2.into_inner()) + .push(format!("thread {i}: router error {e:?}")); + } + } + }); + } + }); + + let errors = errors.lock().unwrap_or_else(|e| e.into_inner()); + assert!(errors.is_empty(), "concurrent router errors: {errors:?}"); +} + +// --------------------------------------------------------------------------- +// Trace correctness +// --------------------------------------------------------------------------- + +#[test] +fn concurrent_executions_produce_isolated_traces() { + let (router, trace_store) = build_router(Box::new(pool_executor(4, echo_handler))); + let router = Arc::new(router); + + std::thread::scope(|s| { + for i in 0_u32..4 { + let router = Arc::clone(&router); + s.spawn(move || { + let _ = router.execute(make_request(json!({ "i": i }))); + }); + } + }); + + let store = trace_store.lock().unwrap_or_else(|e| e.into_inner()); + let entries = store.list_public(None); + assert_eq!(entries.len(), 4, "expected 4 trace entries, got {}", entries.len()); + assert!( + entries.iter().all(|e| e.capability_id == "pool.integration.subject"), + "unexpected capability_id in traces" + ); +} + +#[test] +fn trace_capability_id_matches_executed_capability() { + let (router, trace_store) = build_router(Box::new(pool_executor(2, echo_handler))); + let _ = router.execute(make_request(json!({}))); + let store = trace_store.lock().unwrap_or_else(|e| e.into_inner()); + let entries = store.list_public(Some("pool.integration.subject")); + assert!(!entries.is_empty(), "no trace entries found"); + assert!( + entries.iter().all(|e| e.capability_id == "pool.integration.subject"), + "capability_id mismatch in trace" + ); +} + +#[test] +fn failed_execution_returns_router_error() { + // The router returns early with RouterError::ExecutionFailed before writing a trace. + // This test verifies the error propagates correctly through the pool dispatch path. + let (router, _) = build_router(Box::new(pool_executor(1, error_handler))); + let result = router.execute(make_request(json!({}))); + assert!(result.is_err(), "expected error from failing capability, got ok"); + let err_msg = format!("{:?}", result.err()); + assert!( + err_msg.contains("ExecutionFailed") || err_msg.contains("deliberate"), + "unexpected error shape: {err_msg}" + ); +} + +// --------------------------------------------------------------------------- +// WASM path unchanged +// --------------------------------------------------------------------------- + +#[test] +fn wasm_capability_type_rejected_by_thread_pool_executor() { + let pool = pool_executor(2, echo_handler); + let wasm_cap = executor_cap(ArtifactType::Wasm); + let result = pool.execute(&wasm_cap, &json!({})); + assert_eq!( + result, + Err(ExecutorError::UnsupportedArtifactType), + "expected UnsupportedArtifactType for Wasm artifact" + ); +} From 4527671a164e866d8f35059a4534fd325ad324ac Mon Sep 17 00:00:00 2001 From: Enrico Piovesan Date: Fri, 3 Jul 2026 12:06:40 -0600 Subject: [PATCH 2/3] test: satisfy rust checks for thread pool integration --- .../tests/thread_pool_integration.rs | 120 +++++++++++------- 1 file changed, 72 insertions(+), 48 deletions(-) diff --git a/crates/traverse-runtime/tests/thread_pool_integration.rs b/crates/traverse-runtime/tests/thread_pool_integration.rs index 9ad648d1..317753e8 100644 --- a/crates/traverse-runtime/tests/thread_pool_integration.rs +++ b/crates/traverse-runtime/tests/thread_pool_integration.rs @@ -1,8 +1,11 @@ -//! Integration tests: ThreadPoolExecutor through the TraverseRuntime stack. +//! Integration tests: `ThreadPoolExecutor` through the `TraverseRuntime` stack. //! //! Governed by spec `047-thread-pool-executor`. -use std::sync::{Arc, Mutex}; +use std::{ + error::Error, + sync::{Arc, Mutex, PoisonError}, +}; use serde_json::{Value, json}; use traverse_contracts::{ @@ -23,15 +26,13 @@ use traverse_runtime::{ }; const TEST_SPEC: &str = "047-thread-pool-executor@1.0.0"; +type SharedTraceStore = Arc>; +type TestResult = Result>; // --------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------- -fn echo_handler(input: &Value) -> Result { - Ok(input.clone()) -} - fn error_handler(_input: &Value) -> Result { Err("deliberate error".to_string()) } @@ -39,12 +40,11 @@ fn error_handler(_input: &Value) -> Result { fn pool_executor( capacity: usize, handler: impl Fn(&Value) -> Result + Send + Sync + 'static, -) -> ThreadPoolExecutor { - ThreadPoolExecutor::new( +) -> TestResult { + Ok(ThreadPoolExecutor::new( ThreadPoolExecutorConfig { capacity }, Box::new(NativeExecutor::new(handler)), - ) - .unwrap_or_else(|e| panic!("ThreadPoolExecutor construction failed: {e}")) + )?) } fn test_contract() -> CapabilityContract { @@ -62,8 +62,12 @@ fn test_contract() -> CapabilityContract { }, summary: "Thread pool integration test subject.".to_string(), description: "Used only in thread pool integration tests.".to_string(), - inputs: SchemaContainer { schema: json!({ "type": "object" }) }, - outputs: SchemaContainer { schema: json!({ "type": "object" }) }, + inputs: SchemaContainer { + schema: json!({ "type": "object" }), + }, + outputs: SchemaContainer { + schema: json!({ "type": "object" }), + }, preconditions: vec![Condition { id: "always-met".to_string(), description: "No preconditions.".to_string(), @@ -129,12 +133,9 @@ fn idle_snapshot() -> RuntimeSnapshot { fn build_router( executor: Box, -) -> (PlacementRouter, Arc>) { +) -> TestResult<(PlacementRouter, SharedTraceStore)> { let catalog = Arc::new(EventCatalog::new()); - let broker = Arc::new( - InProcessBroker::new(Arc::clone(&catalog)) - .unwrap_or_else(|e| panic!("broker init failed: {e}")), - ); + let broker = Arc::new(InProcessBroker::new(Arc::clone(&catalog))?); let trace_store = Arc::new(Mutex::new(TraceStore::new())); let mut registry = CapabilityExecutorRegistry::new(); registry.insert(ArtifactType::Native, executor); @@ -144,7 +145,7 @@ fn build_router( Arc::clone(&trace_store), broker, ); - (router, trace_store) + Ok((router, trace_store)) } fn make_request(input: Value) -> RouterRequest { @@ -165,25 +166,28 @@ fn make_request(input: Value) -> RouterRequest { // --------------------------------------------------------------------------- #[test] -fn native_executor_and_thread_pool_produce_identical_output() { +fn native_executor_and_thread_pool_produce_identical_output() -> TestResult<()> { let input = json!({ "key": "value" }); let cap = executor_cap(ArtifactType::Native); - let native = NativeExecutor::new(echo_handler); + let native = NativeExecutor::new(|input| Ok(input.clone())); let native_result = native.execute(&cap, &input); - let pool = pool_executor(2, echo_handler); + let pool = pool_executor(2, |input| Ok(input.clone()))?; let pool_result = pool.execute(&cap, &input); assert_eq!(native_result.ok(), pool_result.ok()); + Ok(()) } #[test] -fn thread_pool_executor_satisfies_capability_executor_trait_object() { - let executor: Box = Box::new(pool_executor(2, echo_handler)); +fn thread_pool_executor_satisfies_capability_executor_trait_object() -> TestResult<()> { + let executor: Box = + Box::new(pool_executor(2, |input| Ok(input.clone()))?); let cap = executor_cap(ArtifactType::Native); let result = executor.execute(&cap, &json!({})); assert!(result.is_ok(), "trait object execute failed: {result:?}"); + Ok(()) } // --------------------------------------------------------------------------- @@ -191,17 +195,18 @@ fn thread_pool_executor_satisfies_capability_executor_trait_object() { // --------------------------------------------------------------------------- #[test] -fn router_routes_to_thread_pool_executor() { - let (router, _) = build_router(Box::new(pool_executor(2, echo_handler))); +fn router_routes_to_thread_pool_executor() -> TestResult<()> { + let (router, _) = build_router(Box::new(pool_executor(2, |input| Ok(input.clone()))?))?; let input = json!({ "x": 1 }); let result = router.execute(make_request(input.clone())); assert!(result.is_ok(), "router execute failed: {result:?}"); assert_eq!(result.ok().map(|r| r.output), Some(input)); + Ok(()) } #[test] -fn router_concurrent_requests_to_same_capability() { - let (router, _) = build_router(Box::new(pool_executor(8, echo_handler))); +fn router_concurrent_requests_to_same_capability() -> TestResult<()> { + let (router, _) = build_router(Box::new(pool_executor(8, |input| Ok(input.clone()))?))?; let router = Arc::new(router); let errors: Arc>> = Arc::new(Mutex::new(vec![])); @@ -209,29 +214,30 @@ fn router_concurrent_requests_to_same_capability() { for i in 0_u32..8 { let router = Arc::clone(&router); let errors = Arc::clone(&errors); - s.spawn(move || { - match router.execute(make_request(json!({ "i": i }))) { + s.spawn( + move || match router.execute(make_request(json!({ "i": i }))) { Ok(resp) => { if resp.output != json!({ "i": i }) { errors .lock() - .unwrap_or_else(|e| e.into_inner()) + .unwrap_or_else(PoisonError::into_inner) .push(format!("thread {i}: wrong output {:?}", resp.output)); } } Err(e) => { errors .lock() - .unwrap_or_else(|e2| e2.into_inner()) + .unwrap_or_else(PoisonError::into_inner) .push(format!("thread {i}: router error {e:?}")); } - } - }); + }, + ); } }); - let errors = errors.lock().unwrap_or_else(|e| e.into_inner()); + let errors = errors.lock().unwrap_or_else(PoisonError::into_inner); assert!(errors.is_empty(), "concurrent router errors: {errors:?}"); + Ok(()) } // --------------------------------------------------------------------------- @@ -239,8 +245,9 @@ fn router_concurrent_requests_to_same_capability() { // --------------------------------------------------------------------------- #[test] -fn concurrent_executions_produce_isolated_traces() { - let (router, trace_store) = build_router(Box::new(pool_executor(4, echo_handler))); +fn concurrent_executions_produce_isolated_traces() -> TestResult<()> { + let (router, trace_store) = + build_router(Box::new(pool_executor(4, |input| Ok(input.clone()))?))?; let router = Arc::new(router); std::thread::scope(|s| { @@ -252,40 +259,56 @@ fn concurrent_executions_produce_isolated_traces() { } }); - let store = trace_store.lock().unwrap_or_else(|e| e.into_inner()); + let store = trace_store.lock().unwrap_or_else(PoisonError::into_inner); let entries = store.list_public(None); - assert_eq!(entries.len(), 4, "expected 4 trace entries, got {}", entries.len()); + assert_eq!( + entries.len(), + 4, + "expected 4 trace entries, got {}", + entries.len() + ); assert!( - entries.iter().all(|e| e.capability_id == "pool.integration.subject"), + entries + .iter() + .all(|e| e.capability_id == "pool.integration.subject"), "unexpected capability_id in traces" ); + Ok(()) } #[test] -fn trace_capability_id_matches_executed_capability() { - let (router, trace_store) = build_router(Box::new(pool_executor(2, echo_handler))); +fn trace_capability_id_matches_executed_capability() -> TestResult<()> { + let (router, trace_store) = + build_router(Box::new(pool_executor(2, |input| Ok(input.clone()))?))?; let _ = router.execute(make_request(json!({}))); - let store = trace_store.lock().unwrap_or_else(|e| e.into_inner()); + let store = trace_store.lock().unwrap_or_else(PoisonError::into_inner); let entries = store.list_public(Some("pool.integration.subject")); assert!(!entries.is_empty(), "no trace entries found"); assert!( - entries.iter().all(|e| e.capability_id == "pool.integration.subject"), + entries + .iter() + .all(|e| e.capability_id == "pool.integration.subject"), "capability_id mismatch in trace" ); + Ok(()) } #[test] -fn failed_execution_returns_router_error() { +fn failed_execution_returns_router_error() -> TestResult<()> { // The router returns early with RouterError::ExecutionFailed before writing a trace. // This test verifies the error propagates correctly through the pool dispatch path. - let (router, _) = build_router(Box::new(pool_executor(1, error_handler))); + let (router, _) = build_router(Box::new(pool_executor(1, error_handler)?))?; let result = router.execute(make_request(json!({}))); - assert!(result.is_err(), "expected error from failing capability, got ok"); + assert!( + result.is_err(), + "expected error from failing capability, got ok" + ); let err_msg = format!("{:?}", result.err()); assert!( err_msg.contains("ExecutionFailed") || err_msg.contains("deliberate"), "unexpected error shape: {err_msg}" ); + Ok(()) } // --------------------------------------------------------------------------- @@ -293,8 +316,8 @@ fn failed_execution_returns_router_error() { // --------------------------------------------------------------------------- #[test] -fn wasm_capability_type_rejected_by_thread_pool_executor() { - let pool = pool_executor(2, echo_handler); +fn wasm_capability_type_rejected_by_thread_pool_executor() -> TestResult<()> { + let pool = pool_executor(2, |input| Ok(input.clone()))?; let wasm_cap = executor_cap(ArtifactType::Wasm); let result = pool.execute(&wasm_cap, &json!({})); assert_eq!( @@ -302,4 +325,5 @@ fn wasm_capability_type_rejected_by_thread_pool_executor() { Err(ExecutorError::UnsupportedArtifactType), "expected UnsupportedArtifactType for Wasm artifact" ); + Ok(()) } From 1278280c5727741bea6d3631f58707373f9ecb19 Mon Sep 17 00:00:00 2001 From: Enrico Piovesan Date: Fri, 3 Jul 2026 12:13:20 -0600 Subject: [PATCH 3/3] test: make thread pool parallelism check deterministic --- .../src/executor/thread_pool.rs | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/crates/traverse-runtime/src/executor/thread_pool.rs b/crates/traverse-runtime/src/executor/thread_pool.rs index a25d8b72..8db06901 100644 --- a/crates/traverse-runtime/src/executor/thread_pool.rs +++ b/crates/traverse-runtime/src/executor/thread_pool.rs @@ -129,7 +129,7 @@ mod tests { atomic::{AtomicUsize, Ordering}, }; use std::thread; - use std::time::{Duration, Instant}; + use std::time::Duration; use serde_json::json; @@ -302,16 +302,23 @@ mod tests { #[test] fn concurrent_calls_run_in_parallel() { + let active_calls = Arc::new(AtomicUsize::new(0)); + let max_active_calls = Arc::new(AtomicUsize::new(0)); + let active_for_handler = Arc::clone(&active_calls); + let max_for_handler = Arc::clone(&max_active_calls); + let result = new_executor( 2, - Box::new(NativeExecutor::new(|input| { + Box::new(NativeExecutor::new(move |input| { + let current = active_for_handler.fetch_add(1, Ordering::SeqCst) + 1; + max_for_handler.fetch_max(current, Ordering::SeqCst); thread::sleep(Duration::from_millis(100)); + active_for_handler.fetch_sub(1, Ordering::SeqCst); Ok(input.clone()) })), ) .map(|executor| { let executor = Arc::new(executor); - let started = Instant::now(); let first = { let executor = Arc::clone(&executor); thread::spawn(move || execute_json(&executor, &json!({ "call": 1 }))) @@ -320,26 +327,21 @@ mod tests { let executor = Arc::clone(&executor); thread::spawn(move || execute_json(&executor, &json!({ "call": 2 }))) }; - ( - result_debug(first.join()), - result_debug(second.join()), - started.elapsed(), - ) + (result_debug(first.join()), result_debug(second.join())) }); let first_result = result .as_ref() - .map(|(first, _, _)| first.as_ref().map_err(String::as_str)); + .map(|(first, _)| first.as_ref().map_err(String::as_str)); let second_result = result .as_ref() - .map(|(_, second, _)| second.as_ref().map_err(String::as_str)); - let elapsed = result.as_ref().map(|(_, _, elapsed)| *elapsed); + .map(|(_, second)| second.as_ref().map_err(String::as_str)); assert_eq!(first_result, Ok(Ok(&Ok(json!({ "call": 1 }))))); assert_eq!(second_result, Ok(Ok(&Ok(json!({ "call": 2 }))))); assert!( - matches!(elapsed, Ok(duration) if duration < Duration::from_millis(150)), - "parallel calls took {elapsed:?}" + max_active_calls.load(Ordering::SeqCst) >= 2, + "expected two active calls to overlap" ); }