Skip to content

Move to dedicated Worker gRPC specification instead of Arrow Flight#375

Merged
gabotechs merged 13 commits intomainfrom
gabrielmusat/custom-worker-service
Mar 24, 2026
Merged

Move to dedicated Worker gRPC specification instead of Arrow Flight#375
gabotechs merged 13 commits intomainfrom
gabrielmusat/custom-worker-service

Conversation

@gabotechs
Copy link
Copy Markdown
Collaborator

@gabotechs gabotechs commented Mar 18, 2026

Closes #373

This PR moves away from using the Arrow Flight protobuf specification as application protocol for communicating workers in favor of using a dedicated protobuf specification. The rationale behind that can be seen in #373.

This is a breaking API change that will prevent old workers work with new ones, so it should be carefully rolled out.

Part of the reason of moving to a dedicated protobuf specification is about commitment to API stability: we want to reduce as much as possible future breaking API changes, and we expect that maintaining a proper gRPC protobuf specification will help with that.

The migration should be simple:

  • People will just need to rename some methods in their codebase (e.g., into_flight_server -> into_worker_server). Just following the Rust compiler here is enough
  • During rollout, old workers should not talk to new ones, and viceversa, otherwise queries will fail. There's ongoing work for making this easier for people Worker Versioning #380

Besides just moving to a dedicated protocol, this PR does two additional things:

  • Rename StageKey to TaskKey, as it was uniquely identifying tasks and not stages 882bec2
  • Rearrange the protobuf messages so that the do not send unnecessary stuff 0f42b5f

If we take into account that ~710 LOC are now automatically generated out of a .proto file, we are looking at a +724-924 net change, implying a total net reduction of lines of ~200, which sounds pretty good and expected.


Remote benchmarks against main.
TL;DR: the same

  ---
  TPC-H SF1 Results

  ┌───────┬───────────┬────────────────────────────┬──────────────┐
  │ Query │ main (ms) │ custom-worker-service (ms) │     Diff     │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q1    │ 288       │ 249                        │ 1.16x faster │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q2    │ 345       │ 412                        │ 1.19x slower │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q3    │ 592       │ 612                        │ 1.03x slower │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q4    │ 206       │ 185                        │ 1.11x faster │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q5    │ 312       │ 307                        │ 1.02x faster │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q6    │ 209       │ 249                        │ 1.19x slower │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q7    │ 362       │ 375                        │ 1.04x slower │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q8    │ 410       │ 397                        │ 1.03x faster │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q9    │ 430       │ 446                        │ 1.04x slower │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q10   │ 256       │ 430                        │ 1.68x slower │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q11   │ 326       │ 360                        │ 1.10x slower │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q12   │ 250       │ 335                        │ 1.34x slower │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q13   │ 230       │ 294                        │ 1.28x slower │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q14   │ 233       │ 323                        │ 1.39x slower │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q15   │ 287       │ 398                        │ 1.39x slower │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q16   │ 378       │ 372                        │ 1.02x faster │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q17   │ 424       │ 430                        │ 1.01x slower │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q18   │ 436       │ 294                        │ 1.48x faster │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q19   │ 640       │ 637                        │ 1.00x ~same  │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q20   │ 605       │ 415                        │ 1.46x faster │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q21   │ 691       │ 597                        │ 1.16x faster │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q22   │ 411       │ 295                        │ 1.39x faster │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ TOTAL │ 25.0s     │ 25.3s                      │ 1.01x slower │
  └───────┴───────────┴────────────────────────────┴──────────────┘

  TPC-H SF10 Results

  ┌───────┬───────────┬────────────────────────────┬──────────────┐
  │ Query │ main (ms) │ custom-worker-service (ms) │     Diff     │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q1    │ 1373      │ 1319                       │ 1.04x faster │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q2    │ 472       │ 454                        │ 1.04x faster │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q3    │ 1031      │ 1031                       │ 1.00x same   │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q4    │ 593       │ 582                        │ 1.02x faster │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q5    │ 1465      │ 1482                       │ 1.01x slower │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q6    │ 801       │ 702                        │ 1.14x faster │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q7    │ 1617      │ 1598                       │ 1.01x faster │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q8    │ 1804      │ 1821                       │ 1.01x slower │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q9    │ 2381      │ 2275                       │ 1.05x faster │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q10   │ 1260      │ 1115                       │ 1.13x faster │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q11   │ 427       │ 379                        │ 1.13x faster │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q12   │ 934       │ 976                        │ 1.04x slower │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q13   │ 674       │ 708                        │ 1.05x slower │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q14   │ 946       │ 881                        │ 1.07x faster │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q15   │ 1141      │ 1005                       │ 1.14x faster │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q16   │ 365       │ 481                        │ 1.32x slower │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q17   │ 2037      │ 1998                       │ 1.02x faster │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q18   │ 1925      │ 2066                       │ 1.07x slower │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q19   │ 1237      │ 1264                       │ 1.02x slower │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q20   │ 1080      │ 1206                       │ 1.12x slower │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q21   │ 2431      │ 2311                       │ 1.05x faster │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ q22   │ 493       │ 484                        │ 1.02x faster │
  ├───────┼───────────┼────────────────────────────┼──────────────┤
  │ TOTAL │ 79.5s     │ 78.4s                      │ 1.01x faster │
  └───────┴───────────┴────────────────────────────┴──────────────┘

