diff --git a/Cargo.lock b/Cargo.lock index 2623cecc4..9da55d1ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -456,6 +456,7 @@ dependencies = [ "cmds-solana", "cmds-std", "flow-rpc", + "tracing", ] [[package]] diff --git a/crates/all-cmds-server/Cargo.toml b/crates/all-cmds-server/Cargo.toml index 092d537f5..9bd797d97 100644 --- a/crates/all-cmds-server/Cargo.toml +++ b/crates/all-cmds-server/Cargo.toml @@ -10,3 +10,4 @@ cmds-solana.workspace = true cmds-image.workspace = true cmds-bun.workspace = true flow-rpc.workspace = true +tracing = "0.1" diff --git a/crates/all-cmds-server/src/main.rs b/crates/all-cmds-server/src/main.rs index ed804e614..c1d35608c 100644 --- a/crates/all-cmds-server/src/main.rs +++ b/crates/all-cmds-server/src/main.rs @@ -5,5 +5,8 @@ use cmds_solana as _; use cmds_std as _; fn main() { - flow_rpc::command_side::command_server::main().unwrap(); + if let Err(error) = flow_rpc::command_side::command_server::main() { + tracing::error!("all-cmds-server exited: {error:#}"); + std::process::exit(1); + } } diff --git a/crates/cmds-bun/src/umbra/umbra_integration.test.ts b/crates/cmds-bun/src/umbra/umbra_integration.test.ts index c998ae35d..6aa5431d2 100644 --- a/crates/cmds-bun/src/umbra/umbra_integration.test.ts +++ b/crates/cmds-bun/src/umbra/umbra_integration.test.ts @@ -59,6 +59,26 @@ function getFundedKeypair(): Keypair | null { return Keypair.fromSecretKey(bs58.decode(key)); } +async function withTimeout( + promise: Promise, + ms: number, + message: string, +): Promise { + let timeoutId: ReturnType | undefined; + try { + return await Promise.race([ + promise, + new Promise((_, reject) => { + timeoutId = setTimeout(() => reject(new Error(message)), ms); + }), + ]); + } finally { + if (timeoutId !== undefined) { + clearTimeout(timeoutId); + } + } +} + // ── Read-only tests (no funds needed) ───────────────────────────────── describe("Umbra Integration — read-only (mainnet)", () => { @@ -88,11 +108,17 @@ describe("Umbra Integration — read-only (mainnet)", () => { test("fetch_utxos: returns result or throws network error", async () => { const cmd = new UmbraFetchUtxos(dummyNd); try { - const result = await cmd.run(dummyCtx, { + const run = cmd.run(dummyCtx, { ...baseInputs(freshKp.secretKey), tree_index: 0, start_index: 0, }); + run.catch(() => {}); + const result = await withTimeout( + run, + 4_000, + "fetch timed out waiting for Umbra indexer", + ); // If indexer is reachable, should return valid structure expect(Array.isArray(result.utxos)).toBe(true); expect(typeof result.count).toBe("number"); diff --git a/crates/cmds-deno/src/bin/deno-cmds-server.rs b/crates/cmds-deno/src/bin/deno-cmds-server.rs index 7ec722baa..c24650511 100644 --- a/crates/cmds-deno/src/bin/deno-cmds-server.rs +++ b/crates/cmds-deno/src/bin/deno-cmds-server.rs @@ -1,5 +1,8 @@ use cmds_deno as _; fn main() { - flow_rpc::command_side::command_server::main().unwrap(); + if let Err(error) = flow_rpc::command_side::command_server::main() { + tracing::error!("deno-cmds-server exited: {error:#}"); + std::process::exit(1); + } } diff --git a/crates/flow-server/src/api/get_info.rs b/crates/flow-server/src/api/get_info.rs index c4a121dd5..813835857 100644 --- a/crates/flow-server/src/api/get_info.rs +++ b/crates/flow-server/src/api/get_info.rs @@ -1,7 +1,9 @@ use super::prelude::*; use crate::db_worker::{GetIrohInfo, IrohInfo}; use actix::SystemService; -use actix_web::{HttpResponseBuilder, dev::ConnectionInfo, http::header::ContentType}; +use actix_web::{ + HttpResponse, HttpResponseBuilder, dev::ConnectionInfo, http::header::ContentType, +}; use url::Url; #[derive(Serialize)] @@ -21,7 +23,10 @@ pub fn service(config: &Config) -> impl HttpServiceFactory + 'static { async move { let db_worker = DBWorker::from_registry(); - let iroh = db_worker.send(GetIrohInfo).await.unwrap().unwrap(); + let iroh = db_worker + .send(GetIrohInfo) + .await? + .map_err(|error| Error::custom(StatusCode::SERVICE_UNAVAILABLE, error))?; let base_url = format!("{}://{}", info.scheme(), info.host()); let output = Output { supabase_url, @@ -30,9 +35,11 @@ pub fn service(config: &Config) -> impl HttpServiceFactory + 'static { base_url, }; let json = simd_json::to_vec(&output).unwrap(); - HttpResponseBuilder::new(StatusCode::OK) - .insert_header(ContentType::json()) - .body(json) + Ok::( + HttpResponseBuilder::new(StatusCode::OK) + .insert_header(ContentType::json()) + .body(json), + ) } })) } diff --git a/crates/flow-server/src/db_worker/mod.rs b/crates/flow-server/src/db_worker/mod.rs index c53a7833d..209ce3255 100644 --- a/crates/flow-server/src/db_worker/mod.rs +++ b/crates/flow-server/src/db_worker/mod.rs @@ -14,7 +14,6 @@ use flow_rpc::flow_side::address_book::BaseAddressBook; use futures_channel::mpsc; use futures_util::{FutureExt, StreamExt}; use iroh::Watcher; -use n0_watcher::Disconnected; use serde::Serialize; use std::{ convert::Infallible, @@ -88,7 +87,7 @@ pub struct IrohInfo { pub struct GetIrohInfo; impl actix::Message for GetIrohInfo { - type Result = Result; + type Result = anyhow::Result; } impl actix::Handler for DBWorker { @@ -97,15 +96,18 @@ impl actix::Handler for DBWorker { fn handle(&mut self, _: GetIrohInfo, _: &mut Self::Context) -> Self::Result { let endpoint = self.remote_command_address_book.endpoint().clone(); Box::pin(async move { + const IROH_INFO_TIMEOUT: Duration = Duration::from_secs(5); + let node_id = endpoint.node_id().to_string(); - let relay_url = endpoint.home_relay().initialized().await.to_string(); - let direct_addresses = endpoint - .direct_addresses() - .initialized() - .await - .into_iter() - .map(|addr| addr.addr) - .collect(); + let mut home_relay = endpoint.home_relay(); + let mut direct_addresses = endpoint.direct_addresses(); + let (relay_url, direct_addresses) = tokio::time::timeout(IROH_INFO_TIMEOUT, async { + tokio::join!(home_relay.initialized(), direct_addresses.initialized()) + }) + .await + .map_err(|_| anyhow::anyhow!("timed out waiting for iroh endpoint info"))?; + let relay_url = relay_url.to_string(); + let direct_addresses = direct_addresses.into_iter().map(|addr| addr.addr).collect(); Ok(IrohInfo { node_id, diff --git a/lib/flow-rpc/src/command_side/command_server.rs b/lib/flow-rpc/src/command_side/command_server.rs index 3efbfcd76..cf3583853 100644 --- a/lib/flow-rpc/src/command_side/command_server.rs +++ b/lib/flow-rpc/src/command_side/command_server.rs @@ -19,7 +19,7 @@ use super::{ command_trait::HTTP_CLIENT, }; -#[derive(Deserialize, schemars::JsonSchema)] +#[derive(Clone, Deserialize, schemars::JsonSchema)] pub struct FlowServerConfig { apikey: Option, #[serde(flatten)] @@ -39,7 +39,7 @@ fn default_url() -> Url { "https://dev-api.spaceoperator.com".parse().unwrap() } -#[derive(Deserialize, schemars::JsonSchema)] +#[derive(Clone, Deserialize, schemars::JsonSchema)] #[serde(untagged)] pub enum FlowServerAddressConfig { Info { @@ -75,7 +75,7 @@ struct ConfigSchema { apikey: Option, } -#[derive(Deserialize, schemars::JsonSchema)] +#[derive(Clone, Deserialize, schemars::JsonSchema)] pub struct FlowServerAddress { #[schemars(schema_with = "String::json_schema")] pub node_id: iroh::PublicKey, @@ -109,44 +109,64 @@ pub fn main() -> Result<(), anyhow::Error> { }) } -async fn ping(client: &address_book::Client) { +async fn ping(client: &address_book::Client) -> Result<(), anyhow::Error> { loop { if let Err(error) = client.ping().await { tracing::error!("ping failed: {:#}", error); - break; + return Err(error); } tokio::time::sleep(Duration::from_secs(30)).await; } } -pub async fn serve_server( +async fn sleep_or_cancel(duration: Duration, cancel: &CancellationToken) -> bool { + tokio::select! { + _ = tokio::time::sleep(duration) => false, + _ = cancel.cancelled() => true, + } +} + +async fn serve_server_once( endpoint: Endpoint, config: FlowServerConfig, availables: Vec, cancel: CancellationToken, ) -> Result<(), anyhow::Error> { + const INFO_REQUEST_TIMEOUT: Duration = Duration::from_secs(10); + const IROH_INFO_TIMEOUT: Duration = Duration::from_secs(10); + let server_addr = match config.address { FlowServerAddressConfig::Info { url } => { let info_url = url.join("/info")?; tracing::info!("using URL: {}", info_url); - let resp = reqwest::get(info_url) - .await? - .json::() - .await? - .iroh; + let resp = tokio::time::timeout(INFO_REQUEST_TIMEOUT, async { + HTTP_CLIENT + .get(info_url.clone()) + .send() + .await? + .error_for_status()? + .json::() + .await + }) + .await + .context("timed out fetching flow-server /info")? + .context("fetch flow-server /info")? + .iroh; resp } FlowServerAddressConfig::Direct(server) => server, }; - let direct_addresses: BTreeSet = endpoint - .direct_addresses() - .initialized() - .await - .into_iter() - .map(|addr| addr.addr) - .collect(); - let relay_url: Url = endpoint.home_relay().initialized().await.into(); + let mut direct_addresses = endpoint.direct_addresses(); + let mut home_relay = endpoint.home_relay(); + let (direct_addresses, relay_url) = tokio::time::timeout(IROH_INFO_TIMEOUT, async { + tokio::join!(direct_addresses.initialized(), home_relay.initialized()) + }) + .await + .context("timed out waiting for local iroh endpoint info")?; + let direct_addresses: BTreeSet = + direct_addresses.into_iter().map(|addr| addr.addr).collect(); + let relay_url: Url = relay_url.into(); let client = address_book::connect_iroh( endpoint.clone(), @@ -171,19 +191,51 @@ pub async fn serve_server( .map(|mut watcher| watcher.get()); tracing::info!("connection type {:?}", conn_type); - future::select( - std::pin::pin!(ping(&client)), - std::pin::pin!(cancel.cancelled()), - ) - .await; + let ping_result = tokio::select! { + result = ping(&client) => result, + _ = cancel.cancelled() => Ok(()), + }; const LEAVE_TIMEOUT: Duration = Duration::from_secs(3); let _ = tokio::time::timeout(LEAVE_TIMEOUT, client.leave()).await; tracing::info!("left {}", server_addr.node_id); + ping_result?; Ok(()) } +pub async fn serve_server( + endpoint: Endpoint, + config: FlowServerConfig, + availables: Vec, + cancel: CancellationToken, +) -> Result<(), anyhow::Error> { + const RETRY_DELAY: Duration = Duration::from_secs(30); + + loop { + match serve_server_once( + endpoint.clone(), + FlowServerConfig { + apikey: config.apikey.clone(), + address: config.address.clone(), + }, + availables.clone(), + cancel.clone(), + ) + .await + { + Ok(()) => return Ok(()), + Err(error) if cancel.is_cancelled() => return Err(error), + Err(error) => { + tracing::error!("command server connection failed: {error:#}; retrying"); + if sleep_or_cancel(RETRY_DELAY, &cancel).await { + return Ok(()); + } + } + } + } +} + pub async fn serve(config: Config, logs: TrackFlowRun) -> Result<(), anyhow::Error> { let endpoint = Endpoint::builder() .secret_key(