Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ testdata/clickbench/*
!testdata/clickbench/queries
src/observability/gen/target/*
src/observability/gen/Cargo.lock
src/worker/gen/target/*
src/worker/gen/Cargo.lock
4 changes: 2 additions & 2 deletions benchmarks/cdk/bin/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
let mut errors = vec![];
for worker_url in worker_resolver.get_urls().map_err(err)? {
if let Err(err) = channel_resolver
.get_flight_client_for_url(&worker_url)
.get_worker_client_for_url(&worker_url)
.await
{
errors.push(err.to_string())
Expand Down Expand Up @@ -208,7 +208,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
let ec2_worker_resolver = Arc::new(Ec2WorkerResolver::new());
let grpc_server = Server::builder()
.add_service(worker.with_observability_service(ec2_worker_resolver))
.add_service(worker.into_flight_server())
.add_service(worker.into_worker_server())
.serve(WORKER_ADDR.parse()?);

info!("Started listener HTTP server in {LISTENER_ADDR}");
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ impl RunOpt {
let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener);
Ok::<_, Box<dyn Error + Send + Sync>>(
Server::builder()
.add_service(Worker::default().into_flight_server())
.add_service(Worker::default().into_worker_server())
.serve_with_incoming(incoming)
.await?,
)
Expand Down
2 changes: 1 addition & 1 deletion console/examples/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ async fn main() -> Result<(), Box<dyn Error>> {

Server::builder()
.add_service(worker.with_observability_service(resolver))
.add_service(worker.into_flight_server())
.add_service(worker.into_worker_server())
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener))
.await
.expect("worker server failed");
Expand Down
2 changes: 1 addition & 1 deletion console/examples/console_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ async fn main() -> Result<(), Box<dyn Error>> {

Server::builder()
.add_service(worker.with_observability_service(localhost_resolver))
.add_service(worker.into_flight_server())
.add_service(worker.into_worker_server())
.serve(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), args.port))
.await?;

Expand Down
2 changes: 1 addition & 1 deletion console/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ impl App {
let mut query_ids = HashSet::new();
for worker in &self.workers {
for task in &worker.tasks {
if let Some(sk) = &task.stage_key {
if let Some(sk) = &task.task_key {
query_ids.insert(&sk.query_id);
}
}
Expand Down
6 changes: 3 additions & 3 deletions console/src/ui/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ fn render_active_tasks(frame: &mut Frame, area: Rect, app: &mut App, idx: usize)
let mut task_indices: Vec<usize> = (0..worker.tasks.len()).collect();
task_indices.sort_by(|&a, &b| {
let dur_a = worker.tasks[a]
.stage_key
.task_key
.as_ref()
.map(|sk| worker.task_duration(&sk.query_id, sk.stage_id, sk.task_number))
.unwrap_or_default();
let dur_b = worker.tasks[b]
.stage_key
.task_key
.as_ref()
.map(|sk| worker.task_duration(&sk.query_id, sk.stage_id, sk.task_number))
.unwrap_or_default();
Expand All @@ -110,7 +110,7 @@ fn render_active_tasks(frame: &mut Frame, area: Rect, app: &mut App, idx: usize)
.iter()
.map(|&i| {
let task = &worker.tasks[i];
if let Some(sk) = &task.stage_key {
if let Some(sk) = &task.task_key {
let query_hex = hex_prefix(&sk.query_id, 8);
let duration = worker.task_duration(&sk.query_id, sk.stage_id, sk.task_number);
let dur_str = format_duration(duration);
Expand Down
10 changes: 5 additions & 5 deletions console/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl WorkerConn {
let new_task_keys: HashSet<TaskKey> = new_tasks
.iter()
.filter_map(|t| {
t.stage_key
t.task_key
.as_ref()
.map(|sk| (sk.query_id.clone(), sk.stage_id, sk.task_number))
})
Expand All @@ -176,7 +176,7 @@ impl WorkerConn {
// Detect completed tasks: tasks that were running but disappeared
for old_task in &self.tasks {
if old_task.status == TaskStatus::Running as i32 {
if let Some(sk) = &old_task.stage_key {
if let Some(sk) = &old_task.task_key {
let key = (sk.query_id.clone(), sk.stage_id, sk.task_number);
if !new_task_keys.contains(&key) {
// Task disappeared — assume completed
Expand Down Expand Up @@ -208,7 +208,7 @@ impl WorkerConn {
// Track first_seen for new tasks
let now = Instant::now();
for task in &new_tasks {
if let Some(sk) = &task.stage_key {
if let Some(sk) = &task.task_key {
let key = (sk.query_id.clone(), sk.stage_id, sk.task_number);
self.task_first_seen.entry(key).or_insert(now);
}
Expand All @@ -226,7 +226,7 @@ impl WorkerConn {
let mut has_running = false;

for task in &self.tasks {
if let Some(sk) = &task.stage_key {
if let Some(sk) = &task.task_key {
current_query_ids.insert(sk.query_id.clone());
if task.status == TaskStatus::Running as i32 {
has_running = true;
Expand Down Expand Up @@ -338,7 +338,7 @@ impl WorkerConn {
let ids: HashSet<_> = self
.tasks
.iter()
.filter_map(|t| t.stage_key.as_ref().map(|sk| &sk.query_id))
.filter_map(|t| t.task_key.as_ref().map(|sk| &sk.query_id))
.collect();
ids.len()
}
Expand Down
18 changes: 9 additions & 9 deletions docs/source/user-guide/channel-resolver.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

This trait is optional—a sensible default implementation exists that handles most use cases.

The `ChannelResolver` trait controls how Distributed DataFusion builds Arrow Flight clients backed by
The `ChannelResolver` trait controls how Distributed DataFusion builds Worker gRPC clients backed by
[Tonic](https://github.com/hyperium/tonic) channels for worker URLs.

The default implementation connects to each URL, builds an Arrow Flight client, and caches it for reuse on
The default implementation connects to each URL, builds a Worker client, and caches it for reuse on
subsequent requests to the same URL.

## Providing your own ChannelResolver
Expand All @@ -15,7 +15,7 @@ For providing your own implementation, you'll need to take into account the foll
- You will need to provide your own implementation in two places:
- in the `SessionContext` that first initiates and plans your queries.
- while instantiating the `Worker` with the `from_session_builder()` constructor.
- If building from scratch, ensure Arrow Flight clients are reused across requests rather than recreated each time.
- If building from scratch, ensure Worker clients are reused across requests rather than recreated each time.
- You can extend `DefaultChannelResolver` as a foundation for custom implementations. This automatically handles
gRPC channel reuse.

Expand All @@ -25,19 +25,19 @@ struct CustomChannelResolver;

#[async_trait]
impl ChannelResolver for CustomChannelResolver {
async fn get_flight_client_for_url(
async fn get_worker_client_for_url(
&self,
url: &Url,
) -> Result<FlightServiceClient<BoxCloneSyncChannel>, DataFusionError> {
// Build a custom FlightServiceClient wrapped with tower
) -> Result<WorkerServiceClient<BoxCloneSyncChannel>, DataFusionError> {
// Build a custom WorkerServiceClient wrapped with tower
// layers or something similar.
todo!()
}
}

async fn main() {
// Build a single instance for your application's lifetime
// to enable Arrow Flight client reuse across queries.
// to enable Worker client reuse across queries.
let channel_resolver = CustomChannelResolver;

let state = SessionStateBuilder::new()
Expand All @@ -56,10 +56,10 @@ async fn main() {
}
});
Server::builder()
.add_service(endpoint.into_flight_server())
.add_service(endpoint.into_worker_server())
.serve(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000))
.await?;

Ok(())
}
```
```
4 changes: 2 additions & 2 deletions docs/source/user-guide/concepts.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ a fully formed physical plan and injects the appropriate nodes to execute the qu
It builds the distributed plan from bottom to top, injecting network boundaries at appropriate locations based on
the nodes present in the original plan.

## [Worker](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/flight_service/worker.rs)
## [Worker](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/worker/worker_service.rs)

Arrow Flight server implementation that integrates with the Tonic ecosystem and listens to serialized plans that get
gRPC server implementation that integrates with the Tonic ecosystem and listens to serialized plans that get
executed over the wire.

Users are expected to build these and spawn them in ports so that the network boundary nodes can reach them.
Expand Down
6 changes: 3 additions & 3 deletions docs/source/user-guide/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Rather than imposing constraints on your infrastructure or query serving pattern
allows you to plug in your own networking stack and spawn your own gRPC servers that act as workers in the cluster.

This project heavily relies on the [Tonic](https://github.com/hyperium/tonic) ecosystem for the networking layer.
Users of this library are responsible for building their own Tonic server, adding the Arrow Flight distributed
Users of this library are responsible for building their own Tonic server, adding the distributed
DataFusion service to it and spawning it on a port so that it can be reached by other workers in the cluster. A very
basic example of this would be:

Expand All @@ -21,7 +21,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
let worker = Worker::default();

Server::builder()
.add_service(worker.into_flight_server())
.add_service(worker.into_worker_server())
.serve(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000))
.await?;

Expand Down Expand Up @@ -74,7 +74,7 @@ This will leave a DataFusion `SessionContext` ready for executing distributed qu
Depending on your needs, your setup can get more complicated, for example:

- You may want to resolve worker URLs dynamically using the Kubernetes API.
- You may want to wrap the Arrow Flight clients that connect workers with an observability layer.
- You may want to wrap the Worker clients that connect workers with an observability layer.
- You may want to be able to execute your own custom ExecutionPlans in a distributed manner.
- etc...

Expand Down
16 changes: 8 additions & 8 deletions docs/source/user-guide/worker.md
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
# Spawn a Worker

The `Worker` is a gRPC server implementing the Arrow Flight protocol for distributed query execution. Worker nodes
The `Worker` is a gRPC server that handles distributed query execution. Worker nodes
run these endpoints to receive execution plans, execute them, and stream results back.

## Overview

The `Worker` is the core worker component in Distributed DataFusion. It:

- Receives serialized execution plans via Arrow Flight's `do_get` method
- Receives serialized execution plans via gRPC
- Deserializes plans using protobuf and user-provided codecs
- Executes plans using the local DataFusion runtime
- Streams results back as Arrow record batches through the gRPC Arrow Flight interface
- Streams results back as Arrow record batches through the gRPC interface

## Launching the Arrow Flight server
## Launching the Worker server

The default `Worker` implementation satisfies most basic use cases:

Expand All @@ -23,7 +23,7 @@ async fn main() {
let endpoint = Worker::default();

Server::builder()
.add_service(endpoint.into_flight_server())
.add_service(endpoint.into_worker_server())
.serve(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000))
.await?;

Expand All @@ -47,7 +47,7 @@ async fn main() {
let endpoint = Worker::from_session_builder(build_state);

Server::builder()
.add_service(endpoint.into_flight_server())
.add_service(endpoint.into_worker_server())
.serve(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000))
.await?;

Expand Down Expand Up @@ -88,10 +88,10 @@ async fn main() {
let endpoint = Worker::default();

Server::builder()
.add_service(endpoint.into_flight_server())
.add_service(endpoint.into_worker_server())
.serve(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8080))
.await?;
}
```

The `into_flight_server()` method builds a `FlightServiceServer` ready to be added as a Tonic service.
The `into_worker_server()` method builds a `WorkerServiceServer` ready to be added as a Tonic service.
14 changes: 7 additions & 7 deletions examples/in_memory_cluster.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use arrow::util::pretty::pretty_format_batches;
use arrow_flight::flight_service_client::FlightServiceClient;
use async_trait::async_trait;
use datafusion::common::DataFusionError;
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use datafusion_distributed::{
BoxCloneSyncChannel, ChannelResolver, DistributedExt, DistributedPhysicalOptimizerRule, Worker,
WorkerQueryContext, WorkerResolver, create_flight_client, display_plan_ascii,
WorkerQueryContext, WorkerResolver, WorkerServiceClient, create_worker_client,
display_plan_ascii,
};
use futures::TryStreamExt;
use hyper_util::rt::TokioIo;
Expand Down Expand Up @@ -66,7 +66,7 @@ const DUMMY_URL: &str = "http://localhost:50051";
/// tokio duplex rather than a TCP connection.
#[derive(Clone)]
struct InMemoryChannelResolver {
channel: FlightServiceClient<BoxCloneSyncChannel>,
channel: WorkerServiceClient<BoxCloneSyncChannel>,
}

impl InMemoryChannelResolver {
Expand All @@ -84,7 +84,7 @@ impl InMemoryChannelResolver {
}));

let this = Self {
channel: create_flight_client(BoxCloneSyncChannel::new(channel)),
channel: create_worker_client(BoxCloneSyncChannel::new(channel)),
};
let this_clone = this.clone();

Expand All @@ -95,7 +95,7 @@ impl InMemoryChannelResolver {

tokio::spawn(async move {
Server::builder()
.add_service(endpoint.into_flight_server())
.add_service(endpoint.into_worker_server())
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
});
Expand All @@ -106,10 +106,10 @@ impl InMemoryChannelResolver {

#[async_trait]
impl ChannelResolver for InMemoryChannelResolver {
async fn get_flight_client_for_url(
async fn get_worker_client_for_url(
&self,
_: &url::Url,
) -> Result<FlightServiceClient<BoxCloneSyncChannel>, DataFusionError> {
) -> Result<WorkerServiceClient<BoxCloneSyncChannel>, DataFusionError> {
Ok(self.channel.clone())
}
}
Expand Down
2 changes: 1 addition & 1 deletion examples/localhost_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
let args = Args::from_args();

Server::builder()
.add_service(Worker::default().into_flight_server())
.add_service(Worker::default().into_worker_server())
.serve(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), args.port))
.await?;

Expand Down
12 changes: 5 additions & 7 deletions src/distributed_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ pub trait DistributedExt: Sized {
/// Example:
///
/// ```
/// # use arrow_flight::flight_service_client::FlightServiceClient;
/// # use async_trait::async_trait;
/// # use datafusion::common::DataFusionError;
/// # use datafusion::execution::{SessionState, SessionStateBuilder};
Expand Down Expand Up @@ -228,29 +227,28 @@ pub trait DistributedExt: Sized {
resolver: T,
);

/// This is what tells Distributed DataFusion how to build an Arrow Flight client out of a worker URL.
/// This is what tells Distributed DataFusion how to build a Worker gRPC client out of a worker URL.
///
/// There's a default implementation that caches the Arrow Flight client instances so that there's
/// There's a default implementation that caches the Worker client instances so that there's
/// only one per URL, but users can decide to override that behavior in favor of their own solution.
///
/// Example:
///
/// ```
/// # use arrow_flight::flight_service_client::FlightServiceClient;
/// # use async_trait::async_trait;
/// # use datafusion::common::DataFusionError;
/// # use datafusion::execution::{SessionState, SessionStateBuilder};
/// # use datafusion::prelude::SessionConfig;
/// # use url::Url;
/// # use std::sync::Arc;
/// # use datafusion_distributed::{BoxCloneSyncChannel, ChannelResolver, DistributedExt, DistributedPhysicalOptimizerRule, WorkerQueryContext};
/// # use datafusion_distributed::{BoxCloneSyncChannel, ChannelResolver, DistributedExt, DistributedPhysicalOptimizerRule, WorkerQueryContext, WorkerServiceClient};
///
/// struct CustomChannelResolver;
///
/// #[async_trait]
/// impl ChannelResolver for CustomChannelResolver {
/// async fn get_flight_client_for_url(&self, url: &Url) -> Result<FlightServiceClient<BoxCloneSyncChannel>, DataFusionError> {
/// // Build a custom FlightServiceClient wrapped with tower layers or something similar.
/// async fn get_worker_client_for_url(&self, url: &Url) -> Result<WorkerServiceClient<BoxCloneSyncChannel>, DataFusionError> {
/// // Build a custom WorkerServiceClient wrapped with tower layers or something similar.
/// todo!()
/// }
/// }
Expand Down
Loading
Loading