Copy link
Copy Markdown
Collaborator

@gene-bordegaray gene-bordegaray left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had a confusion with the relationship between Worker and WorkerService that could get some help from a comment or two and to document API changes. But this implementation seems much more fitting for the use cases compared to using Flight and has some nice clean ups 😄

@gene-bordegaray
Copy link
Copy Markdown
Collaborator

A follow up question. I wouldn't expect there to be much / any changes to bench results since this is not affecting the data streaming. Any insight as to why we see much faster / slower result in cases?

@gabotechs
Copy link
Copy Markdown
Collaborator Author

A follow up question. I wouldn't expect there to be much / any changes to bench results since this is not affecting the data streaming. Any insight as to why we see much faster / slower result in cases?

For TPCH_SF1, the queries are very small computationally speaking, so the network noisy becomes very relevant. A better indicator for those queries is the total execution time, which we report at the bottom. That one is less influenced by the noise.

For the TPCH_SF10, I see them pretty much within the noisy threshold, anything that is within the +-20% range can perfectly be just noise. All other engines so pretty much this same noise, and I imagine it can be attributed to:

  • real network between nodes
  • network calls to S3

@gabotechs gabotechs marked this pull request as ready for review March 20, 2026 17:37
@gabotechs
Copy link
Copy Markdown
Collaborator Author

Added a couple of extra commits with some cleanups now that we are reworking the protocol:

  • Rename StageKey to TaskKey, as it was uniquely identifying tasks and not stages 882bec2
  • Rearrange the protobuf messages so that the do not send unnecessary stuff 0f42b5f

Comment on lines -94 to -100
let task_ctx = Arc::new(task_ctx_with_extension(
&task_ctx,
DistributedTaskContext {
task_index: doget.target_task_index as usize,
task_count: doget.target_task_count as usize,
},
));
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After reworking a bit the protocol, this no longer needs to be set multiple times per execute_task call, it can be just set once at the instantiation of the task in set_plan

Comment on lines +60 to +64
let mut cfg =
SessionConfig::default().with_extension(Arc::new(DistributedTaskContext {
task_index: key.task_number as usize,
task_count: body.task_count as usize,
}));
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the information for building a DistributedTaskContext exists at the moment of calling set_plan so it can be set once here, rather than multiple times per execute_task call

# Conflicts:
#	benchmarks/cdk/bin/worker.rs
#	console/examples/cluster.rs
#	console/examples/console_worker.rs
#	src/lib.rs
#	src/observability/mod.rs
#	src/observability/service.rs
#	src/worker/worker_service.rs
Copy link
Copy Markdown
Collaborator

@gene-bordegaray gene-bordegaray left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a error message nit and API thing but looks good 💯

@gabotechs gabotechs merged commit 556a5de into main Mar 24, 2026
13 of 14 checks passed
@gabotechs gabotechs deleted the gabrielmusat/custom-worker-service branch March 24, 2026 15:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Use a dedicated gRPC service for workers instead of Arrow Flight

3 participants