Skip to content
Merged

patch #336

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/all-cmds-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ cmds-solana.workspace = true
cmds-image.workspace = true
cmds-bun.workspace = true
flow-rpc.workspace = true
tracing = "0.1"
5 changes: 4 additions & 1 deletion crates/all-cmds-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,8 @@ use cmds_solana as _;
use cmds_std as _;

fn main() {
flow_rpc::command_side::command_server::main().unwrap();
if let Err(error) = flow_rpc::command_side::command_server::main() {
tracing::error!("all-cmds-server exited: {error:#}");
std::process::exit(1);
}
}
28 changes: 27 additions & 1 deletion crates/cmds-bun/src/umbra/umbra_integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,26 @@ function getFundedKeypair(): Keypair | null {
return Keypair.fromSecretKey(bs58.decode(key));
}

async function withTimeout<T>(
promise: Promise<T>,
ms: number,
message: string,
): Promise<T> {
let timeoutId: ReturnType<typeof setTimeout> | undefined;
try {
return await Promise.race([
promise,
new Promise<never>((_, reject) => {
timeoutId = setTimeout(() => reject(new Error(message)), ms);
}),
]);
} finally {
if (timeoutId !== undefined) {
clearTimeout(timeoutId);
}
}
}

// ── Read-only tests (no funds needed) ─────────────────────────────────

