Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ default-run = "dfbench"
[dependencies]
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
datafusion-distributed = { path = "..", features = ["integration"] }
datafusion-distributed = { path = "..", features = [
"integration",
"system-metrics",
] }
tokio = { version = "1.48", features = ["full"] }
parquet = { version = "57.1.0" }
structopt = { version = "0.3.26" }
Expand Down
2 changes: 2 additions & 0 deletions benchmarks/cdk/bin/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
}),
),
);
let ec2_worker_resolver = Arc::new(Ec2WorkerResolver::new());
let grpc_server = Server::builder()
.add_service(worker.with_observability_service(ec2_worker_resolver))
.add_service(worker.into_flight_server())
.serve(WORKER_ADDR.parse()?);

Expand Down
85 changes: 85 additions & 0 deletions console/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# datafusion-distributed-console

A terminal UI (TUI) for monitoring [DataFusion Distributed](../README.md)
clusters in real time. Built with [ratatui](https://ratatui.rs).

## Quick-start

```bash
# Start a local cluster (16 workers on ports 9001-9016)
cargo run -p datafusion-distributed-console --example cluster

# In another terminal, open the console (connect to any worker port)
cargo run -p datafusion-distributed-console -- 9001
```

The console requires a port argument and auto-discovers all workers in the
cluster via the `GetClusterWorkers` RPC.

## Usage

```
datafusion-distributed-console <PORT> [OPTIONS]
```

| Argument / Flag | Required | Description |
|--------------------|----------|------------------------------------------------------|
| `PORT` | Yes | Port of a seed worker for auto-discovery |
| `--poll-interval` | No | Polling interval in milliseconds (default: 100) |

## Views

### Cluster Overview (`1`)

A table of all workers showing connection status, active tasks, queries in
flight, CPU usage, memory, and throughput. Columns are sortable.

### Worker Detail (`2`)

Drill into a single worker to see per-task progress (active and completed),
CPU/memory sparklines, and task durations.

## Worker Discovery

The console uses a single seed port to discover the full cluster.
On startup and every 5 seconds, it calls `GetClusterWorkers` on the seed worker,
which returns URLs for all known workers via its `WorkerResolver`. New workers
are added automatically; removed workers are cleaned up.

## Monitoring an EC2 Benchmark Cluster

The benchmark workers in [`benchmarks/cdk/`](../benchmarks/cdk/README.md) run on
EC2 instances with the observability service enabled. Each worker listens on port
9001 (gRPC/Flight + Observability) and port 9000 (HTTP benchmarks). The
`Ec2WorkerResolver` discovers peers via `DescribeInstances`, so connecting the
console to any single worker exposes the full cluster.

To run the console, SSH into any instance in the cluster and install it there
(the console runs inside the VPC so it can reach all workers on their private IPs):

```bash
cd benchmarks/cdk/
npm run deploy

# Connect to an instance via SSM
aws ssm start-session --target "$INSTANCE_ID" --region "$AWS_REGION"

# Install the Rust toolchain
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
source "$HOME/.cargo/env"

# Install the console binary from the repo
cargo install --locked --git https://github.com/datafusion-contrib/datafusion-distributed.git \
datafusion-distributed-console

# Run — connect to the local worker on port 9001
datafusion-distributed-console 9001
```

## Examples

| Example | Description |
|--------------------------------------------------|------------------------------------------------|
| [`cluster`](examples/cluster.md) | Start a local multi-worker cluster |
| [`console_worker`](examples/console.md) | Start individual workers with observability |
| [`tpcds_runner`](examples/tpcds_runner.md) | Run TPC-DS queries with live monitoring |
38 changes: 26 additions & 12 deletions console/examples/cluster.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,45 +4,59 @@ Starts an in-memory cluster of DataFusion distributed workers with observability
enabled. This is the fastest way to get a local cluster running for use with the
console TUI or the TPC-DS runner.

## Usage
## Quick-start

```bash
# Terminal 1 — start 16 workers
cargo run -p datafusion-distributed-console --example cluster

### Start a cluster with OS-assigned ports
# Terminal 2 — open the console (connect to any worker port)
cargo run -p datafusion-distributed-console -- 9001
```

## Usage

```bash
cargo run -p datafusion-distributed-console --example cluster
```

This starts 16 workers on random ports and prints the commands to connect.
This starts 16 workers on ports 9001 through 9016 and prints the commands to connect.

### Customize the cluster

```bash
cargo run -p datafusion-distributed-console --example cluster -- \
--workers 8 \
--base-port 9000
--base-port 5000
```

| Flag | Default | Description |
|-----------------|---------|----------------------------------------------------------|
| `--workers` | 16 | Number of workers to start |
| `--base-port` | 0 | Starting port (0 = OS-assigned random ports) |
| `--base-port` | 9001 | Starting port; workers bind to consecutive ports |

Workers bind to consecutive ports starting from `--base-port`
(e.g. `--base-port 5000` gives 5000, 5001, ..., 5015 for 16 workers).

If you change the base port, tell the console which worker to connect to:

When `--base-port` is set, workers bind to consecutive ports starting from that
value (e.g. 9000, 9001, ..., 9007).
```bash
cargo run -p datafusion-distributed-console -- 5000
```

## Connecting to the cluster

After starting, the example prints ready-to-use commands. For example:

```
Started 4 workers on ports: 9000,9001,9002,9003
Started 16 workers on ports: 9001,9002,...,9016

Console:
cargo run -p datafusion-distributed-console -- --cluster-ports 9000,9001,9002,9003
Console (connect to any worker for auto-discovery):
cargo run -p datafusion-distributed-console -- 9001
TPC-DS runner:
cargo run -p datafusion-distributed-console --example tpcds_runner -- --cluster-ports 9000,9001,9002,9003
cargo run -p datafusion-distributed-console --example tpcds_runner -- --cluster-ports 9001,9002,...,9016
Single query:
cargo run -p datafusion-distributed-console --example console_run -- --cluster-ports 9000,9001,9002,9003 "SELECT 1"
cargo run -p datafusion-distributed-console --example console_run -- --cluster-ports 9001,9002,...,9016 "SELECT 1"
```

Press `Ctrl+C` to stop all workers.
57 changes: 43 additions & 14 deletions console/examples/cluster.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use datafusion_distributed::Worker;
use async_trait::async_trait;
use datafusion::error::DataFusionError;
use datafusion_distributed::{Worker, WorkerResolver};
use std::error::Error;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use structopt::StructOpt;
use tokio::net::TcpListener;
use tonic::transport::Server;
use url::Url;

#[derive(StructOpt)]
#[structopt(
Expand All @@ -16,8 +20,7 @@ struct Args {
workers: usize,

/// Starting port. Workers bind to consecutive ports from this value.
/// If 0, the OS assigns random ports.
#[structopt(long, default_value = "0")]
#[structopt(long, default_value = "9001")]
base_port: u16,
}

Expand All @@ -26,27 +29,33 @@ async fn main() -> Result<(), Box<dyn Error>> {
let args = Args::from_args();

let mut ports = Vec::new();
let mut listeners = Vec::new();

// Bind all listeners first so we know all ports before starting workers
for i in 0..args.workers {
let addr = SocketAddr::new(
IpAddr::V4(Ipv4Addr::LOCALHOST),
if args.base_port == 0 {
0
} else {
args.base_port
.checked_add(i as u16)
.expect("port overflow: base_port + workers exceeds u16::MAX")
},
args.base_port
.checked_add(i as u16)
.expect("port overflow: base_port + workers exceeds u16::MAX"),
);
let listener = TcpListener::bind(addr).await?;
let port = listener.local_addr()?.port();
ports.push(port);
listeners.push(listener);
}

let localhost_resolver = Arc::new(LocalhostWorkerResolver {
ports: ports.clone(),
});

for listener in listeners {
let resolver = localhost_resolver.clone();
tokio::spawn(async move {
let worker = Worker::default();

Server::builder()
.add_service(worker.with_observability_service())
.add_service(worker.with_observability_service(resolver))
.add_service(worker.into_flight_server())
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener))
.await
Expand All @@ -61,8 +70,11 @@ async fn main() -> Result<(), Box<dyn Error>> {
.join(",");

println!("Started {} workers on ports: {ports_csv}\n", args.workers);
println!("Console:");
println!("\tcargo run -p datafusion-distributed-console -- --cluster-ports {ports_csv}");
println!("Console (connect to any worker for auto-discovery):");
println!(
"\tcargo run -p datafusion-distributed-console -- {}",
ports[0]
);
println!("TPC-DS runner:");
println!(
"\tcargo run -p datafusion-distributed-console --example tpcds_runner -- --cluster-ports {ports_csv}"
Expand All @@ -73,8 +85,25 @@ async fn main() -> Result<(), Box<dyn Error>> {
);
println!("Press Ctrl+C to stop all workers.");

// Block forever
tokio::signal::ctrl_c().await?;

Ok(())
}

#[derive(Clone)]
struct LocalhostWorkerResolver {
ports: Vec<u16>,
}

#[async_trait]
impl WorkerResolver for LocalhostWorkerResolver {
fn get_urls(&self) -> Result<Vec<Url>, DataFusionError> {
self.ports
.iter()
.map(|port| {
let url_string = format!("http://localhost:{port}");
Url::parse(&url_string).map_err(|e| DataFusionError::External(Box::new(e)))
})
.collect::<Result<Vec<Url>, _>>()
}
}
27 changes: 21 additions & 6 deletions console/examples/console.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,41 @@ while running distributed queries.

## Terminal 1: Start workers with observability

The easiest way is to use the cluster example, which starts 16 workers on ports
9001-9016 (see [cluster.md](cluster.md)):

```bash
cargo run -p datafusion-distributed-console --example cluster
```

Or start individual workers manually:

```bash
cargo run -p datafusion-distributed-console --example console_worker -- 8080
cargo run -p datafusion-distributed-console --example console_worker -- 8081
cargo run -p datafusion-distributed-console --example console_worker -- 9001
cargo run -p datafusion-distributed-console --example console_worker -- 9002
```

## Terminal 2: Start console

```bash
cargo run -p datafusion-distributed-console -- --cluster-ports 8080,8081
cargo run -p datafusion-distributed-console -- 9001
```

The TUI console will start and connect to the workers. It will show "Waiting for tasks..."
until queries are executed.
The console auto-discovers all workers in the cluster via `GetClusterWorkers`.
It will show "Waiting for tasks..." until queries are executed.

To connect to a worker on a different port:

```bash
cargo run -p datafusion-distributed-console -- 9002
```

## Terminal 3: Run a query

```bash
cargo run -p datafusion-distributed-console --example console_run -- \
"SELECT * FROM weather LIMIT 100" \
--cluster-ports 8080,8081
--cluster-ports 9001,9002
```

The console will display real-time task progress across all workers.
35 changes: 32 additions & 3 deletions console/examples/console_worker.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,59 @@
use datafusion_distributed::Worker;
use async_trait::async_trait;
use datafusion::error::DataFusionError;
use datafusion_distributed::{Worker, WorkerResolver};
use std::error::Error;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use structopt::StructOpt;
use tonic::transport::Server;
use url::Url;

#[derive(StructOpt)]
#[structopt(
name = "console_worker",
about = "A localhost DataFusion worker with observability"
)]
struct Args {
#[structopt(default_value = "8080")]
/// Port to listen on.
port: u16,

/// The ports holding Distributed DataFusion workers.
#[structopt(long = "cluster-ports", use_delimiter = true)]
cluster_ports: Vec<u16>,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let args = Args::from_args();

let localhost_resolver = Arc::new(LocalhostWorkerResolver {
ports: args.cluster_ports.clone(),
});
let worker = Worker::default();

Server::builder()
.add_service(worker.with_observability_service())
.add_service(worker.with_observability_service(localhost_resolver))
.add_service(worker.into_flight_server())
.serve(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), args.port))
.await?;

Ok(())
}

#[derive(Clone)]
struct LocalhostWorkerResolver {
ports: Vec<u16>,
}

#[async_trait]
impl WorkerResolver for LocalhostWorkerResolver {
fn get_urls(&self) -> Result<Vec<Url>, DataFusionError> {
self.ports
.iter()
.map(|port| {
let url_string = format!("http://localhost:{port}");
Url::parse(&url_string).map_err(|e| DataFusionError::External(Box::new(e)))
})
.collect::<Result<Vec<Url>, _>>()
}
}
Loading
Loading