diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index ab671199..90921f6c 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -7,7 +7,10 @@ default-run = "dfbench" [dependencies] datafusion = { workspace = true } datafusion-proto = { workspace = true } -datafusion-distributed = { path = "..", features = ["integration"] } +datafusion-distributed = { path = "..", features = [ + "integration", + "system-metrics", +] } tokio = { version = "1.48", features = ["full"] } parquet = { version = "57.1.0" } structopt = { version = "0.3.26" } diff --git a/benchmarks/cdk/bin/worker.rs b/benchmarks/cdk/bin/worker.rs index 5d019c5e..978522de 100644 --- a/benchmarks/cdk/bin/worker.rs +++ b/benchmarks/cdk/bin/worker.rs @@ -205,7 +205,9 @@ async fn main() -> Result<(), Box> { }), ), ); + let ec2_worker_resolver = Arc::new(Ec2WorkerResolver::new()); let grpc_server = Server::builder() + .add_service(worker.with_observability_service(ec2_worker_resolver)) .add_service(worker.into_flight_server()) .serve(WORKER_ADDR.parse()?); diff --git a/console/README.md b/console/README.md new file mode 100644 index 00000000..81058f22 --- /dev/null +++ b/console/README.md @@ -0,0 +1,85 @@ +# datafusion-distributed-console + +A terminal UI (TUI) for monitoring [DataFusion Distributed](../README.md) +clusters in real time. Built with [ratatui](https://ratatui.rs). + +## Quick-start + +```bash +# Start a local cluster (16 workers on ports 9001-9016) +cargo run -p datafusion-distributed-console --example cluster + +# In another terminal, open the console (connect to any worker port) +cargo run -p datafusion-distributed-console -- 9001 +``` + +The console requires a port argument and auto-discovers all workers in the +cluster via the `GetClusterWorkers` RPC. + +## Usage + +``` +datafusion-distributed-console [OPTIONS] +``` + +| Argument / Flag | Required | Description | +|--------------------|----------|------------------------------------------------------| +| `PORT` | Yes | Port of a seed worker for auto-discovery | +| `--poll-interval` | No | Polling interval in milliseconds (default: 100) | + +## Views + +### Cluster Overview (`1`) + +A table of all workers showing connection status, active tasks, queries in +flight, CPU usage, memory, and throughput. Columns are sortable. + +### Worker Detail (`2`) + +Drill into a single worker to see per-task progress (active and completed), +CPU/memory sparklines, and task durations. + +## Worker Discovery + +The console uses a single seed port to discover the full cluster. +On startup and every 5 seconds, it calls `GetClusterWorkers` on the seed worker, +which returns URLs for all known workers via its `WorkerResolver`. New workers +are added automatically; removed workers are cleaned up. + +## Monitoring an EC2 Benchmark Cluster + +The benchmark workers in [`benchmarks/cdk/`](../benchmarks/cdk/README.md) run on +EC2 instances with the observability service enabled. Each worker listens on port +9001 (gRPC/Flight + Observability) and port 9000 (HTTP benchmarks). The +`Ec2WorkerResolver` discovers peers via `DescribeInstances`, so connecting the +console to any single worker exposes the full cluster. + +To run the console, SSH into any instance in the cluster and install it there +(the console runs inside the VPC so it can reach all workers on their private IPs): + +```bash +cd benchmarks/cdk/ +npm run deploy + +# Connect to an instance via SSM +aws ssm start-session --target "$INSTANCE_ID" --region "$AWS_REGION" + +# Install the Rust toolchain +curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y +source "$HOME/.cargo/env" + +# Install the console binary from the repo +cargo install --locked --git https://github.com/datafusion-contrib/datafusion-distributed.git \ + datafusion-distributed-console + +# Run — connect to the local worker on port 9001 +datafusion-distributed-console 9001 +``` + +## Examples + +| Example | Description | +|--------------------------------------------------|------------------------------------------------| +| [`cluster`](examples/cluster.md) | Start a local multi-worker cluster | +| [`console_worker`](examples/console.md) | Start individual workers with observability | +| [`tpcds_runner`](examples/tpcds_runner.md) | Run TPC-DS queries with live monitoring | diff --git a/console/examples/cluster.md b/console/examples/cluster.md index 253656c3..312efdc0 100644 --- a/console/examples/cluster.md +++ b/console/examples/cluster.md @@ -4,45 +4,59 @@ Starts an in-memory cluster of DataFusion distributed workers with observability enabled. This is the fastest way to get a local cluster running for use with the console TUI or the TPC-DS runner. -## Usage +## Quick-start + +```bash +# Terminal 1 — start 16 workers +cargo run -p datafusion-distributed-console --example cluster -### Start a cluster with OS-assigned ports +# Terminal 2 — open the console (connect to any worker port) +cargo run -p datafusion-distributed-console -- 9001 +``` + +## Usage ```bash cargo run -p datafusion-distributed-console --example cluster ``` -This starts 16 workers on random ports and prints the commands to connect. +This starts 16 workers on ports 9001 through 9016 and prints the commands to connect. ### Customize the cluster ```bash cargo run -p datafusion-distributed-console --example cluster -- \ --workers 8 \ - --base-port 9000 + --base-port 5000 ``` | Flag | Default | Description | |-----------------|---------|----------------------------------------------------------| | `--workers` | 16 | Number of workers to start | -| `--base-port` | 0 | Starting port (0 = OS-assigned random ports) | +| `--base-port` | 9001 | Starting port; workers bind to consecutive ports | + +Workers bind to consecutive ports starting from `--base-port` +(e.g. `--base-port 5000` gives 5000, 5001, ..., 5015 for 16 workers). + +If you change the base port, tell the console which worker to connect to: -When `--base-port` is set, workers bind to consecutive ports starting from that -value (e.g. 9000, 9001, ..., 9007). +```bash +cargo run -p datafusion-distributed-console -- 5000 +``` ## Connecting to the cluster After starting, the example prints ready-to-use commands. For example: ``` -Started 4 workers on ports: 9000,9001,9002,9003 +Started 16 workers on ports: 9001,9002,...,9016 -Console: - cargo run -p datafusion-distributed-console -- --cluster-ports 9000,9001,9002,9003 +Console (connect to any worker for auto-discovery): + cargo run -p datafusion-distributed-console -- 9001 TPC-DS runner: - cargo run -p datafusion-distributed-console --example tpcds_runner -- --cluster-ports 9000,9001,9002,9003 + cargo run -p datafusion-distributed-console --example tpcds_runner -- --cluster-ports 9001,9002,...,9016 Single query: - cargo run -p datafusion-distributed-console --example console_run -- --cluster-ports 9000,9001,9002,9003 "SELECT 1" + cargo run -p datafusion-distributed-console --example console_run -- --cluster-ports 9001,9002,...,9016 "SELECT 1" ``` Press `Ctrl+C` to stop all workers. diff --git a/console/examples/cluster.rs b/console/examples/cluster.rs index 73260c05..cb541240 100644 --- a/console/examples/cluster.rs +++ b/console/examples/cluster.rs @@ -1,9 +1,13 @@ -use datafusion_distributed::Worker; +use async_trait::async_trait; +use datafusion::error::DataFusionError; +use datafusion_distributed::{Worker, WorkerResolver}; use std::error::Error; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::sync::Arc; use structopt::StructOpt; use tokio::net::TcpListener; use tonic::transport::Server; +use url::Url; #[derive(StructOpt)] #[structopt( @@ -16,8 +20,7 @@ struct Args { workers: usize, /// Starting port. Workers bind to consecutive ports from this value. - /// If 0, the OS assigns random ports. - #[structopt(long, default_value = "0")] + #[structopt(long, default_value = "9001")] base_port: u16, } @@ -26,27 +29,33 @@ async fn main() -> Result<(), Box> { let args = Args::from_args(); let mut ports = Vec::new(); + let mut listeners = Vec::new(); + // Bind all listeners first so we know all ports before starting workers for i in 0..args.workers { let addr = SocketAddr::new( IpAddr::V4(Ipv4Addr::LOCALHOST), - if args.base_port == 0 { - 0 - } else { - args.base_port - .checked_add(i as u16) - .expect("port overflow: base_port + workers exceeds u16::MAX") - }, + args.base_port + .checked_add(i as u16) + .expect("port overflow: base_port + workers exceeds u16::MAX"), ); let listener = TcpListener::bind(addr).await?; let port = listener.local_addr()?.port(); ports.push(port); + listeners.push(listener); + } + + let localhost_resolver = Arc::new(LocalhostWorkerResolver { + ports: ports.clone(), + }); + for listener in listeners { + let resolver = localhost_resolver.clone(); tokio::spawn(async move { let worker = Worker::default(); Server::builder() - .add_service(worker.with_observability_service()) + .add_service(worker.with_observability_service(resolver)) .add_service(worker.into_flight_server()) .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)) .await @@ -61,8 +70,11 @@ async fn main() -> Result<(), Box> { .join(","); println!("Started {} workers on ports: {ports_csv}\n", args.workers); - println!("Console:"); - println!("\tcargo run -p datafusion-distributed-console -- --cluster-ports {ports_csv}"); + println!("Console (connect to any worker for auto-discovery):"); + println!( + "\tcargo run -p datafusion-distributed-console -- {}", + ports[0] + ); println!("TPC-DS runner:"); println!( "\tcargo run -p datafusion-distributed-console --example tpcds_runner -- --cluster-ports {ports_csv}" @@ -73,8 +85,25 @@ async fn main() -> Result<(), Box> { ); println!("Press Ctrl+C to stop all workers."); - // Block forever tokio::signal::ctrl_c().await?; Ok(()) } + +#[derive(Clone)] +struct LocalhostWorkerResolver { + ports: Vec, +} + +#[async_trait] +impl WorkerResolver for LocalhostWorkerResolver { + fn get_urls(&self) -> Result, DataFusionError> { + self.ports + .iter() + .map(|port| { + let url_string = format!("http://localhost:{port}"); + Url::parse(&url_string).map_err(|e| DataFusionError::External(Box::new(e))) + }) + .collect::, _>>() + } +} diff --git a/console/examples/console.md b/console/examples/console.md index 03710e86..871db718 100644 --- a/console/examples/console.md +++ b/console/examples/console.md @@ -5,26 +5,41 @@ while running distributed queries. ## Terminal 1: Start workers with observability +The easiest way is to use the cluster example, which starts 16 workers on ports +9001-9016 (see [cluster.md](cluster.md)): + +```bash + cargo run -p datafusion-distributed-console --example cluster +``` + +Or start individual workers manually: + ```bash - cargo run -p datafusion-distributed-console --example console_worker -- 8080 - cargo run -p datafusion-distributed-console --example console_worker -- 8081 + cargo run -p datafusion-distributed-console --example console_worker -- 9001 + cargo run -p datafusion-distributed-console --example console_worker -- 9002 ``` ## Terminal 2: Start console ```bash - cargo run -p datafusion-distributed-console -- --cluster-ports 8080,8081 + cargo run -p datafusion-distributed-console -- 9001 ``` -The TUI console will start and connect to the workers. It will show "Waiting for tasks..." -until queries are executed. +The console auto-discovers all workers in the cluster via `GetClusterWorkers`. +It will show "Waiting for tasks..." until queries are executed. + +To connect to a worker on a different port: + +```bash + cargo run -p datafusion-distributed-console -- 9002 +``` ## Terminal 3: Run a query ```bash cargo run -p datafusion-distributed-console --example console_run -- \ "SELECT * FROM weather LIMIT 100" \ - --cluster-ports 8080,8081 + --cluster-ports 9001,9002 ``` The console will display real-time task progress across all workers. diff --git a/console/examples/console_worker.rs b/console/examples/console_worker.rs index 189e8230..80178e4d 100644 --- a/console/examples/console_worker.rs +++ b/console/examples/console_worker.rs @@ -1,8 +1,12 @@ -use datafusion_distributed::Worker; +use async_trait::async_trait; +use datafusion::error::DataFusionError; +use datafusion_distributed::{Worker, WorkerResolver}; use std::error::Error; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::sync::Arc; use structopt::StructOpt; use tonic::transport::Server; +use url::Url; #[derive(StructOpt)] #[structopt( @@ -10,21 +14,46 @@ use tonic::transport::Server; about = "A localhost DataFusion worker with observability" )] struct Args { - #[structopt(default_value = "8080")] + /// Port to listen on. port: u16, + + /// The ports holding Distributed DataFusion workers. + #[structopt(long = "cluster-ports", use_delimiter = true)] + cluster_ports: Vec, } #[tokio::main] async fn main() -> Result<(), Box> { let args = Args::from_args(); + let localhost_resolver = Arc::new(LocalhostWorkerResolver { + ports: args.cluster_ports.clone(), + }); let worker = Worker::default(); Server::builder() - .add_service(worker.with_observability_service()) + .add_service(worker.with_observability_service(localhost_resolver)) .add_service(worker.into_flight_server()) .serve(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), args.port)) .await?; Ok(()) } + +#[derive(Clone)] +struct LocalhostWorkerResolver { + ports: Vec, +} + +#[async_trait] +impl WorkerResolver for LocalhostWorkerResolver { + fn get_urls(&self) -> Result, DataFusionError> { + self.ports + .iter() + .map(|port| { + let url_string = format!("http://localhost:{port}"); + Url::parse(&url_string).map_err(|e| DataFusionError::External(Box::new(e))) + }) + .collect::, _>>() + } +} diff --git a/console/examples/tpcds_runner.md b/console/examples/tpcds_runner.md index 630d9396..5b1c11f9 100644 --- a/console/examples/tpcds_runner.md +++ b/console/examples/tpcds_runner.md @@ -17,21 +17,27 @@ may take a few minutes). ## Usage -### Step 1: Start Workers with Observability (Terminals 1-4) +### Step 1: Start Workers with Observability -Start 4 workers on different ports in different terminals: +The quickest way is to start a cluster (defaults to 16 workers on ports 9001-9016): ```bash -cargo run -p datafusion-distributed-console --example -- console_worker -- 8080 -cargo run -p datafusion-distributed-console --example -- console_worker -- 8081 -cargo run -p datafusion-distributed-console --example -- console_worker -- 8082 -cargo run -p datafusion-distributed-console --example -- console_worker -- 8083 +cargo run -p datafusion-distributed-console --example cluster +``` + +Or start workers individually in separate terminals: + +```bash +cargo run -p datafusion-distributed-console --example console_worker -- 9001 +cargo run -p datafusion-distributed-console --example console_worker -- 9002 +cargo run -p datafusion-distributed-console --example console_worker -- 9003 +cargo run -p datafusion-distributed-console --example console_worker -- 9004 ``` ### Step 2: Start the Console (Terminal 5) ```bash -cargo run -p datafusion-distributed-console +cargo run -p datafusion-distributed-console -- 9001 ``` ### Step 3: Run TPC-DS Queries (Terminal 6) @@ -39,14 +45,14 @@ cargo run -p datafusion-distributed-console #### Run a single query ```bash -cargo run -p datafusion-distributed-console --example tpcds_runner \ - --cluster-ports 8080,8081,8082,8083 \ +cargo run -p datafusion-distributed-console --example tpcds_runner -- \ + --cluster-ports 9001,9002,9003,9004 \ --query q99 ``` #### Run all TPC-DS queries sequentially ```bash -cargo run -p datafusion-distributed-console --example tpcds_runner \ - --cluster-ports 8080,8081,8082,8083 \ +cargo run -p datafusion-distributed-console --example tpcds_runner -- \ + --cluster-ports 9001,9002,9003,9004 ``` diff --git a/console/src/app.rs b/console/src/app.rs index 43a23e3d..04ec2b19 100644 --- a/console/src/app.rs +++ b/console/src/app.rs @@ -1,5 +1,5 @@ use crate::state::{ClusterViewState, SortColumn, SortDirection, View, WorkerViewState}; -use crate::worker::{ConnectionStatus, WorkerConn}; +use crate::worker::{ConnectionStatus, WorkerConn, discover_cluster_workers}; use std::collections::HashSet; use std::time::{Duration, Instant}; use url::Url; @@ -21,6 +21,10 @@ pub(crate) struct App { prev_output_rows_time: Option, /// Smoothed cluster-wide throughput in rows/s. pub(crate) current_throughput: f64, + /// Seed URL for worker discovery via `GetClusterWorkers`. + seed_url: Url, + /// Last time we ran worker discovery. + last_discovery: Option, } /// Cluster-wide statistics for the header. @@ -35,13 +39,14 @@ pub(crate) struct ClusterStats { pub(crate) active_queries: usize, } -impl App { - /// Create a new App with the given worker URLs. - pub(crate) fn new(worker_urls: Vec) -> Self { - let workers = worker_urls.into_iter().map(WorkerConn::new).collect(); +/// Interval between worker discovery polls. +const DISCOVERY_INTERVAL: Duration = Duration::from_secs(5); +impl App { + /// Create a new App that discovers workers via `GetClusterWorkers` on the seed URL. + pub(crate) fn new(seed_url: Url) -> Self { App { - workers, + workers: Vec::new(), active_query_count: 0, current_view: View::ClusterOverview, cluster_state: ClusterViewState::default(), @@ -54,6 +59,8 @@ impl App { prev_output_rows_total: 0, prev_output_rows_time: None, current_throughput: 0.0, + seed_url, + last_discovery: None, } } @@ -63,6 +70,9 @@ impl App { return; } + // Run worker discovery if in auto mode + self.maybe_discover_workers().await; + // Attempt connection for workers in Connecting or Disconnected state for worker in &mut self.workers { if worker.should_retry_connection() { @@ -89,6 +99,39 @@ impl App { self.update_throughput(); } + /// Periodically discovers workers via `GetClusterWorkers` on the seed URL. + async fn maybe_discover_workers(&mut self) { + let should_discover = match self.last_discovery { + None => true, + Some(last) => last.elapsed() >= DISCOVERY_INTERVAL, + }; + + if !should_discover { + return; + } + + self.last_discovery = Some(Instant::now()); + + let discovered_urls = match discover_cluster_workers(&self.seed_url).await { + Ok(urls) => urls, + Err(_) => return, + }; + + // Build set of currently known URLs (owned to avoid borrow conflict) + let known_urls: HashSet = self.workers.iter().map(|w| w.url.clone()).collect(); + + // Add new workers + for url in &discovered_urls { + if !known_urls.contains(url) { + self.workers.push(WorkerConn::new(url.clone())); + } + } + + // Remove workers that are no longer in the discovered set + let discovered_set: HashSet<&Url> = discovered_urls.iter().collect(); + self.workers.retain(|w| discovered_set.contains(&w.url)); + } + /// Update cluster-wide throughput from output rows delta. fn update_throughput(&mut self) { let current_total: u64 = self.workers.iter().map(|w| w.output_rows_total).sum(); diff --git a/console/src/main.rs b/console/src/main.rs index bb3b6f57..731b504b 100644 --- a/console/src/main.rs +++ b/console/src/main.rs @@ -17,9 +17,9 @@ use url::Url; about = "Console for monitoring DataFusion distributed workers" )] struct Args { - /// Comma-delimited list of worker ports (assumed localhost) - #[structopt(long = "cluster-ports", use_delimiter = true)] - cluster_ports: Vec, + /// Port of a worker to connect to for auto-discovery. + /// The console calls GetClusterWorkers on this worker to discover the full cluster. + port: u16, /// Polling interval in milliseconds #[structopt(long = "poll-interval", default_value = "100")] @@ -32,14 +32,10 @@ async fn main() -> color_eyre::Result<()> { let args = Args::from_args(); - let worker_urls: Vec = args - .cluster_ports - .iter() - .map(|port| Url::parse(&format!("http://localhost:{port}")).expect("valid localhost URL")) - .collect(); + let seed_url = Url::parse(&format!("http://localhost:{}", args.port)).expect("valid URL"); let poll_interval = Duration::from_millis(args.poll_interval); - let mut app = App::new(worker_urls); + let mut app = App::new(seed_url); let mut terminal = ratatui::init(); terminal.clear()?; diff --git a/console/src/worker.rs b/console/src/worker.rs index 6bf4a919..ffc4c14e 100644 --- a/console/src/worker.rs +++ b/console/src/worker.rs @@ -1,5 +1,6 @@ use datafusion_distributed::{ - GetTaskProgressRequest, ObservabilityServiceClient, PingRequest, TaskProgress, TaskStatus, + GetClusterWorkersRequest, GetTaskProgressRequest, ObservabilityServiceClient, PingRequest, + TaskProgress, TaskStatus, }; use std::collections::{HashMap, HashSet, VecDeque}; use std::time::{Duration, Instant}; @@ -364,3 +365,29 @@ fn push_history(buf: &mut VecDeque, value: u64) { } buf.push_back(value); } + +/// Connects to a seed worker and calls `GetClusterWorkers` to discover all worker URLs. +pub(crate) async fn discover_cluster_workers(seed_url: &Url) -> Result, String> { + let mut client = ObservabilityServiceClient::connect(seed_url.to_string()) + .await + .map_err(|e| format!("Failed to connect to seed worker {seed_url}: {e}"))?; + + client + .ping(PingRequest {}) + .await + .map_err(|e| format!("Seed worker {seed_url} ping failed: {e}"))?; + + let response = client + .get_cluster_workers(GetClusterWorkersRequest {}) + .await + .map_err(|e| format!("GetClusterWorkers failed on {seed_url}: {e}"))?; + + let urls = response + .into_inner() + .worker_urls + .into_iter() + .filter_map(|s| Url::parse(&s).ok()) + .collect(); + + Ok(urls) +} diff --git a/src/flight_service/worker.rs b/src/flight_service/worker.rs index b277b400..d98cb497 100644 --- a/src/flight_service/worker.rs +++ b/src/flight_service/worker.rs @@ -2,7 +2,9 @@ use crate::flight_service::WorkerSessionBuilder; use crate::flight_service::do_action::{INIT_ACTION_TYPE, TaskData}; use crate::flight_service::single_write_multi_read::SingleWriteMultiRead; use crate::protobuf::StageKey; -use crate::{DefaultSessionBuilder, ObservabilityServiceImpl, ObservabilityServiceServer}; +use crate::{ + DefaultSessionBuilder, ObservabilityServiceImpl, ObservabilityServiceServer, WorkerResolver, +}; use arrow_flight::flight_service_server::{FlightService, FlightServiceServer}; use arrow_flight::{ Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, @@ -133,11 +135,18 @@ impl Worker { .max_encoding_message_size(usize::MAX) } + /// Creates an [`ObservabilityServiceServer`] that exposes task progress and cluster + /// worker discovery via the provided [`WorkerResolver`]. + /// + /// The returned server is meant to be added to the same [`tonic::transport::Server`] as the + /// Flight service — gRPC multiplexes both services on a single port. pub fn with_observability_service( &self, + worker_resolver: Arc, ) -> ObservabilityServiceServer { ObservabilityServiceServer::new(ObservabilityServiceImpl::new( self.task_data_entries.clone(), + worker_resolver, )) } diff --git a/src/lib.rs b/src/lib.rs index f56f0384..f14a32e3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -46,9 +46,10 @@ pub use stage::{ }; pub use observability::{ - GetTaskProgressRequest, GetTaskProgressResponse, ObservabilityService, - ObservabilityServiceClient, ObservabilityServiceImpl, ObservabilityServiceServer, PingRequest, - PingResponse, StageKey as ObservabilityStageKey, TaskProgress, TaskStatus, WorkerMetrics, + GetClusterWorkersRequest, GetClusterWorkersResponse, GetTaskProgressRequest, + GetTaskProgressResponse, ObservabilityService, ObservabilityServiceClient, + ObservabilityServiceImpl, ObservabilityServiceServer, PingRequest, PingResponse, + StageKey as ObservabilityStageKey, TaskProgress, TaskStatus, WorkerMetrics, }; pub use protobuf::StageKey; diff --git a/src/observability/generated/observability.rs b/src/observability/generated/observability.rs index a032eb07..cc7fe513 100644 --- a/src/observability/generated/observability.rs +++ b/src/observability/generated/observability.rs @@ -46,6 +46,13 @@ pub struct GetTaskProgressResponse { #[prost(message, optional, tag = "2")] pub worker_metrics: ::core::option::Option, } +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct GetClusterWorkersRequest {} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct GetClusterWorkersResponse { + #[prost(string, repeated, tag = "1")] + pub worker_urls: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, +} #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] pub enum TaskStatus { @@ -198,6 +205,25 @@ pub mod observability_service_client { )); self.inner.unary(req, path, codec).await } + pub async fn get_cluster_workers( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> + { + self.inner.ready().await.map_err(|e| { + tonic::Status::unknown(format!("Service was not ready: {}", e.into())) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/observability.ObservabilityService/GetClusterWorkers", + ); + let mut req = request.into_request(); + req.extensions_mut().insert(GrpcMethod::new( + "observability.ObservabilityService", + "GetClusterWorkers", + )); + self.inner.unary(req, path, codec).await + } } } /// Generated server implementations. @@ -221,6 +247,10 @@ pub mod observability_service_server { &self, request: tonic::Request, ) -> std::result::Result, tonic::Status>; + async fn get_cluster_workers( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; } #[derive(Debug)] pub struct ObservabilityServiceServer { @@ -377,6 +407,49 @@ pub mod observability_service_server { }; Box::pin(fut) } + "/observability.ObservabilityService/GetClusterWorkers" => { + #[allow(non_camel_case_types)] + struct GetClusterWorkersSvc(pub Arc); + impl + tonic::server::UnaryService + for GetClusterWorkersSvc + { + type Response = super::GetClusterWorkersResponse; + type Future = BoxFuture, tonic::Status>; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_cluster_workers(&inner, request) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetClusterWorkersSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } _ => Box::pin(async move { let mut response = http::Response::new(tonic::body::Body::default()); let headers = response.headers_mut(); diff --git a/src/observability/mod.rs b/src/observability/mod.rs index 6c07b7b5..c48906ae 100644 --- a/src/observability/mod.rs +++ b/src/observability/mod.rs @@ -7,7 +7,8 @@ pub use generated::observability::observability_service_server::{ }; pub use generated::observability::{ - GetTaskProgressRequest, GetTaskProgressResponse, PingRequest, PingResponse, StageKey, - TaskProgress, TaskStatus, WorkerMetrics, + GetClusterWorkersRequest, GetClusterWorkersResponse, GetTaskProgressRequest, + GetTaskProgressResponse, PingRequest, PingResponse, StageKey, TaskProgress, TaskStatus, + WorkerMetrics, }; pub use service::ObservabilityServiceImpl; diff --git a/src/observability/proto/observability.proto b/src/observability/proto/observability.proto index 8e0f8d15..6c0c403d 100644 --- a/src/observability/proto/observability.proto +++ b/src/observability/proto/observability.proto @@ -4,6 +4,7 @@ package observability; service ObservabilityService { rpc Ping (PingRequest) returns (PingResponse); rpc GetTaskProgress (GetTaskProgressRequest) returns (GetTaskProgressResponse); + rpc GetClusterWorkers (GetClusterWorkersRequest) returns (GetClusterWorkersResponse); } message PingRequest {} @@ -44,3 +45,9 @@ message GetTaskProgressResponse { repeated TaskProgress tasks = 1; WorkerMetrics worker_metrics = 2; } + +message GetClusterWorkersRequest {} + +message GetClusterWorkersResponse { + repeated string worker_urls = 1; +} diff --git a/src/observability/service.rs b/src/observability/service.rs index 3a9cd38a..00672677 100644 --- a/src/observability/service.rs +++ b/src/observability/service.rs @@ -1,4 +1,5 @@ use crate::flight_service::{SingleWriteMultiRead, TaskData}; +use crate::networking::WorkerResolver; use crate::protobuf::StageKey; use datafusion::error::DataFusionError; use datafusion::physical_plan::ExecutionPlan; @@ -13,14 +14,18 @@ use tokio::sync::watch; use tonic::{Request, Response, Status}; use super::{ - GetTaskProgressResponse, ObservabilityService, TaskProgress, TaskStatus, WorkerMetrics, - generated::observability::{GetTaskProgressRequest, PingRequest, PingResponse}, + GetClusterWorkersResponse, GetTaskProgressResponse, ObservabilityService, TaskProgress, + TaskStatus, WorkerMetrics, + generated::observability::{ + GetClusterWorkersRequest, GetTaskProgressRequest, PingRequest, PingResponse, + }, }; type ResultTaskData = Result>; pub struct ObservabilityServiceImpl { task_data_entries: Arc>>>, + worker_resolver: Arc, #[cfg(feature = "system-metrics")] system: watch::Receiver, } @@ -28,6 +33,7 @@ pub struct ObservabilityServiceImpl { impl ObservabilityServiceImpl { pub fn new( task_data_entries: Arc>>>, + worker_resolver: Arc, ) -> Self { #[cfg(feature = "system-metrics")] let (tx, rx) = tokio::sync::watch::channel(WorkerMetrics::default()); @@ -64,9 +70,9 @@ impl ObservabilityServiceImpl { } }); } - Self { task_data_entries, + worker_resolver, #[cfg(feature = "system-metrics")] system: rx, } @@ -112,6 +118,20 @@ impl ObservabilityService for ObservabilityServiceImpl { worker_metrics, })) } + + async fn get_cluster_workers( + &self, + _request: Request, + ) -> Result, Status> { + let urls = self + .worker_resolver + .get_urls() + .map_err(|e| Status::internal(format!("Failed to resolve workers: {e}")))?; + + let worker_urls = urls.into_iter().map(|url| url.to_string()).collect(); + + Ok(Response::new(GetClusterWorkersResponse { worker_urls })) + } } impl ObservabilityServiceImpl {