From 1dc0b4cbcec1737af21fa085e62bbc9a47f00222 Mon Sep 17 00:00:00 2001 From: Edson Petry Date: Fri, 6 Mar 2026 09:45:31 -0500 Subject: [PATCH 01/16] feat: add new rpc for automatic worker discovery --- console/examples/cluster.rs | 26 ++- console/examples/console_worker.rs | 20 ++ src/flight_service/worker.rs | 6 +- src/lib.rs | 10 +- src/observability/generated/observability.rs | 233 +++++++++++++++---- src/observability/mod.rs | 5 +- src/observability/proto/observability.proto | 7 + src/observability/service.rs | 26 ++- 8 files changed, 268 insertions(+), 65 deletions(-) diff --git a/console/examples/cluster.rs b/console/examples/cluster.rs index 73260c05..5b3c14f7 100644 --- a/console/examples/cluster.rs +++ b/console/examples/cluster.rs @@ -1,9 +1,12 @@ -use datafusion_distributed::Worker; +use datafusion::error::DataFusionError; +use datafusion_distributed::{Worker, WorkerResolver}; use std::error::Error; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::sync::Arc; use structopt::StructOpt; use tokio::net::TcpListener; use tonic::transport::Server; +use url::Url; #[derive(StructOpt)] #[structopt( @@ -38,6 +41,7 @@ async fn main() -> Result<(), Box> { .expect("port overflow: base_port + workers exceeds u16::MAX") }, ); + let localhost_resolver = Arc::new(LocalhostWorkerResolver { ports }); let listener = TcpListener::bind(addr).await?; let port = listener.local_addr()?.port(); ports.push(port); @@ -46,7 +50,7 @@ async fn main() -> Result<(), Box> { let worker = Worker::default(); Server::builder() - .add_service(worker.with_observability_service()) + .add_service(worker.with_observability_service(localhost_resolver)) .add_service(worker.into_flight_server()) .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)) .await @@ -78,3 +82,21 @@ async fn main() -> Result<(), Box> { Ok(()) } + +#[derive(Clone)] +struct LocalhostWorkerResolver { + ports: Vec, +} + +#[async_trait] +impl WorkerResolver for LocalhostWorkerResolver { + fn 
get_urls(&self) -> Result, DataFusionError> { + self.ports + .iter() + .map(|port| { + let url_string = format!("http://localhost:{port}"); + Url::parse(&url_string).map_err(|e| DataFusionError::External(Box::new(e))) + }) + .collect::, _>>() + } +} diff --git a/console/examples/console_worker.rs b/console/examples/console_worker.rs index 189e8230..7934caf6 100644 --- a/console/examples/console_worker.rs +++ b/console/examples/console_worker.rs @@ -12,11 +12,13 @@ use tonic::transport::Server; struct Args { #[structopt(default_value = "8080")] port: u16, + } #[tokio::main] async fn main() -> Result<(), Box> { let args = Args::from_args(); + let localhost_worker_resolver = Arc::new(LocalhostWorkerResolver { ports: }) let worker = Worker::default(); @@ -28,3 +30,21 @@ async fn main() -> Result<(), Box> { Ok(()) } + +#[derive(Clone)] +struct LocalhostWorkerResolver { + ports: Vec, +} + +#[async_trait] +impl WorkerResolver for LocalhostWorkerResolver { + fn get_urls(&self) -> Result, DataFusionError> { + self.ports + .iter() + .map(|port| { + let url_string = format!("http://localhost:{port}"); + Url::parse(&url_string).map_err(|e| DataFusionError::External(Box::new(e))) + }) + .collect::, _>>() + } +} diff --git a/src/flight_service/worker.rs b/src/flight_service/worker.rs index b277b400..a47d4d3f 100644 --- a/src/flight_service/worker.rs +++ b/src/flight_service/worker.rs @@ -2,7 +2,9 @@ use crate::flight_service::WorkerSessionBuilder; use crate::flight_service::do_action::{INIT_ACTION_TYPE, TaskData}; use crate::flight_service::single_write_multi_read::SingleWriteMultiRead; use crate::protobuf::StageKey; -use crate::{DefaultSessionBuilder, ObservabilityServiceImpl, ObservabilityServiceServer}; +use crate::{ + DefaultSessionBuilder, ObservabilityServiceImpl, ObservabilityServiceServer, WorkerResolver, +}; use arrow_flight::flight_service_server::{FlightService, FlightServiceServer}; use arrow_flight::{ Action, ActionType, Criteria, Empty, FlightData, 
FlightDescriptor, FlightInfo, @@ -135,9 +137,11 @@ impl Worker { pub fn with_observability_service( &self, + worker_resolver: Arc, ) -> ObservabilityServiceServer { ObservabilityServiceServer::new(ObservabilityServiceImpl::new( self.task_data_entries.clone(), + worker_resolver, )) } diff --git a/src/lib.rs b/src/lib.rs index f56f0384..ae205e3c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -46,12 +46,10 @@ pub use stage::{ }; pub use observability::{ - GetTaskProgressRequest, GetTaskProgressResponse, ObservabilityService, - ObservabilityServiceClient, ObservabilityServiceImpl, ObservabilityServiceServer, PingRequest, - PingResponse, StageKey as ObservabilityStageKey, TaskProgress, TaskStatus, WorkerMetrics, + GetClusterWorkersRequest, GetClusterWorkersResponse, GetTaskProgressRequest, + GetTaskProgressResponse, ObservabilityService, ObservabilityServiceClient, + ObservabilityServiceImpl, ObservabilityServiceServer, PingRequest, PingResponse, + StageKey as ObservabilityStageKey, TaskProgress, TaskStatus, WorkerMetrics, }; pub use protobuf::StageKey; - -#[cfg(any(feature = "integration", test))] -pub use execution_plans::benchmarks::ShuffleBench; diff --git a/src/observability/generated/observability.rs b/src/observability/generated/observability.rs index a032eb07..522a47b3 100644 --- a/src/observability/generated/observability.rs +++ b/src/observability/generated/observability.rs @@ -46,6 +46,13 @@ pub struct GetTaskProgressResponse { #[prost(message, optional, tag = "2")] pub worker_metrics: ::core::option::Option, } +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct GetClusterWorkersRequest {} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct GetClusterWorkersResponse { + #[prost(string, repeated, tag = "1")] + pub worker_urls: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, +} #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] pub enum TaskStatus { @@ -79,10 
+86,10 @@ pub mod observability_service_client { dead_code, missing_docs, clippy::wildcard_imports, - clippy::let_unit_value + clippy::let_unit_value, )] - use tonic::codegen::http::Uri; use tonic::codegen::*; + use tonic::codegen::http::Uri; #[derive(Debug, Clone)] pub struct ObservabilityServiceClient { inner: tonic::client::Grpc, @@ -121,13 +128,14 @@ pub mod observability_service_client { F: tonic::service::Interceptor, T::ResponseBody: Default, T: tonic::codegen::Service< - http::Request, - Response = http::Response< - >::ResponseBody, - >, + http::Request, + Response = http::Response< + >::ResponseBody, >, - >>::Error: - Into + std::marker::Send + std::marker::Sync, + >, + , + >>::Error: Into + std::marker::Send + std::marker::Sync, { ObservabilityServiceClient::new(InterceptedService::new(inner, interceptor)) } @@ -166,36 +174,79 @@ pub mod observability_service_client { &mut self, request: impl tonic::IntoRequest, ) -> std::result::Result, tonic::Status> { - self.inner.ready().await.map_err(|e| { - tonic::Status::unknown(format!("Service was not ready: {}", e.into())) - })?; + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic_prost::ProstCodec::default(); - let path = - http::uri::PathAndQuery::from_static("/observability.ObservabilityService/Ping"); + let path = http::uri::PathAndQuery::from_static( + "/observability.ObservabilityService/Ping", + ); let mut req = request.into_request(); - req.extensions_mut().insert(GrpcMethod::new( - "observability.ObservabilityService", - "Ping", - )); + req.extensions_mut() + .insert(GrpcMethod::new("observability.ObservabilityService", "Ping")); self.inner.unary(req, path, codec).await } pub async fn get_task_progress( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> - { - self.inner.ready().await.map_err(|e| { - tonic::Status::unknown(format!("Service was not ready: {}", 
e.into())) - })?; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic_prost::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( "/observability.ObservabilityService/GetTaskProgress", ); let mut req = request.into_request(); - req.extensions_mut().insert(GrpcMethod::new( - "observability.ObservabilityService", - "GetTaskProgress", - )); + req.extensions_mut() + .insert( + GrpcMethod::new( + "observability.ObservabilityService", + "GetTaskProgress", + ), + ); + self.inner.unary(req, path, codec).await + } + pub async fn get_cluster_workers( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/observability.ObservabilityService/GetClusterWorkers", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "observability.ObservabilityService", + "GetClusterWorkers", + ), + ); self.inner.unary(req, path, codec).await } } @@ -207,7 +258,7 @@ pub mod observability_service_server { dead_code, missing_docs, clippy::wildcard_imports, - clippy::let_unit_value + clippy::let_unit_value, )] use tonic::codegen::*; /// Generated trait containing gRPC methods that should be implemented for use with ObservabilityServiceServer. 
@@ -220,7 +271,17 @@ pub mod observability_service_server { async fn get_task_progress( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn get_cluster_workers( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; } #[derive(Debug)] pub struct ObservabilityServiceServer { @@ -243,7 +304,10 @@ pub mod observability_service_server { max_encoding_message_size: None, } } - pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService where F: tonic::service::Interceptor, { @@ -278,7 +342,8 @@ pub mod observability_service_server { self } } - impl tonic::codegen::Service> for ObservabilityServiceServer + impl tonic::codegen::Service> + for ObservabilityServiceServer where T: ObservabilityService, B: Body + std::marker::Send + 'static, @@ -298,9 +363,14 @@ pub mod observability_service_server { "/observability.ObservabilityService/Ping" => { #[allow(non_camel_case_types)] struct PingSvc(pub Arc); - impl tonic::server::UnaryService for PingSvc { + impl< + T: ObservabilityService, + > tonic::server::UnaryService for PingSvc { type Response = super::PingResponse; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, @@ -337,19 +407,25 @@ pub mod observability_service_server { "/observability.ObservabilityService/GetTaskProgress" => { #[allow(non_camel_case_types)] struct GetTaskProgressSvc(pub Arc); - impl - tonic::server::UnaryService - for GetTaskProgressSvc - { + impl< + T: ObservabilityService, + > tonic::server::UnaryService + for GetTaskProgressSvc { type Response = super::GetTaskProgressResponse; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut 
self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::get_task_progress(&inner, request) + ::get_task_progress( + &inner, + request, + ) .await }; Box::pin(fut) @@ -377,19 +453,74 @@ pub mod observability_service_server { }; Box::pin(fut) } - _ => Box::pin(async move { - let mut response = http::Response::new(tonic::body::Body::default()); - let headers = response.headers_mut(); - headers.insert( - tonic::Status::GRPC_STATUS, - (tonic::Code::Unimplemented as i32).into(), - ); - headers.insert( - http::header::CONTENT_TYPE, - tonic::metadata::GRPC_CONTENT_TYPE, - ); - Ok(response) - }), + "/observability.ObservabilityService/GetClusterWorkers" => { + #[allow(non_camel_case_types)] + struct GetClusterWorkersSvc(pub Arc); + impl< + T: ObservabilityService, + > tonic::server::UnaryService + for GetClusterWorkersSvc { + type Response = super::GetClusterWorkersResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_cluster_workers( + &inner, + request, + ) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetClusterWorkersSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { 
+ let mut response = http::Response::new( + tonic::body::Body::default(), + ); + let headers = response.headers_mut(); + headers + .insert( + tonic::Status::GRPC_STATUS, + (tonic::Code::Unimplemented as i32).into(), + ); + headers + .insert( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ); + Ok(response) + }) + } } } } diff --git a/src/observability/mod.rs b/src/observability/mod.rs index 6c07b7b5..c48906ae 100644 --- a/src/observability/mod.rs +++ b/src/observability/mod.rs @@ -7,7 +7,8 @@ pub use generated::observability::observability_service_server::{ }; pub use generated::observability::{ - GetTaskProgressRequest, GetTaskProgressResponse, PingRequest, PingResponse, StageKey, - TaskProgress, TaskStatus, WorkerMetrics, + GetClusterWorkersRequest, GetClusterWorkersResponse, GetTaskProgressRequest, + GetTaskProgressResponse, PingRequest, PingResponse, StageKey, TaskProgress, TaskStatus, + WorkerMetrics, }; pub use service::ObservabilityServiceImpl; diff --git a/src/observability/proto/observability.proto b/src/observability/proto/observability.proto index 8e0f8d15..6c0c403d 100644 --- a/src/observability/proto/observability.proto +++ b/src/observability/proto/observability.proto @@ -4,6 +4,7 @@ package observability; service ObservabilityService { rpc Ping (PingRequest) returns (PingResponse); rpc GetTaskProgress (GetTaskProgressRequest) returns (GetTaskProgressResponse); + rpc GetClusterWorkers (GetClusterWorkersRequest) returns (GetClusterWorkersResponse); } message PingRequest {} @@ -44,3 +45,9 @@ message GetTaskProgressResponse { repeated TaskProgress tasks = 1; WorkerMetrics worker_metrics = 2; } + +message GetClusterWorkersRequest {} + +message GetClusterWorkersResponse { + repeated string worker_urls = 1; +} diff --git a/src/observability/service.rs b/src/observability/service.rs index 3a9cd38a..00672677 100644 --- a/src/observability/service.rs +++ b/src/observability/service.rs @@ -1,4 +1,5 @@ use 
crate::flight_service::{SingleWriteMultiRead, TaskData}; +use crate::networking::WorkerResolver; use crate::protobuf::StageKey; use datafusion::error::DataFusionError; use datafusion::physical_plan::ExecutionPlan; @@ -13,14 +14,18 @@ use tokio::sync::watch; use tonic::{Request, Response, Status}; use super::{ - GetTaskProgressResponse, ObservabilityService, TaskProgress, TaskStatus, WorkerMetrics, - generated::observability::{GetTaskProgressRequest, PingRequest, PingResponse}, + GetClusterWorkersResponse, GetTaskProgressResponse, ObservabilityService, TaskProgress, + TaskStatus, WorkerMetrics, + generated::observability::{ + GetClusterWorkersRequest, GetTaskProgressRequest, PingRequest, PingResponse, + }, }; type ResultTaskData = Result>; pub struct ObservabilityServiceImpl { task_data_entries: Arc>>>, + worker_resolver: Arc, #[cfg(feature = "system-metrics")] system: watch::Receiver, } @@ -28,6 +33,7 @@ pub struct ObservabilityServiceImpl { impl ObservabilityServiceImpl { pub fn new( task_data_entries: Arc>>>, + worker_resolver: Arc, ) -> Self { #[cfg(feature = "system-metrics")] let (tx, rx) = tokio::sync::watch::channel(WorkerMetrics::default()); @@ -64,9 +70,9 @@ impl ObservabilityServiceImpl { } }); } - Self { task_data_entries, + worker_resolver, #[cfg(feature = "system-metrics")] system: rx, } @@ -112,6 +118,20 @@ impl ObservabilityService for ObservabilityServiceImpl { worker_metrics, })) } + + async fn get_cluster_workers( + &self, + _request: Request, + ) -> Result, Status> { + let urls = self + .worker_resolver + .get_urls() + .map_err(|e| Status::internal(format!("Failed to resolve workers: {e}")))?; + + let worker_urls = urls.into_iter().map(|url| url.to_string()).collect(); + + Ok(Response::new(GetClusterWorkersResponse { worker_urls })) + } } impl ObservabilityServiceImpl { From d06a63ea6cf94b893efa3f4aa7d0dbcd64eee074 Mon Sep 17 00:00:00 2001 From: Edson Petry Date: Fri, 6 Mar 2026 09:46:05 -0500 Subject: [PATCH 02/16] feat: change observability 
service method to accept user defined worker resolver --- console/examples/console_worker.rs | 15 +++++++++++---- src/flight_service/worker.rs | 9 +++++++-- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/console/examples/console_worker.rs b/console/examples/console_worker.rs index 7934caf6..aefbe9d1 100644 --- a/console/examples/console_worker.rs +++ b/console/examples/console_worker.rs @@ -1,8 +1,11 @@ -use datafusion_distributed::Worker; +use async_trait::async_trait; +use datafusion::error::DataFusionError; +use datafusion_distributed::{Worker, WorkerResolver}; use std::error::Error; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use structopt::StructOpt; use tonic::transport::Server; +use url::Url; #[derive(StructOpt)] #[structopt( @@ -13,17 +16,21 @@ struct Args { #[structopt(default_value = "8080")] port: u16, + /// The ports holding Distributed DataFusion workers. + #[structopt(long = "cluster-ports", use_delimiter = true)] + cluster_ports: Vec, } #[tokio::main] async fn main() -> Result<(), Box> { let args = Args::from_args(); - let localhost_worker_resolver = Arc::new(LocalhostWorkerResolver { ports: }) - + let localhost_resolver = LocalhostWorkerResolver { + ports: args.cluster_ports.clone(), + }; let worker = Worker::default(); Server::builder() - .add_service(worker.with_observability_service()) + .add_service(worker.with_observability_service(localhost_resolver)) .add_service(worker.into_flight_server()) .serve(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), args.port)) .await?; diff --git a/src/flight_service/worker.rs b/src/flight_service/worker.rs index a47d4d3f..dbdc2004 100644 --- a/src/flight_service/worker.rs +++ b/src/flight_service/worker.rs @@ -135,13 +135,18 @@ impl Worker { .max_encoding_message_size(usize::MAX) } + /// Creates an [`ObservabilityServiceServer`] that exposes task progress and cluster + /// worker discovery via the provided [`WorkerResolver`]. 
+ /// + /// The returned server is meant to be added to the same [`tonic::transport::Server`] as the + /// Flight service — gRPC multiplexes both services on a single port. pub fn with_observability_service( &self, - worker_resolver: Arc, + worker_resolver: impl WorkerResolver + Send + Sync + 'static, ) -> ObservabilityServiceServer { ObservabilityServiceServer::new(ObservabilityServiceImpl::new( self.task_data_entries.clone(), - worker_resolver, + Arc::new(worker_resolver), )) } From 381e9eef4b85274722dee3dfbe038d7b8da7f5cb Mon Sep 17 00:00:00 2001 From: Edson Petry Date: Fri, 6 Mar 2026 10:19:51 -0500 Subject: [PATCH 03/16] feat(console): add automatic worker discovery via GetClusterWorkers RPC Console now supports two modes: - Auto-discovery (default): connects to a seed worker, calls GetClusterWorkers to find all cluster workers, and re-polls every 5s for topology changes - Manual mode (--cluster-ports): unchanged behavior for local dev New --connect flag specifies a seed URL; defaults to localhost:6789 --- console/examples/cluster.rs | 21 +++++++++-- console/src/app.rs | 72 +++++++++++++++++++++++++++++++++++-- console/src/main.rs | 33 +++++++++++++---- console/src/worker.rs | 29 ++++++++++++++- 4 files changed, 141 insertions(+), 14 deletions(-) diff --git a/console/examples/cluster.rs b/console/examples/cluster.rs index 5b3c14f7..2221479c 100644 --- a/console/examples/cluster.rs +++ b/console/examples/cluster.rs @@ -29,7 +29,9 @@ async fn main() -> Result<(), Box> { let args = Args::from_args(); let mut ports = Vec::new(); + let mut listeners = Vec::new(); + // Bind all listeners first so we know all ports before starting workers for i in 0..args.workers { let addr = SocketAddr::new( IpAddr::V4(Ipv4Addr::LOCALHOST), @@ -45,12 +47,21 @@ async fn main() -> Result<(), Box> { let listener = TcpListener::bind(addr).await?; let port = listener.local_addr()?.port(); ports.push(port); + listeners.push(listener); + } + + // Create a shared resolver that knows about 
all workers + let resolver = LocalhostClusterResolver { + ports: Arc::new(RwLock::new(ports.clone())), + }; + for listener in listeners { + let resolver = resolver.clone(); tokio::spawn(async move { let worker = Worker::default(); Server::builder() - .add_service(worker.with_observability_service(localhost_resolver)) + .add_service(worker.with_observability_service(resolver)) .add_service(worker.into_flight_server()) .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)) .await @@ -65,7 +76,12 @@ async fn main() -> Result<(), Box> { .join(","); println!("Started {} workers on ports: {ports_csv}\n", args.workers); - println!("Console:"); + println!("Console (auto-discovery via any worker):"); + println!( + "\tcargo run -p datafusion-distributed-console -- --connect http://localhost:{}", + ports[0] + ); + println!("Console (manual):"); println!("\tcargo run -p datafusion-distributed-console -- --cluster-ports {ports_csv}"); println!("TPC-DS runner:"); println!( @@ -77,7 +93,6 @@ async fn main() -> Result<(), Box> { ); println!("Press Ctrl+C to stop all workers."); - // Block forever tokio::signal::ctrl_c().await?; Ok(()) diff --git a/console/src/app.rs b/console/src/app.rs index 43a23e3d..91d303f6 100644 --- a/console/src/app.rs +++ b/console/src/app.rs @@ -1,9 +1,17 @@ use crate::state::{ClusterViewState, SortColumn, SortDirection, View, WorkerViewState}; -use crate::worker::{ConnectionStatus, WorkerConn}; +use crate::worker::{ConnectionStatus, WorkerConn, discover_cluster_workers}; use std::collections::HashSet; use std::time::{Duration, Instant}; use url::Url; +/// How the console discovers workers. +pub(crate) enum DiscoveryMode { + /// Manual mode: static list of worker URLs from `--cluster-ports`. + Manual, + /// Discovery mode: periodically call `GetClusterWorkers` on the seed URL. + Auto { seed_url: Url }, +} + /// App holds the main application state. 
pub(crate) struct App { pub(crate) workers: Vec, @@ -21,6 +29,10 @@ pub(crate) struct App { prev_output_rows_time: Option, /// Smoothed cluster-wide throughput in rows/s. pub(crate) current_throughput: f64, + /// How the console discovers workers. + discovery_mode: DiscoveryMode, + /// Last time we ran worker discovery. + last_discovery: Option, } /// Cluster-wide statistics for the header. @@ -35,11 +47,22 @@ pub(crate) struct ClusterStats { pub(crate) active_queries: usize, } +/// Interval between worker discovery polls. +const DISCOVERY_INTERVAL: Duration = Duration::from_secs(5); + impl App { - /// Create a new App with the given worker URLs. - pub(crate) fn new(worker_urls: Vec) -> Self { + /// Create a new App in manual mode with an explicit list of worker URLs. + pub(crate) fn new_manual(worker_urls: Vec) -> Self { let workers = worker_urls.into_iter().map(WorkerConn::new).collect(); + Self::with_workers(workers, DiscoveryMode::Manual) + } + + /// Create a new App in auto-discovery mode with a seed URL. + pub(crate) fn new_discovery(seed_url: Url) -> Self { + Self::with_workers(Vec::new(), DiscoveryMode::Auto { seed_url }) + } + fn with_workers(workers: Vec, discovery_mode: DiscoveryMode) -> Self { App { workers, active_query_count: 0, @@ -54,6 +77,8 @@ impl App { prev_output_rows_total: 0, prev_output_rows_time: None, current_throughput: 0.0, + discovery_mode, + last_discovery: None, } } @@ -63,6 +88,9 @@ impl App { return; } + // Run worker discovery if in auto mode + self.maybe_discover_workers().await; + // Attempt connection for workers in Connecting or Disconnected state for worker in &mut self.workers { if worker.should_retry_connection() { @@ -89,6 +117,44 @@ impl App { self.update_throughput(); } + /// Periodically discovers workers via `GetClusterWorkers` on the seed URL. 
+ async fn maybe_discover_workers(&mut self) { + let seed_url = match &self.discovery_mode { + DiscoveryMode::Manual => return, + DiscoveryMode::Auto { seed_url } => seed_url.clone(), + }; + + let should_discover = match self.last_discovery { + None => true, + Some(last) => last.elapsed() >= DISCOVERY_INTERVAL, + }; + + if !should_discover { + return; + } + + self.last_discovery = Some(Instant::now()); + + let discovered_urls = match discover_cluster_workers(&seed_url).await { + Ok(urls) => urls, + Err(_) => return, + }; + + // Build set of currently known URLs (owned to avoid borrow conflict) + let known_urls: HashSet = self.workers.iter().map(|w| w.url.clone()).collect(); + + // Add new workers + for url in &discovered_urls { + if !known_urls.contains(url) { + self.workers.push(WorkerConn::new(url.clone())); + } + } + + // Remove workers that are no longer in the discovered set + let discovered_set: HashSet<&Url> = discovered_urls.iter().collect(); + self.workers.retain(|w| discovered_set.contains(&w.url)); + } + /// Update cluster-wide throughput from output rows delta. fn update_throughput(&mut self) { let current_total: u64 = self.workers.iter().map(|w| w.output_rows_total).sum(); diff --git a/console/src/main.rs b/console/src/main.rs index bb3b6f57..dbae5f9b 100644 --- a/console/src/main.rs +++ b/console/src/main.rs @@ -6,6 +6,7 @@ mod worker; use app::App; use crossterm::event::{self, Event}; +use datafusion_distributed::DEFAULT_WORKER_PORT; use ratatui::DefaultTerminal; use std::time::{Duration, Instant}; use structopt::StructOpt; @@ -21,6 +22,12 @@ struct Args { #[structopt(long = "cluster-ports", use_delimiter = true)] cluster_ports: Vec, + /// URL of a seed worker for auto-discovery (e.g. http://localhost:6789). + /// The console will call GetClusterWorkers to discover all workers. + /// Mutually exclusive with --cluster-ports. 
+ #[arg(long = "connect", conflicts_with = "cluster_ports")] + connect: Option, + /// Polling interval in milliseconds #[structopt(long = "poll-interval", default_value = "100")] poll_interval: u64, @@ -32,14 +39,26 @@ async fn main() -> color_eyre::Result<()> { let args = Args::from_args(); - let worker_urls: Vec = args - .cluster_ports - .iter() - .map(|port| Url::parse(&format!("http://localhost:{port}")).expect("valid localhost URL")) - .collect(); - let poll_interval = Duration::from_millis(args.poll_interval); - let mut app = App::new(worker_urls); + + let mut app = if !args.cluster_ports.is_empty() { + // Manual mode: explicit list of localhost ports + let worker_urls: Vec = args + .cluster_ports + .iter() + .map(|port| { + Url::parse(&format!("http://localhost:{port}")).expect("valid localhost URL") + }) + .collect(); + App::new_manual(worker_urls) + } else { + // Discovery mode: connect to seed URL (default: localhost:DEFAULT_WORKER_PORT) + let seed_url = args.connect.unwrap_or_else(|| { + Url::parse(&format!("http://localhost:{DEFAULT_WORKER_PORT}")) + .expect("valid default URL") + }); + App::new_discovery(seed_url) + }; let mut terminal = ratatui::init(); terminal.clear()?; diff --git a/console/src/worker.rs b/console/src/worker.rs index 6bf4a919..ffc4c14e 100644 --- a/console/src/worker.rs +++ b/console/src/worker.rs @@ -1,5 +1,6 @@ use datafusion_distributed::{ - GetTaskProgressRequest, ObservabilityServiceClient, PingRequest, TaskProgress, TaskStatus, + GetClusterWorkersRequest, GetTaskProgressRequest, ObservabilityServiceClient, PingRequest, + TaskProgress, TaskStatus, }; use std::collections::{HashMap, HashSet, VecDeque}; use std::time::{Duration, Instant}; @@ -364,3 +365,29 @@ fn push_history(buf: &mut VecDeque, value: u64) { } buf.push_back(value); } + +/// Connects to a seed worker and calls `GetClusterWorkers` to discover all worker URLs. 
+pub(crate) async fn discover_cluster_workers(seed_url: &Url) -> Result, String> { + let mut client = ObservabilityServiceClient::connect(seed_url.to_string()) + .await + .map_err(|e| format!("Failed to connect to seed worker {seed_url}: {e}"))?; + + client + .ping(PingRequest {}) + .await + .map_err(|e| format!("Seed worker {seed_url} ping failed: {e}"))?; + + let response = client + .get_cluster_workers(GetClusterWorkersRequest {}) + .await + .map_err(|e| format!("GetClusterWorkers failed on {seed_url}: {e}"))?; + + let urls = response + .into_inner() + .worker_urls + .into_iter() + .filter_map(|s| Url::parse(&s).ok()) + .collect(); + + Ok(urls) +} From 61d254c71017a20d0b3488f35b2c324febad75c0 Mon Sep 17 00:00:00 2001 From: Edson Petry Date: Fri, 6 Mar 2026 10:20:22 -0500 Subject: [PATCH 04/16] feat(benchmarks): add observability service to benchmark worker.rs --- benchmarks/cdk/bin/worker.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/benchmarks/cdk/bin/worker.rs b/benchmarks/cdk/bin/worker.rs index 5d019c5e..a58d4330 100644 --- a/benchmarks/cdk/bin/worker.rs +++ b/benchmarks/cdk/bin/worker.rs @@ -206,6 +206,7 @@ async fn main() -> Result<(), Box> { ), ); let grpc_server = Server::builder() + .add_service(worker.with_observability_service(Ec2WorkerResolver::new())) .add_service(worker.into_flight_server()) .serve(WORKER_ADDR.parse()?); From b4e218d08d3564c9f2ec3b852b0be70415fdd1e0 Mon Sep 17 00:00:00 2001 From: Edson Petry Date: Fri, 6 Mar 2026 11:10:01 -0500 Subject: [PATCH 05/16] refactor(console): remove manual --cluster-ports mode, always use auto-discovery --- benchmarks/cdk/bin/worker.rs | 15 ++++++------ console/examples/cluster.md | 5 +++- console/examples/cluster.rs | 4 +--- console/examples/console.md | 21 ++++++++++++----- console/examples/console_worker.rs | 4 ++-- console/examples/tpcds_runner.md | 18 ++++++++------- console/src/app.rs | 37 ++++++------------------------ console/src/main.rs | 36 ++++++++--------------------- src/lib.rs | 3 +++ 
9 files changed, 59 insertions(+), 84 deletions(-) diff --git a/benchmarks/cdk/bin/worker.rs b/benchmarks/cdk/bin/worker.rs index a58d4330..39f76877 100644 --- a/benchmarks/cdk/bin/worker.rs +++ b/benchmarks/cdk/bin/worker.rs @@ -10,9 +10,10 @@ use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::physical_plan::execute_stream; use datafusion::prelude::SessionContext; use datafusion_distributed::{ - ChannelResolver, DistributedExt, DistributedMetricsFormat, DistributedPhysicalOptimizerRule, - Worker, WorkerResolver, display_plan_ascii, get_distributed_channel_resolver, - get_distributed_worker_resolver, rewrite_distributed_plan_with_metrics, + ChannelResolver, DEFAULT_WORKER_PORT, DistributedExt, DistributedMetricsFormat, + DistributedPhysicalOptimizerRule, Worker, WorkerResolver, display_plan_ascii, + get_distributed_channel_resolver, get_distributed_worker_resolver, + rewrite_distributed_plan_with_metrics, }; use futures::{StreamExt, TryFutureExt}; use log::{error, info, warn}; @@ -73,7 +74,7 @@ async fn main() -> Result<(), Box> { let cmd = Cmd::from_args(); const LISTENER_ADDR: &str = "0.0.0.0:9000"; - const WORKER_ADDR: &str = "0.0.0.0:9001"; + let worker_addr = format!("0.0.0.0:{DEFAULT_WORKER_PORT}"); info!("Starting HTTP listener on {LISTENER_ADDR}..."); let listener = tokio::net::TcpListener::bind(LISTENER_ADDR).await?; @@ -208,10 +209,10 @@ async fn main() -> Result<(), Box> { let grpc_server = Server::builder() .add_service(worker.with_observability_service(Ec2WorkerResolver::new())) .add_service(worker.into_flight_server()) - .serve(WORKER_ADDR.parse()?); + .serve(worker_addr.parse()?); info!("Started listener HTTP server in {LISTENER_ADDR}"); - info!("Started distributed DataFusion worker in {WORKER_ADDR}"); + info!("Started distributed DataFusion worker in {worker_addr}"); tokio::select! 
{ result = http_server => result?, @@ -291,7 +292,7 @@ async fn background_ec2_worker_resolver(urls: Arc>>) { for reservation in result.reservations() { for instance in reservation.instances() { if let Some(private_ip) = instance.private_ip_address() { - let url = Url::parse(&format!("http://{private_ip}:9001")).unwrap(); + let url = Url::parse(&format!("http://{private_ip}:{DEFAULT_WORKER_PORT}")).unwrap(); workers.push(url); } } diff --git a/console/examples/cluster.md b/console/examples/cluster.md index 253656c3..6ae31577 100644 --- a/console/examples/cluster.md +++ b/console/examples/cluster.md @@ -38,11 +38,14 @@ After starting, the example prints ready-to-use commands. For example: Started 4 workers on ports: 9000,9001,9002,9003 Console: - cargo run -p datafusion-distributed-console -- --cluster-ports 9000,9001,9002,9003 + cargo run -p datafusion-distributed-console -- --connect http://localhost:9000 TPC-DS runner: cargo run -p datafusion-distributed-console --example tpcds_runner -- --cluster-ports 9000,9001,9002,9003 Single query: cargo run -p datafusion-distributed-console --example console_run -- --cluster-ports 9000,9001,9002,9003 "SELECT 1" ``` +Note: if one of the workers is on the default port (9001), you can just run +`cargo run -p datafusion-distributed-console` without any arguments. + Press `Ctrl+C` to stop all workers. 
diff --git a/console/examples/cluster.rs b/console/examples/cluster.rs index 2221479c..dc93eff4 100644 --- a/console/examples/cluster.rs +++ b/console/examples/cluster.rs @@ -76,13 +76,11 @@ async fn main() -> Result<(), Box> { .join(","); println!("Started {} workers on ports: {ports_csv}\n", args.workers); - println!("Console (auto-discovery via any worker):"); + println!("Console (connect to any worker for auto-discovery):"); println!( "\tcargo run -p datafusion-distributed-console -- --connect http://localhost:{}", ports[0] ); - println!("Console (manual):"); - println!("\tcargo run -p datafusion-distributed-console -- --cluster-ports {ports_csv}"); println!("TPC-DS runner:"); println!( "\tcargo run -p datafusion-distributed-console --example tpcds_runner -- --cluster-ports {ports_csv}" diff --git a/console/examples/console.md b/console/examples/console.md index 03710e86..a7e6a21d 100644 --- a/console/examples/console.md +++ b/console/examples/console.md @@ -6,25 +6,34 @@ while running distributed queries. ## Terminal 1: Start workers with observability ```bash - cargo run -p datafusion-distributed-console --example console_worker -- 8080 - cargo run -p datafusion-distributed-console --example console_worker -- 8081 + cargo run -p datafusion-distributed-console --example console_worker + cargo run -p datafusion-distributed-console --example console_worker -- 9002 ``` +The first worker starts on the default port (9001). The second on 9002. + ## Terminal 2: Start console ```bash - cargo run -p datafusion-distributed-console -- --cluster-ports 8080,8081 + cargo run -p datafusion-distributed-console ``` -The TUI console will start and connect to the workers. It will show "Waiting for tasks..." -until queries are executed. +The console connects to `localhost:9001` by default and auto-discovers all +workers in the cluster via `GetClusterWorkers`. It will show "Waiting for +tasks..." until queries are executed. 
+ +To connect to a worker on a non-default port, use `--connect`: + +```bash + cargo run -p datafusion-distributed-console -- --connect http://localhost:9002 +``` ## Terminal 3: Run a query ```bash cargo run -p datafusion-distributed-console --example console_run -- \ "SELECT * FROM weather LIMIT 100" \ - --cluster-ports 8080,8081 + --cluster-ports 9001,9002 ``` The console will display real-time task progress across all workers. diff --git a/console/examples/console_worker.rs b/console/examples/console_worker.rs index aefbe9d1..e647e203 100644 --- a/console/examples/console_worker.rs +++ b/console/examples/console_worker.rs @@ -1,6 +1,6 @@ use async_trait::async_trait; use datafusion::error::DataFusionError; -use datafusion_distributed::{Worker, WorkerResolver}; +use datafusion_distributed::{DEFAULT_WORKER_PORT, Worker, WorkerResolver}; use std::error::Error; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use structopt::StructOpt; @@ -13,7 +13,7 @@ use url::Url; about = "A localhost DataFusion worker with observability" )] struct Args { - #[structopt(default_value = "8080")] + #[structopt(default_value = DEFAULT_WORKER_PORT)] port: u16, /// The ports holding Distributed DataFusion workers. diff --git a/console/examples/tpcds_runner.md b/console/examples/tpcds_runner.md index 630d9396..f68613cd 100644 --- a/console/examples/tpcds_runner.md +++ b/console/examples/tpcds_runner.md @@ -22,10 +22,10 @@ may take a few minutes). 
Start 4 workers on different ports in different terminals: ```bash -cargo run -p datafusion-distributed-console --example -- console_worker -- 8080 -cargo run -p datafusion-distributed-console --example -- console_worker -- 8081 -cargo run -p datafusion-distributed-console --example -- console_worker -- 8082 -cargo run -p datafusion-distributed-console --example -- console_worker -- 8083 +cargo run -p datafusion-distributed-console --example console_worker +cargo run -p datafusion-distributed-console --example console_worker -- 9002 +cargo run -p datafusion-distributed-console --example console_worker -- 9003 +cargo run -p datafusion-distributed-console --example console_worker -- 9004 ``` ### Step 2: Start the Console (Terminal 5) @@ -34,19 +34,21 @@ cargo run -p datafusion-distributed-console --example -- console_worker -- 8083 cargo run -p datafusion-distributed-console ``` +The console auto-discovers all workers via the default port (9001). + ### Step 3: Run TPC-DS Queries (Terminal 6) #### Run a single query ```bash -cargo run -p datafusion-distributed-console --example tpcds_runner \ - --cluster-ports 8080,8081,8082,8083 \ +cargo run -p datafusion-distributed-console --example tpcds_runner -- \ + --cluster-ports 9001,9002,9003,9004 \ --query q99 ``` #### Run all TPC-DS queries sequentially ```bash -cargo run -p datafusion-distributed-console --example tpcds_runner \ - --cluster-ports 8080,8081,8082,8083 \ +cargo run -p datafusion-distributed-console --example tpcds_runner -- \ + --cluster-ports 9001,9002,9003,9004 ``` diff --git a/console/src/app.rs b/console/src/app.rs index 91d303f6..04ec2b19 100644 --- a/console/src/app.rs +++ b/console/src/app.rs @@ -4,14 +4,6 @@ use std::collections::HashSet; use std::time::{Duration, Instant}; use url::Url; -/// How the console discovers workers. -pub(crate) enum DiscoveryMode { - /// Manual mode: static list of worker URLs from `--cluster-ports`. 
- Manual, - /// Discovery mode: periodically call `GetClusterWorkers` on the seed URL. - Auto { seed_url: Url }, -} - /// App holds the main application state. pub(crate) struct App { pub(crate) workers: Vec, @@ -29,8 +21,8 @@ pub(crate) struct App { prev_output_rows_time: Option, /// Smoothed cluster-wide throughput in rows/s. pub(crate) current_throughput: f64, - /// How the console discovers workers. - discovery_mode: DiscoveryMode, + /// Seed URL for worker discovery via `GetClusterWorkers`. + seed_url: Url, /// Last time we ran worker discovery. last_discovery: Option, } @@ -51,20 +43,10 @@ pub(crate) struct ClusterStats { const DISCOVERY_INTERVAL: Duration = Duration::from_secs(5); impl App { - /// Create a new App in manual mode with an explicit list of worker URLs. - pub(crate) fn new_manual(worker_urls: Vec) -> Self { - let workers = worker_urls.into_iter().map(WorkerConn::new).collect(); - Self::with_workers(workers, DiscoveryMode::Manual) - } - - /// Create a new App in auto-discovery mode with a seed URL. - pub(crate) fn new_discovery(seed_url: Url) -> Self { - Self::with_workers(Vec::new(), DiscoveryMode::Auto { seed_url }) - } - - fn with_workers(workers: Vec, discovery_mode: DiscoveryMode) -> Self { + /// Create a new App that discovers workers via `GetClusterWorkers` on the seed URL. + pub(crate) fn new(seed_url: Url) -> Self { App { - workers, + workers: Vec::new(), active_query_count: 0, current_view: View::ClusterOverview, cluster_state: ClusterViewState::default(), @@ -77,7 +59,7 @@ impl App { prev_output_rows_total: 0, prev_output_rows_time: None, current_throughput: 0.0, - discovery_mode, + seed_url, last_discovery: None, } } @@ -119,11 +101,6 @@ impl App { /// Periodically discovers workers via `GetClusterWorkers` on the seed URL. 
async fn maybe_discover_workers(&mut self) { - let seed_url = match &self.discovery_mode { - DiscoveryMode::Manual => return, - DiscoveryMode::Auto { seed_url } => seed_url.clone(), - }; - let should_discover = match self.last_discovery { None => true, Some(last) => last.elapsed() >= DISCOVERY_INTERVAL, @@ -135,7 +112,7 @@ impl App { self.last_discovery = Some(Instant::now()); - let discovered_urls = match discover_cluster_workers(&seed_url).await { + let discovered_urls = match discover_cluster_workers(&self.seed_url).await { Ok(urls) => urls, Err(_) => return, }; diff --git a/console/src/main.rs b/console/src/main.rs index dbae5f9b..2ea7fe1e 100644 --- a/console/src/main.rs +++ b/console/src/main.rs @@ -18,14 +18,10 @@ use url::Url; about = "Console for monitoring DataFusion distributed workers" )] struct Args { - /// Comma-delimited list of worker ports (assumed localhost) - #[structopt(long = "cluster-ports", use_delimiter = true)] - cluster_ports: Vec, - - /// URL of a seed worker for auto-discovery (e.g. http://localhost:6789). - /// The console will call GetClusterWorkers to discover all workers. - /// Mutually exclusive with --cluster-ports. - #[arg(long = "connect", conflicts_with = "cluster_ports")] + /// URL of a worker to connect to for auto-discovery (e.g. http://localhost:9001). + /// The console calls GetClusterWorkers on this worker to discover the full cluster. + /// Defaults to http://localhost:9001. 
+ #[arg(long = "connect")] connect: Option, /// Polling interval in milliseconds @@ -39,26 +35,12 @@ async fn main() -> color_eyre::Result<()> { let args = Args::from_args(); - let poll_interval = Duration::from_millis(args.poll_interval); + let seed_url = args.connect.unwrap_or_else(|| { + Url::parse(&format!("http://localhost:{DEFAULT_WORKER_PORT}")).expect("valid default URL") + }); - let mut app = if !args.cluster_ports.is_empty() { - // Manual mode: explicit list of localhost ports - let worker_urls: Vec = args - .cluster_ports - .iter() - .map(|port| { - Url::parse(&format!("http://localhost:{port}")).expect("valid localhost URL") - }) - .collect(); - App::new_manual(worker_urls) - } else { - // Discovery mode: connect to seed URL (default: localhost:DEFAULT_WORKER_PORT) - let seed_url = args.connect.unwrap_or_else(|| { - Url::parse(&format!("http://localhost:{DEFAULT_WORKER_PORT}")) - .expect("valid default URL") - }); - App::new_discovery(seed_url) - }; + let poll_interval = Duration::from_millis(args.poll_interval); + let mut app = App::new(seed_url); let mut terminal = ratatui::init(); terminal.clear()?; diff --git a/src/lib.rs b/src/lib.rs index ae205e3c..be71d2e0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -53,3 +53,6 @@ pub use observability::{ }; pub use protobuf::StageKey; +/// Default port for workers to listen on (Flight + Observability gRPC services). +/// The console connects to `localhost:DEFAULT_WORKER_PORT` when no arguments are provided. 
+pub const DEFAULT_WORKER_PORT: u16 = 9001; From 629d294c0bc7a5027832341b28876885e496c138 Mon Sep 17 00:00:00 2001 From: Edson Petry Date: Fri, 13 Mar 2026 11:22:57 -0400 Subject: [PATCH 06/16] fmt: cargo fmt --- benchmarks/cdk/bin/worker.rs | 3 +- src/observability/generated/observability.rs | 192 +++++++------------ 2 files changed, 69 insertions(+), 126 deletions(-) diff --git a/benchmarks/cdk/bin/worker.rs b/benchmarks/cdk/bin/worker.rs index 39f76877..9956342c 100644 --- a/benchmarks/cdk/bin/worker.rs +++ b/benchmarks/cdk/bin/worker.rs @@ -292,7 +292,8 @@ async fn background_ec2_worker_resolver(urls: Arc>>) { for reservation in result.reservations() { for instance in reservation.instances() { if let Some(private_ip) = instance.private_ip_address() { - let url = Url::parse(&format!("http://{private_ip}:{DEFAULT_WORKER_PORT}")).unwrap(); + let url = Url::parse(&format!("http://{private_ip}:{DEFAULT_WORKER_PORT}")) + .unwrap(); workers.push(url); } } diff --git a/src/observability/generated/observability.rs b/src/observability/generated/observability.rs index 522a47b3..cc7fe513 100644 --- a/src/observability/generated/observability.rs +++ b/src/observability/generated/observability.rs @@ -86,10 +86,10 @@ pub mod observability_service_client { dead_code, missing_docs, clippy::wildcard_imports, - clippy::let_unit_value, + clippy::let_unit_value )] - use tonic::codegen::*; use tonic::codegen::http::Uri; + use tonic::codegen::*; #[derive(Debug, Clone)] pub struct ObservabilityServiceClient { inner: tonic::client::Grpc, @@ -128,14 +128,13 @@ pub mod observability_service_client { F: tonic::service::Interceptor, T::ResponseBody: Default, T: tonic::codegen::Service< - http::Request, - Response = http::Response< - >::ResponseBody, + http::Request, + Response = http::Response< + >::ResponseBody, + >, >, - >, - , - >>::Error: Into + std::marker::Send + std::marker::Sync, + >>::Error: + Into + std::marker::Send + std::marker::Sync, { 
ObservabilityServiceClient::new(InterceptedService::new(inner, interceptor)) } @@ -174,79 +173,55 @@ pub mod observability_service_client { &mut self, request: impl tonic::IntoRequest, ) -> std::result::Result, tonic::Status> { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::unknown( - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner.ready().await.map_err(|e| { + tonic::Status::unknown(format!("Service was not ready: {}", e.into())) + })?; let codec = tonic_prost::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/observability.ObservabilityService/Ping", - ); + let path = + http::uri::PathAndQuery::from_static("/observability.ObservabilityService/Ping"); let mut req = request.into_request(); - req.extensions_mut() - .insert(GrpcMethod::new("observability.ObservabilityService", "Ping")); + req.extensions_mut().insert(GrpcMethod::new( + "observability.ObservabilityService", + "Ping", + )); self.inner.unary(req, path, codec).await } pub async fn get_task_progress( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - > { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::unknown( - format!("Service was not ready: {}", e.into()), - ) - })?; + ) -> std::result::Result, tonic::Status> + { + self.inner.ready().await.map_err(|e| { + tonic::Status::unknown(format!("Service was not ready: {}", e.into())) + })?; let codec = tonic_prost::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( "/observability.ObservabilityService/GetTaskProgress", ); let mut req = request.into_request(); - req.extensions_mut() - .insert( - GrpcMethod::new( - "observability.ObservabilityService", - "GetTaskProgress", - ), - ); + req.extensions_mut().insert(GrpcMethod::new( + "observability.ObservabilityService", + "GetTaskProgress", + )); self.inner.unary(req, path, codec).await } pub async fn get_cluster_workers( &mut self, request: impl 
tonic::IntoRequest, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - > { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::unknown( - format!("Service was not ready: {}", e.into()), - ) - })?; + ) -> std::result::Result, tonic::Status> + { + self.inner.ready().await.map_err(|e| { + tonic::Status::unknown(format!("Service was not ready: {}", e.into())) + })?; let codec = tonic_prost::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( "/observability.ObservabilityService/GetClusterWorkers", ); let mut req = request.into_request(); - req.extensions_mut() - .insert( - GrpcMethod::new( - "observability.ObservabilityService", - "GetClusterWorkers", - ), - ); + req.extensions_mut().insert(GrpcMethod::new( + "observability.ObservabilityService", + "GetClusterWorkers", + )); self.inner.unary(req, path, codec).await } } @@ -258,7 +233,7 @@ pub mod observability_service_server { dead_code, missing_docs, clippy::wildcard_imports, - clippy::let_unit_value, + clippy::let_unit_value )] use tonic::codegen::*; /// Generated trait containing gRPC methods that should be implemented for use with ObservabilityServiceServer. 
@@ -271,17 +246,11 @@ pub mod observability_service_server { async fn get_task_progress( &self, request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; + ) -> std::result::Result, tonic::Status>; async fn get_cluster_workers( &self, request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; + ) -> std::result::Result, tonic::Status>; } #[derive(Debug)] pub struct ObservabilityServiceServer { @@ -304,10 +273,7 @@ pub mod observability_service_server { max_encoding_message_size: None, } } - pub fn with_interceptor( - inner: T, - interceptor: F, - ) -> InterceptedService + pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService where F: tonic::service::Interceptor, { @@ -342,8 +308,7 @@ pub mod observability_service_server { self } } - impl tonic::codegen::Service> - for ObservabilityServiceServer + impl tonic::codegen::Service> for ObservabilityServiceServer where T: ObservabilityService, B: Body + std::marker::Send + 'static, @@ -363,14 +328,9 @@ pub mod observability_service_server { "/observability.ObservabilityService/Ping" => { #[allow(non_camel_case_types)] struct PingSvc(pub Arc); - impl< - T: ObservabilityService, - > tonic::server::UnaryService for PingSvc { + impl tonic::server::UnaryService for PingSvc { type Response = super::PingResponse; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, @@ -407,25 +367,19 @@ pub mod observability_service_server { "/observability.ObservabilityService/GetTaskProgress" => { #[allow(non_camel_case_types)] struct GetTaskProgressSvc(pub Arc); - impl< - T: ObservabilityService, - > tonic::server::UnaryService - for GetTaskProgressSvc { + impl + tonic::server::UnaryService + for GetTaskProgressSvc + { type Response = super::GetTaskProgressResponse; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = 
BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::get_task_progress( - &inner, - request, - ) + ::get_task_progress(&inner, request) .await }; Box::pin(fut) @@ -456,25 +410,19 @@ pub mod observability_service_server { "/observability.ObservabilityService/GetClusterWorkers" => { #[allow(non_camel_case_types)] struct GetClusterWorkersSvc(pub Arc); - impl< - T: ObservabilityService, - > tonic::server::UnaryService - for GetClusterWorkersSvc { + impl + tonic::server::UnaryService + for GetClusterWorkersSvc + { type Response = super::GetClusterWorkersResponse; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::get_cluster_workers( - &inner, - request, - ) + ::get_cluster_workers(&inner, request) .await }; Box::pin(fut) @@ -502,25 +450,19 @@ pub mod observability_service_server { }; Box::pin(fut) } - _ => { - Box::pin(async move { - let mut response = http::Response::new( - tonic::body::Body::default(), - ); - let headers = response.headers_mut(); - headers - .insert( - tonic::Status::GRPC_STATUS, - (tonic::Code::Unimplemented as i32).into(), - ); - headers - .insert( - http::header::CONTENT_TYPE, - tonic::metadata::GRPC_CONTENT_TYPE, - ); - Ok(response) - }) - } + _ => Box::pin(async move { + let mut response = http::Response::new(tonic::body::Body::default()); + let headers = response.headers_mut(); + headers.insert( + tonic::Status::GRPC_STATUS, + (tonic::Code::Unimplemented as i32).into(), + ); + headers.insert( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ); + Ok(response) + }), } } } From 9b4dd5ae2eef21af5d4500fd3cc9c031624d9077 Mon Sep 17 00:00:00 2001 From: Edson Petry Date: Tue, 17 Mar 2026 11:31:51 -0400 Subject: [PATCH 07/16] feat: add observability 
service to benchmark ec2 worker --- benchmarks/cdk/bin/worker.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/benchmarks/cdk/bin/worker.rs b/benchmarks/cdk/bin/worker.rs index 9956342c..4ad571c7 100644 --- a/benchmarks/cdk/bin/worker.rs +++ b/benchmarks/cdk/bin/worker.rs @@ -206,8 +206,9 @@ async fn main() -> Result<(), Box> { }), ), ); + let ec2_worker_resolver = Arc::new(Ec2WorkerResolver::new()); let grpc_server = Server::builder() - .add_service(worker.with_observability_service(Ec2WorkerResolver::new())) + .add_service(worker.with_observability_service(ec2_worker_resolver)) .add_service(worker.into_flight_server()) .serve(worker_addr.parse()?); From 17f5b8064c1305ce2884491f7a22b0c4f4b8335a Mon Sep 17 00:00:00 2001 From: Edson Petry Date: Tue, 17 Mar 2026 11:33:24 -0400 Subject: [PATCH 08/16] refactor: change worker resolver field in observability_service type param --- src/flight_service/worker.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/flight_service/worker.rs b/src/flight_service/worker.rs index dbdc2004..d98cb497 100644 --- a/src/flight_service/worker.rs +++ b/src/flight_service/worker.rs @@ -142,11 +142,11 @@ impl Worker { /// Flight service — gRPC multiplexes both services on a single port. 
pub fn with_observability_service( &self, - worker_resolver: impl WorkerResolver + Send + Sync + 'static, + worker_resolver: Arc, ) -> ObservabilityServiceServer { ObservabilityServiceServer::new(ObservabilityServiceImpl::new( self.task_data_entries.clone(), - Arc::new(worker_resolver), + worker_resolver, )) } From 295896ce73ace3cea79bd2c7316488557eced864 Mon Sep 17 00:00:00 2001 From: Edson Petry Date: Tue, 17 Mar 2026 11:33:56 -0400 Subject: [PATCH 09/16] fix: remove clap `arg` for structopt --- console/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/console/src/main.rs b/console/src/main.rs index 2ea7fe1e..d17d6315 100644 --- a/console/src/main.rs +++ b/console/src/main.rs @@ -21,7 +21,7 @@ struct Args { /// URL of a worker to connect to for auto-discovery (e.g. http://localhost:9001). /// The console calls GetClusterWorkers on this worker to discover the full cluster. /// Defaults to http://localhost:9001. - #[arg(long = "connect")] + #[structopt(long = "connect")] connect: Option, /// Polling interval in milliseconds From 972c1fd062bff246c6bfee86fd4a4a2e248438a8 Mon Sep 17 00:00:00 2001 From: Edson Petry Date: Tue, 17 Mar 2026 11:38:23 -0400 Subject: [PATCH 10/16] fix: remove duplicate worker resolvers, wrap worker resolvers in Arc --- console/examples/cluster.rs | 11 +++++------ console/examples/console_worker.rs | 15 ++++++++++----- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/console/examples/cluster.rs b/console/examples/cluster.rs index dc93eff4..4b8567d7 100644 --- a/console/examples/cluster.rs +++ b/console/examples/cluster.rs @@ -1,3 +1,4 @@ +use async_trait::async_trait; use datafusion::error::DataFusionError; use datafusion_distributed::{Worker, WorkerResolver}; use std::error::Error; @@ -43,20 +44,18 @@ async fn main() -> Result<(), Box> { .expect("port overflow: base_port + workers exceeds u16::MAX") }, ); - let localhost_resolver = Arc::new(LocalhostWorkerResolver { ports }); let listener = 
TcpListener::bind(addr).await?; let port = listener.local_addr()?.port(); ports.push(port); listeners.push(listener); } - // Create a shared resolver that knows about all workers - let resolver = LocalhostClusterResolver { - ports: Arc::new(RwLock::new(ports.clone())), - }; + let localhost_resolver = Arc::new(LocalhostWorkerResolver { + ports: ports.clone(), + }); for listener in listeners { - let resolver = resolver.clone(); + let resolver = localhost_resolver.clone(); tokio::spawn(async move { let worker = Worker::default(); diff --git a/console/examples/console_worker.rs b/console/examples/console_worker.rs index e647e203..44b96517 100644 --- a/console/examples/console_worker.rs +++ b/console/examples/console_worker.rs @@ -3,6 +3,7 @@ use datafusion::error::DataFusionError; use datafusion_distributed::{DEFAULT_WORKER_PORT, Worker, WorkerResolver}; use std::error::Error; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::sync::Arc; use structopt::StructOpt; use tonic::transport::Server; use url::Url; @@ -13,8 +14,8 @@ use url::Url; about = "A localhost DataFusion worker with observability" )] struct Args { - #[structopt(default_value = DEFAULT_WORKER_PORT)] - port: u16, + #[structopt(long = "cluster-ports")] + port: Option, /// The ports holding Distributed DataFusion workers. 
#[structopt(long = "cluster-ports", use_delimiter = true)] @@ -24,15 +25,19 @@ struct Args { #[tokio::main] async fn main() -> Result<(), Box> { let args = Args::from_args(); - let localhost_resolver = LocalhostWorkerResolver { + + let localhost_resolver = Arc::new(LocalhostWorkerResolver { ports: args.cluster_ports.clone(), - }; + }); let worker = Worker::default(); Server::builder() .add_service(worker.with_observability_service(localhost_resolver)) .add_service(worker.into_flight_server()) - .serve(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), args.port)) + .serve(SocketAddr::new( + IpAddr::V4(Ipv4Addr::LOCALHOST), + args.port.unwrap_or(DEFAULT_WORKER_PORT), + )) .await?; Ok(()) From 280af177cf654a1fad1cdf8d5bcb356792a70950 Mon Sep 17 00:00:00 2001 From: Edson Petry Date: Tue, 17 Mar 2026 11:38:44 -0400 Subject: [PATCH 11/16] fix: add ShuffleBench to lib.rs --- src/lib.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index be71d2e0..eb72042e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -53,6 +53,8 @@ pub use observability::{ }; pub use protobuf::StageKey; -/// Default port for workers to listen on (Flight + Observability gRPC services). -/// The console connects to `localhost:DEFAULT_WORKER_PORT` when no arguments are provided. 
+ +#[cfg(any(feature = "integration", test))] +pub use execution_plans::benchmarks::ShuffleBench; + pub const DEFAULT_WORKER_PORT: u16 = 9001; From 1af846264792fbec81c42d85b7988a239b030c8a Mon Sep 17 00:00:00 2001 From: Edson Petry Date: Tue, 17 Mar 2026 12:14:12 -0400 Subject: [PATCH 12/16] docs(console): add README and update examples for auto-discovery workflow --- console/README.md | 88 ++++++++++++++++++++++++++++++++ console/examples/cluster.md | 43 ++++++++++------ console/examples/cluster.rs | 13 ++--- console/examples/console.md | 9 ++++ console/examples/tpcds_runner.md | 10 +++- 5 files changed, 137 insertions(+), 26 deletions(-) create mode 100644 console/README.md diff --git a/console/README.md b/console/README.md new file mode 100644 index 00000000..e2537ee6 --- /dev/null +++ b/console/README.md @@ -0,0 +1,88 @@ +# datafusion-distributed-console + +A terminal UI (TUI) for monitoring [DataFusion Distributed](../README.md) +clusters in real time. Built with [ratatui](https://ratatui.rs). + +## Quick-start + +```bash +# Start a local cluster (16 workers on ports 9001-9016) +cargo run -p datafusion-distributed-console --example cluster + +# In another terminal, open the console +cargo run -p datafusion-distributed-console +``` + +The console connects to `localhost:9001` by default and auto-discovers all +workers in the cluster via the `GetClusterWorkers` RPC. + +## Usage + +``` +datafusion-distributed-console [OPTIONS] +``` + +| Flag | Default | Description | +|--------------------|----------------------------|------------------------------------------------------| +| `--connect ` | `http://localhost:9001` | Seed worker URL for auto-discovery | +| `--poll-interval` | `100` | Polling interval in milliseconds | + +## Views + +### Cluster Overview (`1`) + +A table of all workers showing connection status, active tasks, queries in +flight, CPU usage, memory, and throughput. Columns are sortable. 
+ +### Worker Detail (`2`) + +Drill into a single worker to see per-task progress (active and completed), +CPU/memory sparklines, and task durations. + +## Worker Discovery + +The console uses a single seed URL (`--connect`) to discover the full cluster. +On startup and every 5 seconds, it calls `GetClusterWorkers` on the seed worker, +which returns URLs for all known workers via its `WorkerResolver`. New workers +are added automatically; removed workers are cleaned up. + +## Monitoring an EC2 Benchmark Cluster + +The benchmark workers in [`benchmarks/cdk/`](../benchmarks/cdk/README.md) run on +EC2 instances with the observability service enabled. Each worker listens on port +9001 (gRPC/Flight + Observability) and port 9000 (HTTP benchmarks). The +`Ec2WorkerResolver` discovers peers via `DescribeInstances`, so connecting the +console to any single worker exposes the full cluster. + +To run the console, SSH into any instance in the cluster and install it there +(the console runs inside the VPC so it can reach all workers on their private IPs): + +```bash +# Connect to an instance via SSM +aws ssm start-session --target "$INSTANCE_ID" --region "$AWS_REGION" + +# Install the Rust toolchain +curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y +source "$HOME/.cargo/env" + +# Install the console binary from the repo +cargo install --locked --git https://github.com/datafusion-contrib/datafusion-distributed.git \ + datafusion-distributed-console + +# Run — auto-discovers all workers via localhost:9001 +datafusion-distributed-console +``` + +To connect to a specific worker (e.g. 
one on a different port or IP): + +```bash +datafusion-distributed-console --connect http://:9001 +``` + +## Examples + +| Example | Description | +|--------------------------------------------------|------------------------------------------------| +| [`cluster`](examples/cluster.md) | Start a local multi-worker cluster | +| [`console_worker`](examples/console.md) | Start individual workers with observability | +| [`tpcds_runner`](examples/tpcds_runner.md) | Run TPC-DS queries with live monitoring | diff --git a/console/examples/cluster.md b/console/examples/cluster.md index 6ae31577..438af92f 100644 --- a/console/examples/cluster.md +++ b/console/examples/cluster.md @@ -4,48 +4,61 @@ Starts an in-memory cluster of DataFusion distributed workers with observability enabled. This is the fastest way to get a local cluster running for use with the console TUI or the TPC-DS runner. -## Usage +## Quick-start -### Start a cluster with OS-assigned ports +```bash +# Terminal 1 — start 16 workers +cargo run -p datafusion-distributed-console --example cluster + +# Terminal 2 — open the console (auto-discovers all workers) +cargo run -p datafusion-distributed-console +``` + +No flags needed. The cluster connects to a worker on port 9001 by default. + +## Usage ```bash cargo run -p datafusion-distributed-console --example cluster ``` -This starts 16 workers on random ports and prints the commands to connect. +This starts 16 workers on ports 9001 through 9016 and prints the commands to connect. 
### Customize the cluster ```bash cargo run -p datafusion-distributed-console --example cluster -- \ --workers 8 \ - --base-port 9000 + --base-port 5000 ``` | Flag | Default | Description | |-----------------|---------|----------------------------------------------------------| | `--workers` | 16 | Number of workers to start | -| `--base-port` | 0 | Starting port (0 = OS-assigned random ports) | +| `--base-port` | 9001 | Starting port; workers bind to consecutive ports | + +Workers bind to consecutive ports starting from `--base-port` +(e.g. `--base-port 5000` gives 5000, 5001, ..., 5015 for 16 workers). + +If you change the base port, tell the console which worker to connect to: -When `--base-port` is set, workers bind to consecutive ports starting from that -value (e.g. 9000, 9001, ..., 9007). +```bash +cargo run -p datafusion-distributed-console -- --connect http://localhost:5000 +``` ## Connecting to the cluster After starting, the example prints ready-to-use commands. For example: ``` -Started 4 workers on ports: 9000,9001,9002,9003 +Started 16 workers on ports: 9001,9002,...,9016 -Console: - cargo run -p datafusion-distributed-console -- --connect http://localhost:9000 +Console (connect to any worker for auto-discovery): + cargo run -p datafusion-distributed-console -- --connect http://localhost:9001 TPC-DS runner: - cargo run -p datafusion-distributed-console --example tpcds_runner -- --cluster-ports 9000,9001,9002,9003 + cargo run -p datafusion-distributed-console --example tpcds_runner -- --cluster-ports 9001,9002,...,9016 Single query: - cargo run -p datafusion-distributed-console --example console_run -- --cluster-ports 9000,9001,9002,9003 "SELECT 1" + cargo run -p datafusion-distributed-console --example console_run -- --cluster-ports 9001,9002,...,9016 "SELECT 1" ``` -Note: if one of the workers is on the default port (9001), you can just run -`cargo run -p datafusion-distributed-console` without any arguments. - Press `Ctrl+C` to stop all workers. 
diff --git a/console/examples/cluster.rs b/console/examples/cluster.rs index 4b8567d7..f06c8ff0 100644 --- a/console/examples/cluster.rs +++ b/console/examples/cluster.rs @@ -20,8 +20,7 @@ struct Args { workers: usize, /// Starting port. Workers bind to consecutive ports from this value. - /// If 0, the OS assigns random ports. - #[structopt(long, default_value = "0")] + #[structopt(long, default_value = "9001")] base_port: u16, } @@ -36,13 +35,9 @@ async fn main() -> Result<(), Box> { for i in 0..args.workers { let addr = SocketAddr::new( IpAddr::V4(Ipv4Addr::LOCALHOST), - if args.base_port == 0 { - 0 - } else { - args.base_port - .checked_add(i as u16) - .expect("port overflow: base_port + workers exceeds u16::MAX") - }, + args.base_port + .checked_add(i as u16) + .expect("port overflow: base_port + workers exceeds u16::MAX"), ); let listener = TcpListener::bind(addr).await?; let port = listener.local_addr()?.port(); diff --git a/console/examples/console.md b/console/examples/console.md index a7e6a21d..dd674371 100644 --- a/console/examples/console.md +++ b/console/examples/console.md @@ -5,6 +5,15 @@ while running distributed queries. ## Terminal 1: Start workers with observability +The easiest way is to use the cluster example, which starts 16 workers on ports +9001-9016 (see [cluster.md](cluster.md)): + +```bash + cargo run -p datafusion-distributed-console --example cluster +``` + +Or start individual workers manually: + ```bash cargo run -p datafusion-distributed-console --example console_worker cargo run -p datafusion-distributed-console --example console_worker -- 9002 diff --git a/console/examples/tpcds_runner.md b/console/examples/tpcds_runner.md index f68613cd..8d3d3fc7 100644 --- a/console/examples/tpcds_runner.md +++ b/console/examples/tpcds_runner.md @@ -17,9 +17,15 @@ may take a few minutes). 
## Usage -### Step 1: Start Workers with Observability (Terminals 1-4) +### Step 1: Start Workers with Observability -Start 4 workers on different ports in different terminals: +The quickest way is to start a cluster (defaults to 16 workers on ports 9001-9016): + +```bash +cargo run -p datafusion-distributed-console --example cluster +``` + +Or start workers individually in separate terminals: ```bash cargo run -p datafusion-distributed-console --example console_worker From b7dda940bf0f632f6643372bb257605a323b2bc0 Mon Sep 17 00:00:00 2001 From: Edson Petry Date: Tue, 17 Mar 2026 15:23:32 -0400 Subject: [PATCH 13/16] doc(console): update README --- console/README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/console/README.md b/console/README.md index e2537ee6..42af4448 100644 --- a/console/README.md +++ b/console/README.md @@ -58,6 +58,9 @@ To run the console, SSH into any instance in the cluster and install it there (the console runs inside the VPC so it can reach all workers on their private IPs): ```bash +cd benchmarks/cdk/ +npm run deploy + # Connect to an instance via SSM aws ssm start-session --target "$INSTANCE_ID" --region "$AWS_REGION" From f5f7a8ebcdf2c143cdf77a99ce208e018753ef7d Mon Sep 17 00:00:00 2001 From: Edson Petry Date: Wed, 18 Mar 2026 10:07:35 -0400 Subject: [PATCH 14/16] refactor(console): replace --connect with required positional port argument Remove DEFAULT_WORKER_PORT from the library and make the console port a required positional argument instead of silently defaulting to 9001. 
--- benchmarks/cdk/bin/worker.rs | 17 ++++++++--------- console/README.md | 30 ++++++++++++------------------ console/examples/cluster.md | 10 ++++------ console/examples/cluster.rs | 2 +- console/examples/console.md | 15 ++++++--------- console/examples/console_worker.rs | 8 ++++---- console/examples/tpcds_runner.md | 6 ++---- console/src/main.rs | 12 ++++-------- src/lib.rs | 2 -- 9 files changed, 41 insertions(+), 61 deletions(-) diff --git a/benchmarks/cdk/bin/worker.rs b/benchmarks/cdk/bin/worker.rs index 4ad571c7..04d8cb0b 100644 --- a/benchmarks/cdk/bin/worker.rs +++ b/benchmarks/cdk/bin/worker.rs @@ -10,10 +10,9 @@ use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::physical_plan::execute_stream; use datafusion::prelude::SessionContext; use datafusion_distributed::{ - ChannelResolver, DEFAULT_WORKER_PORT, DistributedExt, DistributedMetricsFormat, - DistributedPhysicalOptimizerRule, Worker, WorkerResolver, display_plan_ascii, - get_distributed_channel_resolver, get_distributed_worker_resolver, - rewrite_distributed_plan_with_metrics, + ChannelResolver, DistributedExt, DistributedMetricsFormat, DistributedPhysicalOptimizerRule, + Worker, WorkerResolver, display_plan_ascii, get_distributed_channel_resolver, + get_distributed_worker_resolver, rewrite_distributed_plan_with_metrics, }; use futures::{StreamExt, TryFutureExt}; use log::{error, info, warn}; @@ -74,7 +73,7 @@ async fn main() -> Result<(), Box> { let cmd = Cmd::from_args(); const LISTENER_ADDR: &str = "0.0.0.0:9000"; - let worker_addr = format!("0.0.0.0:{DEFAULT_WORKER_PORT}"); + const WORKER_ADDR: &str = "0.0.0.0:9001"; info!("Starting HTTP listener on {LISTENER_ADDR}..."); let listener = tokio::net::TcpListener::bind(LISTENER_ADDR).await?; @@ -210,10 +209,10 @@ async fn main() -> Result<(), Box> { let grpc_server = Server::builder() .add_service(worker.with_observability_service(ec2_worker_resolver)) .add_service(worker.into_flight_server()) - .serve(worker_addr.parse()?); + 
.serve(WORKER_ADDR.parse()?); info!("Started listener HTTP server in {LISTENER_ADDR}"); - info!("Started distributed DataFusion worker in {worker_addr}"); + info!("Started distributed DataFusion worker in {WORKER_ADDR}"); tokio::select! { result = http_server => result?, @@ -293,8 +292,8 @@ async fn background_ec2_worker_resolver(urls: Arc>>) { for reservation in result.reservations() { for instance in reservation.instances() { if let Some(private_ip) = instance.private_ip_address() { - let url = Url::parse(&format!("http://{private_ip}:{DEFAULT_WORKER_PORT}")) - .unwrap(); + let url = + Url::parse(&format!("http://{private_ip}:9001")).unwrap(); workers.push(url); } } diff --git a/console/README.md b/console/README.md index 42af4448..81058f22 100644 --- a/console/README.md +++ b/console/README.md @@ -9,23 +9,23 @@ clusters in real time. Built with [ratatui](https://ratatui.rs). # Start a local cluster (16 workers on ports 9001-9016) cargo run -p datafusion-distributed-console --example cluster -# In another terminal, open the console -cargo run -p datafusion-distributed-console +# In another terminal, open the console (connect to any worker port) +cargo run -p datafusion-distributed-console -- 9001 ``` -The console connects to `localhost:9001` by default and auto-discovers all -workers in the cluster via the `GetClusterWorkers` RPC. +The console requires a port argument and auto-discovers all workers in the +cluster via the `GetClusterWorkers` RPC. 
## Usage ``` -datafusion-distributed-console [OPTIONS] +datafusion-distributed-console [OPTIONS] ``` -| Flag | Default | Description | -|--------------------|----------------------------|------------------------------------------------------| -| `--connect ` | `http://localhost:9001` | Seed worker URL for auto-discovery | -| `--poll-interval` | `100` | Polling interval in milliseconds | +| Argument / Flag | Required | Description | +|--------------------|----------|------------------------------------------------------| +| `PORT` | Yes | Port of a seed worker for auto-discovery | +| `--poll-interval` | No | Polling interval in milliseconds (default: 100) | ## Views @@ -41,7 +41,7 @@ CPU/memory sparklines, and task durations. ## Worker Discovery -The console uses a single seed URL (`--connect`) to discover the full cluster. +The console uses a single seed port to discover the full cluster. On startup and every 5 seconds, it calls `GetClusterWorkers` on the seed worker, which returns URLs for all known workers via its `WorkerResolver`. New workers are added automatically; removed workers are cleaned up. @@ -72,14 +72,8 @@ source "$HOME/.cargo/env" cargo install --locked --git https://github.com/datafusion-contrib/datafusion-distributed.git \ datafusion-distributed-console -# Run — auto-discovers all workers via localhost:9001 -datafusion-distributed-console -``` - -To connect to a specific worker (e.g. one on a different port or IP): - -```bash -datafusion-distributed-console --connect http://:9001 +# Run — connect to the local worker on port 9001 +datafusion-distributed-console 9001 ``` ## Examples diff --git a/console/examples/cluster.md b/console/examples/cluster.md index 438af92f..312efdc0 100644 --- a/console/examples/cluster.md +++ b/console/examples/cluster.md @@ -10,12 +10,10 @@ console TUI or the TPC-DS runner. 
# Terminal 1 — start 16 workers cargo run -p datafusion-distributed-console --example cluster -# Terminal 2 — open the console (auto-discovers all workers) -cargo run -p datafusion-distributed-console +# Terminal 2 — open the console (connect to any worker port) +cargo run -p datafusion-distributed-console -- 9001 ``` -No flags needed. The cluster connects to a worker on port 9001 by default. - ## Usage ```bash @@ -43,7 +41,7 @@ Workers bind to consecutive ports starting from `--base-port` If you change the base port, tell the console which worker to connect to: ```bash -cargo run -p datafusion-distributed-console -- --connect http://localhost:5000 +cargo run -p datafusion-distributed-console -- 5000 ``` ## Connecting to the cluster @@ -54,7 +52,7 @@ After starting, the example prints ready-to-use commands. For example: Started 16 workers on ports: 9001,9002,...,9016 Console (connect to any worker for auto-discovery): - cargo run -p datafusion-distributed-console -- --connect http://localhost:9001 + cargo run -p datafusion-distributed-console -- 9001 TPC-DS runner: cargo run -p datafusion-distributed-console --example tpcds_runner -- --cluster-ports 9001,9002,...,9016 Single query: diff --git a/console/examples/cluster.rs b/console/examples/cluster.rs index f06c8ff0..cb541240 100644 --- a/console/examples/cluster.rs +++ b/console/examples/cluster.rs @@ -72,7 +72,7 @@ async fn main() -> Result<(), Box> { println!("Started {} workers on ports: {ports_csv}\n", args.workers); println!("Console (connect to any worker for auto-discovery):"); println!( - "\tcargo run -p datafusion-distributed-console -- --connect http://localhost:{}", + "\tcargo run -p datafusion-distributed-console -- {}", ports[0] ); println!("TPC-DS runner:"); diff --git a/console/examples/console.md b/console/examples/console.md index dd674371..871db718 100644 --- a/console/examples/console.md +++ b/console/examples/console.md @@ -15,26 +15,23 @@ The easiest way is to use the cluster example, which 
starts 16 workers on ports Or start individual workers manually: ```bash - cargo run -p datafusion-distributed-console --example console_worker + cargo run -p datafusion-distributed-console --example console_worker -- 9001 cargo run -p datafusion-distributed-console --example console_worker -- 9002 ``` -The first worker starts on the default port (9001). The second on 9002. - ## Terminal 2: Start console ```bash - cargo run -p datafusion-distributed-console + cargo run -p datafusion-distributed-console -- 9001 ``` -The console connects to `localhost:9001` by default and auto-discovers all -workers in the cluster via `GetClusterWorkers`. It will show "Waiting for -tasks..." until queries are executed. +The console auto-discovers all workers in the cluster via `GetClusterWorkers`. +It will show "Waiting for tasks..." until queries are executed. -To connect to a worker on a non-default port, use `--connect`: +To connect to a worker on a different port: ```bash - cargo run -p datafusion-distributed-console -- --connect http://localhost:9002 + cargo run -p datafusion-distributed-console -- 9002 ``` ## Terminal 3: Run a query diff --git a/console/examples/console_worker.rs b/console/examples/console_worker.rs index 44b96517..699110b0 100644 --- a/console/examples/console_worker.rs +++ b/console/examples/console_worker.rs @@ -1,6 +1,6 @@ use async_trait::async_trait; use datafusion::error::DataFusionError; -use datafusion_distributed::{DEFAULT_WORKER_PORT, Worker, WorkerResolver}; +use datafusion_distributed::{Worker, WorkerResolver}; use std::error::Error; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::Arc; @@ -14,8 +14,8 @@ use url::Url; about = "A localhost DataFusion worker with observability" )] struct Args { - #[structopt(long = "cluster-ports")] - port: Option, + /// Port to listen on. + port: u16, /// The ports holding Distributed DataFusion workers. 
#[structopt(long = "cluster-ports", use_delimiter = true)] @@ -36,7 +36,7 @@ async fn main() -> Result<(), Box> { .add_service(worker.into_flight_server()) .serve(SocketAddr::new( IpAddr::V4(Ipv4Addr::LOCALHOST), - args.port.unwrap_or(DEFAULT_WORKER_PORT), + args.port, )) .await?; diff --git a/console/examples/tpcds_runner.md b/console/examples/tpcds_runner.md index 8d3d3fc7..5b1c11f9 100644 --- a/console/examples/tpcds_runner.md +++ b/console/examples/tpcds_runner.md @@ -28,7 +28,7 @@ cargo run -p datafusion-distributed-console --example cluster Or start workers individually in separate terminals: ```bash -cargo run -p datafusion-distributed-console --example console_worker +cargo run -p datafusion-distributed-console --example console_worker -- 9001 cargo run -p datafusion-distributed-console --example console_worker -- 9002 cargo run -p datafusion-distributed-console --example console_worker -- 9003 cargo run -p datafusion-distributed-console --example console_worker -- 9004 @@ -37,11 +37,9 @@ cargo run -p datafusion-distributed-console --example console_worker -- 9004 ### Step 2: Start the Console (Terminal 5) ```bash -cargo run -p datafusion-distributed-console +cargo run -p datafusion-distributed-console -- 9001 ``` -The console auto-discovers all workers via the default port (9001). - ### Step 3: Run TPC-DS Queries (Terminal 6) #### Run a single query diff --git a/console/src/main.rs b/console/src/main.rs index d17d6315..504ef026 100644 --- a/console/src/main.rs +++ b/console/src/main.rs @@ -6,7 +6,6 @@ mod worker; use app::App; use crossterm::event::{self, Event}; -use datafusion_distributed::DEFAULT_WORKER_PORT; use ratatui::DefaultTerminal; use std::time::{Duration, Instant}; use structopt::StructOpt; @@ -18,11 +17,9 @@ use url::Url; about = "Console for monitoring DataFusion distributed workers" )] struct Args { - /// URL of a worker to connect to for auto-discovery (e.g. http://localhost:9001). + /// Port of a worker to connect to for auto-discovery. 
/// The console calls GetClusterWorkers on this worker to discover the full cluster. - /// Defaults to http://localhost:9001. - #[structopt(long = "connect")] - connect: Option, + port: u16, /// Polling interval in milliseconds #[structopt(long = "poll-interval", default_value = "100")] @@ -35,9 +32,8 @@ async fn main() -> color_eyre::Result<()> { let args = Args::from_args(); - let seed_url = args.connect.unwrap_or_else(|| { - Url::parse(&format!("http://localhost:{DEFAULT_WORKER_PORT}")).expect("valid default URL") - }); + let seed_url = + Url::parse(&format!("http://localhost:{}", args.port)).expect("valid URL"); let poll_interval = Duration::from_millis(args.poll_interval); let mut app = App::new(seed_url); diff --git a/src/lib.rs b/src/lib.rs index eb72042e..f14a32e3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -56,5 +56,3 @@ pub use protobuf::StageKey; #[cfg(any(feature = "integration", test))] pub use execution_plans::benchmarks::ShuffleBench; - -pub const DEFAULT_WORKER_PORT: u16 = 9001; From 8da964ea2a8ac08320fe1bcc0236f684999c06cd Mon Sep 17 00:00:00 2001 From: Edson Petry Date: Wed, 18 Mar 2026 10:13:03 -0400 Subject: [PATCH 15/16] fix: cargo fmt --- benchmarks/cdk/bin/worker.rs | 3 +-- console/examples/console_worker.rs | 5 +---- console/src/main.rs | 3 +-- 3 files changed, 3 insertions(+), 8 deletions(-) diff --git a/benchmarks/cdk/bin/worker.rs b/benchmarks/cdk/bin/worker.rs index 04d8cb0b..978522de 100644 --- a/benchmarks/cdk/bin/worker.rs +++ b/benchmarks/cdk/bin/worker.rs @@ -292,8 +292,7 @@ async fn background_ec2_worker_resolver(urls: Arc>>) { for reservation in result.reservations() { for instance in reservation.instances() { if let Some(private_ip) = instance.private_ip_address() { - let url = - Url::parse(&format!("http://{private_ip}:9001")).unwrap(); + let url = Url::parse(&format!("http://{private_ip}:9001")).unwrap(); workers.push(url); } } diff --git a/console/examples/console_worker.rs b/console/examples/console_worker.rs index 
699110b0..80178e4d 100644 --- a/console/examples/console_worker.rs +++ b/console/examples/console_worker.rs @@ -34,10 +34,7 @@ async fn main() -> Result<(), Box> { Server::builder() .add_service(worker.with_observability_service(localhost_resolver)) .add_service(worker.into_flight_server()) - .serve(SocketAddr::new( - IpAddr::V4(Ipv4Addr::LOCALHOST), - args.port, - )) + .serve(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), args.port)) .await?; Ok(()) diff --git a/console/src/main.rs b/console/src/main.rs index 504ef026..731b504b 100644 --- a/console/src/main.rs +++ b/console/src/main.rs @@ -32,8 +32,7 @@ async fn main() -> color_eyre::Result<()> { let args = Args::from_args(); - let seed_url = - Url::parse(&format!("http://localhost:{}", args.port)).expect("valid URL"); + let seed_url = Url::parse(&format!("http://localhost:{}", args.port)).expect("valid URL"); let poll_interval = Duration::from_millis(args.poll_interval); let mut app = App::new(seed_url); From 8c87e4afc237a1f7eefadf538c21f5223126da95 Mon Sep 17 00:00:00 2001 From: Edson Petry Date: Wed, 18 Mar 2026 13:22:55 -0400 Subject: [PATCH 16/16] feat: add "system-metrics" feature to benchmarks Cargo.toml --- benchmarks/Cargo.toml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index ab671199..90921f6c 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -7,7 +7,10 @@ default-run = "dfbench" [dependencies] datafusion = { workspace = true } datafusion-proto = { workspace = true } -datafusion-distributed = { path = "..", features = ["integration"] } +datafusion-distributed = { path = "..", features = [ + "integration", + "system-metrics", +] } tokio = { version = "1.48", features = ["full"] } parquet = { version = "57.1.0" } structopt = { version = "0.3.26" }