Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 15 additions & 13 deletions crates/traverse-runtime/src/executor/thread_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ mod tests {
atomic::{AtomicUsize, Ordering},
};
use std::thread;
use std::time::{Duration, Instant};
use std::time::Duration;

use serde_json::json;

Expand Down Expand Up @@ -302,16 +302,23 @@ mod tests {

#[test]
fn concurrent_calls_run_in_parallel() {
let active_calls = Arc::new(AtomicUsize::new(0));
let max_active_calls = Arc::new(AtomicUsize::new(0));
let active_for_handler = Arc::clone(&active_calls);
let max_for_handler = Arc::clone(&max_active_calls);

let result = new_executor(
2,
Box::new(NativeExecutor::new(|input| {
Box::new(NativeExecutor::new(move |input| {
let current = active_for_handler.fetch_add(1, Ordering::SeqCst) + 1;
max_for_handler.fetch_max(current, Ordering::SeqCst);
thread::sleep(Duration::from_millis(100));
active_for_handler.fetch_sub(1, Ordering::SeqCst);
Ok(input.clone())
})),
)
.map(|executor| {
let executor = Arc::new(executor);
let started = Instant::now();
let first = {
let executor = Arc::clone(&executor);
thread::spawn(move || execute_json(&executor, &json!({ "call": 1 })))
Expand All @@ -320,26 +327,21 @@ mod tests {
let executor = Arc::clone(&executor);
thread::spawn(move || execute_json(&executor, &json!({ "call": 2 })))
};
(
result_debug(first.join()),
result_debug(second.join()),
started.elapsed(),
)
(result_debug(first.join()), result_debug(second.join()))
});

let first_result = result
.as_ref()
.map(|(first, _, _)| first.as_ref().map_err(String::as_str));
.map(|(first, _)| first.as_ref().map_err(String::as_str));
let second_result = result
.as_ref()
.map(|(_, second, _)| second.as_ref().map_err(String::as_str));
let elapsed = result.as_ref().map(|(_, _, elapsed)| *elapsed);
.map(|(_, second)| second.as_ref().map_err(String::as_str));

assert_eq!(first_result, Ok(Ok(&Ok(json!({ "call": 1 })))));
assert_eq!(second_result, Ok(Ok(&Ok(json!({ "call": 2 })))));
assert!(
matches!(elapsed, Ok(duration) if duration < Duration::from_millis(150)),
"parallel calls took {elapsed:?}"
max_active_calls.load(Ordering::SeqCst) >= 2,
"expected two active calls to overlap"
);
}

Expand Down
329 changes: 329 additions & 0 deletions crates/traverse-runtime/tests/thread_pool_integration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,329 @@
//! Integration tests: `ThreadPoolExecutor` through the `TraverseRuntime` stack.
//!
//! Governed by spec `047-thread-pool-executor`.

use std::{
error::Error,
sync::{Arc, Mutex, PoisonError},
};

use serde_json::{Value, json};
use traverse_contracts::{
BinaryFormat, CapabilityContract, Condition, Entrypoint, EntrypointKind, Execution,
ExecutionConstraints, ExecutionTarget, FilesystemAccess, HostApiAccess, Lifecycle,
NetworkAccess, Owner, Provenance, ProvenanceSource, SchemaContainer, ServiceType, SideEffect,
SideEffectKind,
};
use traverse_runtime::{
events::{EventCatalog, InProcessBroker},
executor::{
ArtifactType, CapabilityExecutor, ExecutorCapability, ExecutorError, NativeExecutor,
ThreadPoolExecutor, ThreadPoolExecutorConfig,
},
placement::{PlacementConstraintEvaluator, RuntimeSnapshot},
router::{CapabilityExecutorRegistry, PlacementRouter, RouterRequest},
trace::TraceStore,
};

const TEST_SPEC: &str = "047-thread-pool-executor@1.0.0";
type SharedTraceStore = Arc<Mutex<TraceStore>>;
type TestResult<T> = Result<T, Box<dyn Error>>;

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

fn error_handler(_input: &Value) -> Result<Value, String> {
Err("deliberate error".to_string())
}

fn pool_executor(
capacity: usize,
handler: impl Fn(&Value) -> Result<Value, String> + Send + Sync + 'static,
) -> TestResult<ThreadPoolExecutor> {
Ok(ThreadPoolExecutor::new(
ThreadPoolExecutorConfig { capacity },
Box::new(NativeExecutor::new(handler)),
)?)
}

fn test_contract() -> CapabilityContract {
CapabilityContract {
kind: "capability_contract".to_string(),
schema_version: "1.0.0".to_string(),
id: "pool.integration.subject".to_string(),
namespace: "pool.integration".to_string(),
name: "subject".to_string(),
version: "0.1.0".to_string(),
lifecycle: Lifecycle::Draft,
owner: Owner {
team: "traverse-core".to_string(),
contact: "test@example.com".to_string(),
},
summary: "Thread pool integration test subject.".to_string(),
description: "Used only in thread pool integration tests.".to_string(),
inputs: SchemaContainer {
schema: json!({ "type": "object" }),
},
outputs: SchemaContainer {
schema: json!({ "type": "object" }),
},
preconditions: vec![Condition {
id: "always-met".to_string(),
description: "No preconditions.".to_string(),
}],
postconditions: vec![Condition {
id: "always-met".to_string(),
description: "No postconditions.".to_string(),
}],
side_effects: vec![SideEffect {
kind: SideEffectKind::MemoryOnly,
description: "No durable side effects.".to_string(),
}],
emits: Vec::new(),
consumes: Vec::new(),
permissions: Vec::new(),
execution: Execution {
binary_format: BinaryFormat::Wasm,
entrypoint: Entrypoint {
kind: EntrypointKind::WasiCommand,
command: "pool.integration.subject".to_string(),
},
preferred_targets: vec![ExecutionTarget::Local],
constraints: ExecutionConstraints {
host_api_access: HostApiAccess::None,
network_access: NetworkAccess::Forbidden,
filesystem_access: FilesystemAccess::None,
},
},
policies: Vec::new(),
dependencies: Vec::new(),
provenance: Provenance {
source: ProvenanceSource::Greenfield,
author: "traverse-core".to_string(),
created_at: "2026-07-03T00:00:00Z".to_string(),
spec_ref: Some(TEST_SPEC.to_string()),
adr_refs: Vec::new(),
exception_refs: Vec::new(),
},
evidence: Vec::new(),
service_type: ServiceType::Stateless,
permitted_targets: vec![ExecutionTarget::Local],
event_trigger: None,
connector_requirements: Vec::new(),
state_schema: None,
}
}

fn executor_cap(artifact_type: ArtifactType) -> ExecutorCapability {
ExecutorCapability {
capability_id: "pool.integration.subject".to_string(),
artifact_type,
wasm_binary_path: None,
wasm_checksum: None,
host_abi_version: None,
}
}

fn idle_snapshot() -> RuntimeSnapshot {
RuntimeSnapshot {
target_loads: [(ExecutionTarget::Local, 0.0)].into_iter().collect(),
}
}

fn build_router(
executor: Box<dyn CapabilityExecutor>,
) -> TestResult<(PlacementRouter, SharedTraceStore)> {
let catalog = Arc::new(EventCatalog::new());
let broker = Arc::new(InProcessBroker::new(Arc::clone(&catalog))?);
let trace_store = Arc::new(Mutex::new(TraceStore::new()));
let mut registry = CapabilityExecutorRegistry::new();
registry.insert(ArtifactType::Native, executor);
let router = PlacementRouter::new(
PlacementConstraintEvaluator,
registry,
Arc::clone(&trace_store),
broker,
);
Ok((router, trace_store))
}

fn make_request(input: Value) -> RouterRequest {
RouterRequest {
capability_id: "pool.integration.subject".to_string(),
artifact_type: ArtifactType::Native,
contract: test_contract(),
target_hint: Some(ExecutionTarget::Local),
runtime_snapshot: idle_snapshot(),
input,
executor_capability: executor_cap(ArtifactType::Native),
emitted_events: Vec::new(),
}
}

// ---------------------------------------------------------------------------
// Drop-in replacement
// ---------------------------------------------------------------------------

#[test]
fn native_executor_and_thread_pool_produce_identical_output() -> TestResult<()> {
let input = json!({ "key": "value" });
let cap = executor_cap(ArtifactType::Native);

let native = NativeExecutor::new(|input| Ok(input.clone()));
let native_result = native.execute(&cap, &input);

let pool = pool_executor(2, |input| Ok(input.clone()))?;
let pool_result = pool.execute(&cap, &input);

assert_eq!(native_result.ok(), pool_result.ok());
Ok(())
}

#[test]
fn thread_pool_executor_satisfies_capability_executor_trait_object() -> TestResult<()> {
let executor: Box<dyn CapabilityExecutor> =
Box::new(pool_executor(2, |input| Ok(input.clone()))?);
let cap = executor_cap(ArtifactType::Native);
let result = executor.execute(&cap, &json!({}));
assert!(result.is_ok(), "trait object execute failed: {result:?}");
Ok(())
}

// ---------------------------------------------------------------------------
// Router integration
// ---------------------------------------------------------------------------

#[test]
fn router_routes_to_thread_pool_executor() -> TestResult<()> {
let (router, _) = build_router(Box::new(pool_executor(2, |input| Ok(input.clone()))?))?;
let input = json!({ "x": 1 });
let result = router.execute(make_request(input.clone()));
assert!(result.is_ok(), "router execute failed: {result:?}");
assert_eq!(result.ok().map(|r| r.output), Some(input));
Ok(())
}

#[test]
fn router_concurrent_requests_to_same_capability() -> TestResult<()> {
let (router, _) = build_router(Box::new(pool_executor(8, |input| Ok(input.clone()))?))?;
let router = Arc::new(router);
let errors: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(vec![]));

std::thread::scope(|s| {
for i in 0_u32..8 {
let router = Arc::clone(&router);
let errors = Arc::clone(&errors);
s.spawn(
move || match router.execute(make_request(json!({ "i": i }))) {
Ok(resp) => {
if resp.output != json!({ "i": i }) {
errors
.lock()
.unwrap_or_else(PoisonError::into_inner)
.push(format!("thread {i}: wrong output {:?}", resp.output));
}
}
Err(e) => {
errors
.lock()
.unwrap_or_else(PoisonError::into_inner)
.push(format!("thread {i}: router error {e:?}"));
}
},
);
}
});

let errors = errors.lock().unwrap_or_else(PoisonError::into_inner);
assert!(errors.is_empty(), "concurrent router errors: {errors:?}");
Ok(())
}

// ---------------------------------------------------------------------------
// Trace correctness
// ---------------------------------------------------------------------------

#[test]
fn concurrent_executions_produce_isolated_traces() -> TestResult<()> {
let (router, trace_store) =
build_router(Box::new(pool_executor(4, |input| Ok(input.clone()))?))?;
let router = Arc::new(router);

std::thread::scope(|s| {
for i in 0_u32..4 {
let router = Arc::clone(&router);
s.spawn(move || {
let _ = router.execute(make_request(json!({ "i": i })));
});
}
});

let store = trace_store.lock().unwrap_or_else(PoisonError::into_inner);
let entries = store.list_public(None);
assert_eq!(
entries.len(),
4,
"expected 4 trace entries, got {}",
entries.len()
);
assert!(
entries
.iter()
.all(|e| e.capability_id == "pool.integration.subject"),
"unexpected capability_id in traces"
);
Ok(())
}

#[test]
fn trace_capability_id_matches_executed_capability() -> TestResult<()> {
let (router, trace_store) =
build_router(Box::new(pool_executor(2, |input| Ok(input.clone()))?))?;
let _ = router.execute(make_request(json!({})));
let store = trace_store.lock().unwrap_or_else(PoisonError::into_inner);
let entries = store.list_public(Some("pool.integration.subject"));
assert!(!entries.is_empty(), "no trace entries found");
assert!(
entries
.iter()
.all(|e| e.capability_id == "pool.integration.subject"),
"capability_id mismatch in trace"
);
Ok(())
}

#[test]
fn failed_execution_returns_router_error() -> TestResult<()> {
// The router returns early with RouterError::ExecutionFailed before writing a trace.
// This test verifies the error propagates correctly through the pool dispatch path.
let (router, _) = build_router(Box::new(pool_executor(1, error_handler)?))?;
let result = router.execute(make_request(json!({})));
assert!(
result.is_err(),
"expected error from failing capability, got ok"
);
let err_msg = format!("{:?}", result.err());
assert!(
err_msg.contains("ExecutionFailed") || err_msg.contains("deliberate"),
"unexpected error shape: {err_msg}"
);
Ok(())
}

// ---------------------------------------------------------------------------
// WASM path unchanged
// ---------------------------------------------------------------------------

#[test]
fn wasm_capability_type_rejected_by_thread_pool_executor() -> TestResult<()> {
let pool = pool_executor(2, |input| Ok(input.clone()))?;
let wasm_cap = executor_cap(ArtifactType::Wasm);
let result = pool.execute(&wasm_cap, &json!({}));
assert_eq!(
result,
Err(ExecutorError::UnsupportedArtifactType),
"expected UnsupportedArtifactType for Wasm artifact"
);
Ok(())
}
Loading