describe("Umbra Integration — read-only (mainnet)", () => {
Expand Down Expand Up @@ -88,11 +108,17 @@ describe("Umbra Integration — read-only (mainnet)", () => {
test("fetch_utxos: returns result or throws network error", async () => {
const cmd = new UmbraFetchUtxos(dummyNd);
try {
const result = await cmd.run(dummyCtx, {
const run = cmd.run(dummyCtx, {
...baseInputs(freshKp.secretKey),
tree_index: 0,
start_index: 0,
});
run.catch(() => {});
const result = await withTimeout(
run,
4_000,
"fetch timed out waiting for Umbra indexer",
);
// If indexer is reachable, should return valid structure
expect(Array.isArray(result.utxos)).toBe(true);
expect(typeof result.count).toBe("number");
Expand Down
5 changes: 4 additions & 1 deletion crates/cmds-deno/src/bin/deno-cmds-server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use cmds_deno as _;

fn main() {
flow_rpc::command_side::command_server::main().unwrap();
if let Err(error) = flow_rpc::command_side::command_server::main() {
tracing::error!("deno-cmds-server exited: {error:#}");
std::process::exit(1);
}
}
17 changes: 12 additions & 5 deletions crates/flow-server/src/api/get_info.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use super::prelude::*;
use crate::db_worker::{GetIrohInfo, IrohInfo};
use actix::SystemService;
use actix_web::{HttpResponseBuilder, dev::ConnectionInfo, http::header::ContentType};
use actix_web::{
HttpResponse, HttpResponseBuilder, dev::ConnectionInfo, http::header::ContentType,
};
use url::Url;

#[derive(Serialize)]
Expand All @@ -21,7 +23,10 @@ pub fn service(config: &Config) -> impl HttpServiceFactory + 'static {

async move {
let db_worker = DBWorker::from_registry();
let iroh = db_worker.send(GetIrohInfo).await.unwrap().unwrap();
let iroh = db_worker
.send(GetIrohInfo)
.await?
.map_err(|error| Error::custom(StatusCode::SERVICE_UNAVAILABLE, error))?;
let base_url = format!("{}://{}", info.scheme(), info.host());
let output = Output {
supabase_url,
Expand All @@ -30,9 +35,11 @@ pub fn service(config: &Config) -> impl HttpServiceFactory + 'static {
base_url,
};
let json = simd_json::to_vec(&output).unwrap();
HttpResponseBuilder::new(StatusCode::OK)
.insert_header(ContentType::json())
.body(json)
Ok::<HttpResponse, Error>(
HttpResponseBuilder::new(StatusCode::OK)
.insert_header(ContentType::json())
.body(json),
)
}
}))
}
22 changes: 12 additions & 10 deletions crates/flow-server/src/db_worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use flow_rpc::flow_side::address_book::BaseAddressBook;
use futures_channel::mpsc;
use futures_util::{FutureExt, StreamExt};
use iroh::Watcher;
use n0_watcher::Disconnected;
use serde::Serialize;
use std::{
convert::Infallible,
Expand Down Expand Up @@ -88,7 +87,7 @@ pub struct IrohInfo {
pub struct GetIrohInfo;

impl actix::Message for GetIrohInfo {
type Result = Result<IrohInfo, Disconnected>;
type Result = anyhow::Result<IrohInfo>;
}

impl actix::Handler<GetIrohInfo> for DBWorker {
Expand All @@ -97,15 +96,18 @@ impl actix::Handler<GetIrohInfo> for DBWorker {
fn handle(&mut self, _: GetIrohInfo, _: &mut Self::Context) -> Self::Result {
let endpoint = self.remote_command_address_book.endpoint().clone();
Box::pin(async move {
const IROH_INFO_TIMEOUT: Duration = Duration::from_secs(5);

let node_id = endpoint.node_id().to_string();
let relay_url = endpoint.home_relay().initialized().await.to_string();
let direct_addresses = endpoint
.direct_addresses()
.initialized()
.await
.into_iter()
.map(|addr| addr.addr)
.collect();
let mut home_relay = endpoint.home_relay();
let mut direct_addresses = endpoint.direct_addresses();
let (relay_url, direct_addresses) = tokio::time::timeout(IROH_INFO_TIMEOUT, async {
tokio::join!(home_relay.initialized(), direct_addresses.initialized())
})
.await
.map_err(|_| anyhow::anyhow!("timed out waiting for iroh endpoint info"))?;
let relay_url = relay_url.to_string();
let direct_addresses = direct_addresses.into_iter().map(|addr| addr.addr).collect();

Ok(IrohInfo {
node_id,
Expand Down
100 changes: 76 additions & 24 deletions lib/flow-rpc/src/command_side/command_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use super::{
command_trait::HTTP_CLIENT,
};

#[derive(Deserialize, schemars::JsonSchema)]
#[derive(Clone, Deserialize, schemars::JsonSchema)]
pub struct FlowServerConfig {
apikey: Option<String>,
#[serde(flatten)]
Expand All @@ -39,7 +39,7 @@ fn default_url() -> Url {
"https://dev-api.spaceoperator.com".parse().unwrap()
}

#[derive(Deserialize, schemars::JsonSchema)]
#[derive(Clone, Deserialize, schemars::JsonSchema)]
#[serde(untagged)]
pub enum FlowServerAddressConfig {
Info {
Expand Down Expand Up @@ -75,7 +75,7 @@ struct ConfigSchema {
apikey: Option<String>,
}

#[derive(Deserialize, schemars::JsonSchema)]
#[derive(Clone, Deserialize, schemars::JsonSchema)]
pub struct FlowServerAddress {
#[schemars(schema_with = "String::json_schema")]
pub node_id: iroh::PublicKey,
Expand Down Expand Up @@ -109,44 +109,64 @@ pub fn main() -> Result<(), anyhow::Error> {
})
}

async fn ping(client: &address_book::Client) {
async fn ping(client: &address_book::Client) -> Result<(), anyhow::Error> {
loop {
if let Err(error) = client.ping().await {
tracing::error!("ping failed: {:#}", error);
break;
return Err(error);
}
tokio::time::sleep(Duration::from_secs(30)).await;
}
}

pub async fn serve_server(
async fn sleep_or_cancel(duration: Duration, cancel: &CancellationToken) -> bool {
tokio::select! {
_ = tokio::time::sleep(duration) => false,
_ = cancel.cancelled() => true,
}
}

async fn serve_server_once(
endpoint: Endpoint,
config: FlowServerConfig,
availables: Vec<MatchCommand>,
cancel: CancellationToken,
) -> Result<(), anyhow::Error> {
const INFO_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
const IROH_INFO_TIMEOUT: Duration = Duration::from_secs(10);

let server_addr = match config.address {
FlowServerAddressConfig::Info { url } => {
let info_url = url.join("/info")?;
tracing::info!("using URL: {}", info_url);
let resp = reqwest::get(info_url)
.await?
.json::<InfoResponse>()
.await?
.iroh;
let resp = tokio::time::timeout(INFO_REQUEST_TIMEOUT, async {
HTTP_CLIENT
.get(info_url.clone())
.send()
.await?
.error_for_status()?
.json::<InfoResponse>()
.await
})
.await
.context("timed out fetching flow-server /info")?
.context("fetch flow-server /info")?
.iroh;
resp
}
FlowServerAddressConfig::Direct(server) => server,
};

let direct_addresses: BTreeSet<SocketAddr> = endpoint
.direct_addresses()
.initialized()
.await
.into_iter()
.map(|addr| addr.addr)
.collect();
let relay_url: Url = endpoint.home_relay().initialized().await.into();
let mut direct_addresses = endpoint.direct_addresses();
let mut home_relay = endpoint.home_relay();
let (direct_addresses, relay_url) = tokio::time::timeout(IROH_INFO_TIMEOUT, async {
tokio::join!(direct_addresses.initialized(), home_relay.initialized())
})
.await
.context("timed out waiting for local iroh endpoint info")?;
let direct_addresses: BTreeSet<SocketAddr> =
direct_addresses.into_iter().map(|addr| addr.addr).collect();
let relay_url: Url = relay_url.into();

let client = address_book::connect_iroh(
endpoint.clone(),
Expand All @@ -171,19 +191,51 @@ pub async fn serve_server(
.map(|mut watcher| watcher.get());
tracing::info!("connection type {:?}", conn_type);

future::select(
std::pin::pin!(ping(&client)),
std::pin::pin!(cancel.cancelled()),
)
.await;
let ping_result = tokio::select! {
result = ping(&client) => result,
_ = cancel.cancelled() => Ok(()),
};

const LEAVE_TIMEOUT: Duration = Duration::from_secs(3);
let _ = tokio::time::timeout(LEAVE_TIMEOUT, client.leave()).await;
tracing::info!("left {}", server_addr.node_id);

ping_result?;
Ok(())
}

pub async fn serve_server(
endpoint: Endpoint,
config: FlowServerConfig,
availables: Vec<MatchCommand>,
cancel: CancellationToken,
) -> Result<(), anyhow::Error> {
const RETRY_DELAY: Duration = Duration::from_secs(30);

loop {
match serve_server_once(
endpoint.clone(),
FlowServerConfig {
apikey: config.apikey.clone(),
address: config.address.clone(),
},
availables.clone(),
cancel.clone(),
)
.await
{
Ok(()) => return Ok(()),
Err(error) if cancel.is_cancelled() => return Err(error),
Err(error) => {
tracing::error!("command server connection failed: {error:#}; retrying");
if sleep_or_cancel(RETRY_DELAY, &cancel).await {
return Ok(());
}
}
}
}
}

pub async fn serve(config: Config, logs: TrackFlowRun) -> Result<(), anyhow::Error> {
let endpoint = Endpoint::builder()
.secret_key(
Expand Down
Loading