Skip to content

Add latency tracking for MSE adaptive routing#18646

Open
timothy-e wants to merge 6 commits into
apache:masterfrom
timothy-e:timothy-e-mse-ar-latency
Open

Add latency tracking for MSE adaptive routing#18646
timothy-e wants to merge 6 commits into
apache:masterfrom
timothy-e:timothy-e-mse-ar-latency

Conversation

@timothy-e
Copy link
Copy Markdown
Contributor

@timothy-e timothy-e commented Jun 1, 2026

#18553 added in-flight request tracking for MSE in the adaptive routing stats. This PR adds per-server latency. It looks very large, but much of it is tests, and some new files were introduced, which comes with a certain amount of boilerplate in Java.

1. The planner marks stages as needing timing. It assigns FragmentTypes and then the dispatcher uses those to turn those roles into timing collection in AdaptiveRoutingStageClassification.java. it always enables collection on stage 0, and otherwise only on trusted stages that either directly receive from pure leaves or are singleton-leaf stages themselves, while excluding stages fed by non-leaf senders so their timings do not include upstream cascade delay.

2. Stages track their elapsed time and return it as UPSTREAM_SERVER_RESPONSE_TIMES_MS stat (BaseMailboxReceiveOperator, BlockingMultiStreamConsumer, AdaptiveRoutingUpstreamTimings)

Each BaseMailboxReceiveOperator tracks per-sender wall-clock elapsed time (from when its BlockingMultiStreamConsumer.OfMseBlock is constructed until each sender's EOS arrives). The start time is captured at consumer construction time, not at query start, to avoid pipeline-breaker inflation: if a pipeline breaker on this stage blocks for 1000ms waiting on a slow sender, fast senders' EOS blocks sit unconsumed in their queues during that time, and measuring from query start would report ~1000ms for all of them.

These per-sender timings are encoded as "hostname|port=elapsedMs;..." in a new UPSTREAM_SERVER_RESPONSE_TIMES_MS stat key on BaseMailboxReceiveOperator.StatKey and propagated up through the query stats. A serialized string is used because it is more straightforward than adding a Map type to the StatKey system.

3. Broker reads leaf-server timings and resolves hostname|mailboxPort sender keys to full instance IDs and accumulates the maximum observed latency per instance across all trusted stages. A decrementedServers set prevents double-recording.

4. Query timeout: on query cancellation (e.g. reaching the timeout), we try to cancel with stats. We preserve completed timings and inject elapsed time for pending senders.

Testing

1. Performance impact: We ran some performance tests and saw no impact.

2. Manually with tc

Choose a server that we want to induce latency on. ssh to that server and:

sudo tc qdisc add dev ens5 root handle 1: prio
sudo tc qdisc add dev ens5 parent 1:3 handle 30: netem delay 1200ms

sudo tc filter add dev ens5 protocol ip parent 1:0 prio 3 u32 \
  match ip dport 8442 0xffff flowid 1:3

We induced latency only for MSE to validate that adaptive routing can be influenced by only MSE queries.

To reset the latency

sudo tc qdisc del dev ens5 root # cleanup

We saw the score spike for the targetted servers.
3. Chaos Agent
We kicked off a chaos agent run that does

  1. 90s of 600ms MSE latency
  2. 30s with no faults
  3. 90s of 600ms SSE latency
  4. 90s with no faults
  5. 90s of 1200ms MSE latency
  6. 30s with no faults
  7. 90s of 1200ms SSE latency

(query timeout is 1s).

image

timothy-e added 6 commits June 1, 2026 12:10
cc stripe-private-oss-forks/pinot-reviewers
r? dang saiswapnilar

https://git.corp.stripe.com/stripe-private-oss-forks/pinot/pull/597 added in-flight request tracking for MSE in the adaptive routing stats. This PR adds per-server latency. It looks very large, but ~950 lines are tests, and two new files were introduced, which comes with a certain amount of boilerplate in Java.

**1. Per-sender elapsed time in `UPSTREAM_SERVER_RESPONSE_TIMES_MS` stat** (`BaseMailboxReceiveOperator`, `BlockingMultiStreamConsumer`, `AdaptiveRoutingUpstreamTimings`)

Each `BaseMailboxReceiveOperator` now tracks per-sender wall-clock elapsed time (from when its `BlockingMultiStreamConsumer.OfMseBlock` is constructed until each sender's EOS arrives). The start time is captured at consumer construction time, not at query start, to avoid pipeline-breaker inflation: if a pipeline breaker on this stage blocks for 1000ms waiting on a slow sender, fast senders' EOS blocks sit unconsumed in their queues during that time, and measuring from query start would report ~1000ms for all of them.

These per-sender timings are encoded as `"hostname|port=elapsedMs;..."` in a new `UPSTREAM_SERVER_RESPONSE_TIMES_MS` stat key on `BaseMailboxReceiveOperator.StatKey` and propagated up through the query stats. A serialized string is used because it is more straightforward than adding a Map type to the `StatKey` system.

**2. Broker reads leaf-server timings via `StageClassification` + `extractMaxTimingsPerInstance`** (`QueryDispatcher`, `StageClassification`)

After `runReducer` completes, the broker classifies the query plan using `StageClassification.classify()`. Two kinds of stages are "trusted" (their `UPSTREAM_SERVER_RESPONSE_TIMES_MS` stats are consulted):
 - **Pure leaf receivers**: stages that directly receive from a non-SINGLETON leaf stage.
 - **SINGLETON leaf stages receiving from another leaf**: a leaf stage that scans a dimension table and receives upstream data via a SINGLETON exchange, but only when the sender stage is also a leaf.

Stages that receive from any non-leaf (intermediate) stage are excluded because the non-leaf sender may have waited on a slow upstream cascade, inflating per-sender timings at the receiver.

Only pure leaf servers (non-SINGLETON) are added to `_trackedServers` and are eligible for EMA updates. SINGLETON leaf and intermediate servers are excluded because their timings reflect cascade delays rather than their own scan performance.

For each trusted stage, the broker resolves `hostname|mailboxPort` sender keys to full instance IDs and accumulates the maximum observed latency per instance across all trusted stages. A `decrementedServers` set prevents double-recording.

**3. Fallback latency for servers not covered by extracted timings** (`QueryDispatcher` finally block)
Two fallback paths handle servers not recorded via `extractMaxTimingsPerInstance`:
- **Fallback 1 (no partial timings at all)**: If `decrementedServers` is empty (e.g. the query timed out before any stats returned), all incremented servers are recorded with `-1L` (no timing available). This avoids marking all servers with a misleading full-elapsed-time value when we have no real signal.
 - **Fallback 2 (partial timings received)**: For servers not yet decremented:
   - If the server is in `trackedServers` (a pure leaf server): records the full elapsed time (`System.currentTimeMillis() - submitTimeMs`), since the server likely timed out or was unresponsive.
   - Otherwise (intermediate/SINGLETON server): records `-1L` (not tracked for EMA).

[STREAMANALYTICS-4418](https://jira.corp.stripe.com/browse/STREAMANALYTICS-4418)

I ran a PerformanceTestWorkflow
```
{
    "numUsers": 5,
    "clusterInfo": {"clusterName": "rad-canary", "clusterRegion":
  "northwest"},
    "tenant": "long-lived-a",
    "durationMinutes": 90,
    "loadTestConfig": {
      "perQueryExpectations": {
        "sum_payments_group_by_merchant_since_1h_v2": {"minQps":
  2.0, "maxP99DurationMs": 244, "maxP50DurationMs": 165,
  "minSuccessRate": 0.999},
        "count_sum_payments_performance_since_1h_v2": {"minQps":
  2.0, "maxP99DurationMs": 229, "maxP50DurationMs": 150,
  "minSuccessRate": 0.999},
        "select_payments_three_table_join_performance_since_1h_v2":
   {"minQps": 2.0, "maxP99DurationMs": 376, "maxP50DurationMs":
  255, "minSuccessRate": 0.999},
        "select_payments_cte_join_window_performance_since_1h_v2":
  {"minQps": 2.0, "maxP99DurationMs": 432, "maxP50DurationMs": 322,
   "minSuccessRate": 0.999},
        "select_payments_union_join_performance_since_1h_v2":
  {"minQps": 2.0, "maxP99DurationMs": 339, "maxP50DurationMs": 184,
   "minSuccessRate": 0.999},
        "select_payments_nested_subquery_performance_since_1h_v2":
  {"minQps": 2.0, "maxP99DurationMs": 196, "maxP50DurationMs": 101,
   "minSuccessRate": 0.999}
      },
      "aggregateExpectations": {"minQps": 16.0, "maxP99DurationMs":
   432, "maxP50DurationMs": 322, "minSuccessRate": 0.999}
    }
  }
```

These queries are all complex MSE queries that mirror some of the patterns in the billing analytics MRR queries.

I also added [another server to reach replica in canary, and another replica](https://git.corp.stripe.com/stripe-internal/mint/pull/2124801), to further validate the distribution.

And induced latency in one of two ways:

**1. Manually with tc**

Choose a server that we want to induce latency on. `ssh` to that server and:

```
sudo tc qdisc add dev ens5 root handle 1: prio
sudo tc qdisc add dev ens5 parent 1:3 handle 30: netem delay 1200ms

sudo tc filter add dev ens5 protocol ip parent 1:0 prio 3 u32 \
  match ip dport 8442 0xffff flowid 1:3

```

We induced latency only for MSE to validate that adaptive routing can be influenced by only MSE queries.

To reset the latency

```
sudo tc qdisc del dev ens5 root # cleanup
```

We left this on for [a while](https://g-8916660cfe.grafana-workspace.us-west-2.amazonaws.com/d/8JZ_bymSz/pinot-adaptive-routing-rollout?orgId=1&var-datasource=zb219lV4k&var-min_interval=2m&var-host_cluster=northwest&var-pinot_cluster=rad-canary&var-pinot_tenant=long-lived-a&var-host_type=pinotdbstreaming&from=1777497188195&to=1777502138949), and we can see there's a small amount of contamination to another server, but generally the server with the induced latency is identified as a clear outlier.

**With Chaos Scenarios**

Or run a [chaos-scenario](https://g-8916660cfe.grafana-workspace.us-west-2.amazonaws.com/d/8JZ_bymSz/pinot-adaptive-routing-rollout?orgId=1&var-datasource=zb219lV4k&var-min_interval=30s&var-host_cluster=northwest&var-pinot_cluster=rad-canary&var-pinot_tenant=long-lived-a&var-host_type=pinotdbstreaming&from=1777557375599&to=1777558632565) that runs a 5 minute 600ms latency increase, then a 1m 1200ms increase, then a 1m blackhole.

And with adaptive routing off, [we see](https://g-8916660cfe.grafana-workspace.us-west-2.amazonaws.com/d/8JZ_bymSz/pinot-adaptive-routing-rollout?orgId=1&var-datasource=zb219lV4k&var-min_interval=30s&var-host_cluster=northwest&var-pinot_cluster=rad-canary&var-pinot_tenant=long-lived-a&var-host_type=pinotdbstreaming&from=1777557375599&to=1777558632565)
* 2960 total timeouts
* 2140 timeouts from the 5 minute 600ms spike.
* Overall QPS dropped from 55 to 14 during the 600ms / 1200ms / blackhole.
* p50 increased from 70ms to 360ms
* p75 increased from 90ms to 810ms
* p99 increased from 130ms to 890ms

And with adaptive routing on, [we see](https://g-8916660cfe.grafana-workspace.us-west-2.amazonaws.com/d/8JZ_bymSz/pinot-adaptive-routing-rollout?orgId=1&var-datasource=zb219lV4k&var-min_interval=2m&var-host_cluster=northwest&var-pinot_cluster=rad-canary&var-pinot_tenant=long-lived-a&var-host_type=pinotdbstreaming&from=1777560397381&to=1777561273611)
* 931 total timeouts (3x reduction vs adaptive routing off)
* 662 timeouts from the 5 minute 600ms spike (3x reduction vs adaptive routing off)
* Overall QPS dropped from 53 to 43 during the 600ms spike, and 33 during the 1200ms / blackhole.
* p50 increased from 70ms to 120ms (3x reduction vs adaptive routing off)
* p75 increased from 90ms to 130ms (6x reduction vs adaptive routing off)
* p99 increased from 130ms to 820ms

Degraded servers are detected better than servers that never respond at all (or respond after the query timeout). But they do still appear in the latency stats.

**Rad Rose**

Got claude to generate a [load test script](https://git.corp.stripe.com/gist/timothye/a2346d0af61476c4032fc623cc5a7298) based on the [billing analytics queries](https://git.corp.stripe.com/gist/dang/bb38f694ea62b0f92c7a8fec944f6612).  It
* queries the _rose instead of _testing tables
* replaces the merchant in the queries with a merchant with actual data in QA
* ensured all queries return some results by changing time filters as necessary.

After a few minutes of the load test, I added 600ms of latency to qa-pinotdbserver--01e5bebf93cd9888d.northwest.stripe.io.

[There seems](https://g-8916660cfe.grafana-workspace.us-west-2.amazonaws.com/d/8JZ_bymSz/pinot-adaptive-routing-rollout?orgId=1&var-datasource=zb219lV4k&var-min_interval=$__auto_interval_min_interval&var-host_cluster=northwest&var-pinot_cluster=rad-rose&var-pinot_tenant=billinganalyticsrose&var-host_type=All&from=1777579989000&to=1777580954563) to be contamination into 1 or 2 realtime servers, and occasionally one offline server, but generally, the offline server is distinctly regarded as higher. The contamination effects would likely be lower if adaptive routing was enabled (not just the stats), because there would be less timeouts in general.
<img width="1384" alt="Screenshot 2026-04-30 at 4 29 11 pm" src="https://git.corp.stripe.com/user-attachments/assets/e217e755-92e6-4389-8876-15e7ad0736e9" />

Stripe-Original-Repo: stripe-private-oss-forks/pinot
Stripe-Monotonic-Timestamp: v2/2026-05-01T12:24:16Z/0
Stripe-Original-PR: https://git.corp.stripe.com/stripe-private-oss-forks/pinot/pull/607
…che#633)

cc stripe-private-oss-forks/pinot-reviewers
r? dang saiswapnilar

Some performance improvements: removing redundant or thrown away work.

1. Remove TreeMap sort from encode() — The encoded string doesn't need deterministic key ordering (the broker decodes into a HashMap anyway), so removed the unnecessary sort allocation.
2. Replace String.split() with indexOf-based parsing in decode() — split() allocates a regex Pattern and a String[] on every call. Manual indexOf loop parses in-place with zero intermediate allocations.
3. Short-circuit mergeEncodings() when either arg is null/empty — Avoids decoding + re-encoding when one side has no data (common case: first worker EOS has nothing to merge with).
4. Move stage classification to planning time, which simplifies the classification a bit.
5. Since we now classify before dispatching, we can inject `collectUpstreamTiming=true` into trusted stages' `customProperties`. Each untrusted stage does not need to init a hashmap and collect timings.

[STREAMANALYTICS-4484](https://jira.corp.stripe.com/browse/STREAMANALYTICS-4484)

Started a PerformanceTestWorkflow with all the latest MSE queries on rad-canary, and ran a [chaos-scenario](https://amp.qa.corp.stripe.com/chaos-scenarios/scenario-b3c7091d-8e3c-41e8-b7ca-8330b9270a51) on it. The affected servers are clearly identified.
<img width="1249" alt="Screenshot 2026-05-12 at 11 42 00 am" src="https://git.corp.stripe.com/user-attachments/assets/47a6c84b-5891-44c8-9d6f-58aba36846ab" />
[[Grafana](https://g-8916660cfe.grafana-workspace.us-west-2.amazonaws.com/d/8JZ_bymSz/pinot-adaptive-routing-rollout?orgId=1&var-datasource=zb219lV4k&var-min_interval=$__auto_interval_min_interval&var-host_cluster=northwest&var-pinot_cluster=rad-canary&var-pinot_tenant=long-lived-a&var-host_type=All&from=1778599320072&to=1778600291193&viewPanel=16)]

On master, the canary performance tests (the original 4) had results:
```
    "Aggregate QPS 35.62 >= 12.61",
    "Aggregate success rate 100.00% >= 99.90%",
    "Aggregate P99 duration 82 ms <= 240 ms",
    "Aggregate P50 duration 54 ms <= 171 ms"
```
and
```
    "Aggregate QPS 33.50 >= 12.61",
    "Aggregate success rate 100.00% >= 99.90%",
    "Aggregate P99 duration 85 ms <= 240 ms",
    "Aggregate P50 duration 58 ms <= 171 ms"
```

after deploying this change, we saw:
```
    "Aggregate QPS 38.53 >= 12.61",
    "Aggregate success rate 100.00% >= 99.90%",
    "Aggregate P99 duration 75 ms <= 240 ms",
    "Aggregate P50 duration 51 ms <= 171 ms"
```

QPS is all over the place, but there's no obvious regression.

Stripe-Original-Repo: stripe-private-oss-forks/pinot
Stripe-Monotonic-Timestamp: v2/2026-05-12T16:01:57Z/0
Stripe-Original-PR: https://git.corp.stripe.com/stripe-private-oss-forks/pinot/pull/633
[Minion run](https://devboxproxy.qa.corp.stripe.com/timothye/agent/agent_run_UVLJfTr66uix4g)

cc stripe-private-oss-forks/pinot-reviewers
r? dang

Fixes the failing unit test `MailboxReceiveOperatorTest.copyStatMapsIncludesPartialTimingWhenSlowSenderNeverCompletes` introduced alongside the adaptive routing latency tracking feature (apache#633).

The test was missing the `collectUpstreamTiming=true` op-chain metadata flag that `BaseMailboxReceiveOperator` requires to enable per-sender timing collection. Without this flag, `_streamIdToSenderKey` is empty, `_senderElapsedMs` is never populated, and `copyStatMaps()` returns null for `UPSTREAM_SERVER_RESPONSE_TIMES_MS` — causing the `assertNotNull` at line 305 to fail.

Changes:
- **`OperatorTestUtil`**: Added an overloaded `getOpChainContext()` that accepts custom `opChainMetadata`, so tests can supply operator configuration flags.
- **`MailboxReceiveOperatorTest`**: Added a `getOperator()` overload accepting metadata, and updated the failing test to pass `COLLECT_UPSTREAM_TIMING_KEY -> "true"`.

The `copyStatMapsIncludesPartialTimingWhenSlowSenderNeverCompletes` test was failing in CI with:
```
java.lang.AssertionError: copyStatMaps() should include partial timing for fast sender that already sent EOS expected object to not be null
```

This was a test-only bug — the production `copyStatMaps()` implementation is correct, but the test was not setting up the operator context to enable the timing collection feature it was asserting on.

Ran the full `MailboxReceiveOperatorTest` suite (11 tests) — all pass:
```
Tests run: 11, Failures: 0, Errors: 0, Skipped: 0
BUILD SUCCESS
```

https://cibot.corp.stripe.com/builds/bui_UVLY8q2t30qyaW shows a success

This is a test-only change with no effect on production services.

Stripe-Original-Repo: stripe-private-oss-forks/pinot
Stripe-Monotonic-Timestamp: v2/2026-05-12T20:49:23Z/0
Stripe-Original-PR: https://git.corp.stripe.com/stripe-private-oss-forks/pinot/pull/644
)

cc stripe-private-oss-forks/pinot-reviewers
r? dang saiswapnilar

When a query timed out, MSE adaptive routing (AR) stats did not properly identify the failing server, because it tried not to mark all servers as degraded. This was less obvious when the AR stats were not separated between MSE and SSE. However, since we have seen degradations that only affect one query engine, and we know that we want to successfully handle query timeouts, this needed to be fixed.

This PR:
- On the timeout path, collects stats about which servers responded from cancelWithStats, and passes those to extractMaxTimingsPerInstance.
- Enable timing collection on the broker's stage-0 mailbox receive operator so it captures direct-sender attribution even when EOS never arrives
- Add getSenderElapsedMsIncludingPending() to BlockingMultiStreamConsumer which reports wall-clock elapsed time for senders that haven't completed yet (used on the cancel/timeout path)

| Scenario                                             | Before PR                                                                                                              | After PR                                                                                                                                                             |
| ---------------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Normal completion (no timeout)                       | `onEos()` merges sender timings; `extractMaxTimingsPerInstance` reads trusted stages; leaf servers get actual latency  | Unchanged                                                                                                                                                            |
| Worker sends error, broker cancels                   | `cancel()` (fire-and-forget, no stats); all servers get -1L                                                            | Unchanged                 |
| Broker times out (`TimeoutException`)                | `tryRecover` → `cancelWithStats`, but cancel stats not passed to `extractMaxTimingsPerInstance`; no responder tracking | `tryRecover` → `cancelWithStats`; cancel stats merged into `extractMaxTimingsPerInstance` for trusted worker stages; non-responders marked degraded with `elapsedMs` |
| Cancel itself times out (some servers don't respond) | No distinction between responders and non-responders; all untracked servers get -1L                                    | `respondingServerIds` tracks who responded; non-responders marked degraded with `elapsedMs` (tier 3)                                                                 |
| Throwable (OOM, etc.)                                | `cancel()` (fire-and-forget); all servers get -1L                                                                      | Unchanged                                                                                                                                                            |

[STREAMANALYTICS-4418](https://jira.corp.stripe.com/browse/STREAMANALYTICS-4418)

We ran a PerformanceTestWorkflow with the following arguments (on an isolated env where https://git.corp.stripe.com/stripe-internal/mint/pull/2110397 is deployed, since its been reverted on master)
```
{
    "numUsers": 5,
    "clusterInfo": {"clusterName": "rad-canary", "clusterRegion":
  "northwest"},
    "tenant": "long-lived-a",
    "durationMinutes": 90,
    "loadTestConfig": {
      "perQueryExpectations": {
        "sum_payments_group_by_merchant_since_1h_v2": {"minQps":
  2.0, "maxP99DurationMs": 244, "maxP50DurationMs": 165,
  "minSuccessRate": 0.999},
        "count_sum_payments_performance_since_1h_v2": {"minQps":
  2.0, "maxP99DurationMs": 229, "maxP50DurationMs": 150,
  "minSuccessRate": 0.999},
        "select_payments_three_table_join_performance_since_1h_v2":
   {"minQps": 2.0, "maxP99DurationMs": 376, "maxP50DurationMs":
  255, "minSuccessRate": 0.999},
        "select_payments_cte_join_window_performance_since_1h_v2":
  {"minQps": 2.0, "maxP99DurationMs": 432, "maxP50DurationMs": 322,
   "minSuccessRate": 0.999},
        "select_payments_union_join_performance_since_1h_v2":
  {"minQps": 2.0, "maxP99DurationMs": 339, "maxP50DurationMs": 184,
   "minSuccessRate": 0.999},
        "select_payments_nested_subquery_performance_since_1h_v2":
  {"minQps": 2.0, "maxP99DurationMs": 196, "maxP50DurationMs": 101,
   "minSuccessRate": 0.999}
      },
      "aggregateExpectations": {"minQps": 16.0, "maxP99DurationMs":
   432, "maxP50DurationMs": 322, "minSuccessRate": 0.999}
    }
  }
```

We kicked off a [chaos agent](https://amp.qa.corp.stripe.com/chaos-scenarios/scenario-f874bad3-419e-4a7a-870b-06010ba9fd63) run that does
1. 90s of 600ms **MSE latency**
2. 30s with no faults
3. 90s of 600ms **SSE latency**
4. 90s with no faults
5. 90s of 1200ms **MSE latency**
6. 30s with no faults
7. 90s of 1200ms **SSE latency**

Before this change was deployed, we saw the 600ms spikes appear in MSE and SSE, but the 1200ms spikes only appeared in the SSE graph.
<img width="739" alt="Screenshot 2026-05-14 at 12 49 13 pm" src="https://git.corp.stripe.com/user-attachments/assets/f168450a-4242-4e58-899f-0dfc741fd52e" />
[[grafana](https://g-8916660cfe.grafana-workspace.us-west-2.amazonaws.com/d/8JZ_bymSz/pinot-adaptive-routing-rollout?orgId=1&var-datasource=zb219lV4k&var-min_interval=$__auto_interval_min_interval&var-host_cluster=northwest&var-pinot_cluster=rad-canary&var-pinot_tenant=long-lived-a&var-host_type=All&from=1778699889324&to=1778700572859)]

After this change was deployed, we saw each spike appear in each engine.
<img width="731" alt="Screenshot 2026-05-14 at 1 27 27 pm" src="https://git.corp.stripe.com/user-attachments/assets/83e672bf-6ba1-4ed1-8121-88a85be78d7b" />

[[grafana](https://g-8916660cfe.grafana-workspace.us-west-2.amazonaws.com/d/8JZ_bymSz/pinot-adaptive-routing-rollout?orgId=1&var-datasource=zb219lV4k&var-min_interval=$__auto_interval_min_interval&var-host_cluster=northwest&var-pinot_cluster=rad-canary&var-pinot_tenant=long-lived-a&var-host_type=All&from=1778779007090&to=1778779619206)]

Stripe-Original-Repo: stripe-private-oss-forks/pinot
Stripe-Monotonic-Timestamp: v2/2026-05-15T17:00:38Z/0
Stripe-Original-PR: https://git.corp.stripe.com/stripe-private-oss-forks/pinot/pull/648
@codecov-commenter
Copy link
Copy Markdown

codecov-commenter commented Jun 1, 2026

Codecov Report

❌ Patch coverage is 85.38206% with 44 lines in your changes missing coverage. Please review.
✅ Project coverage is 64.45%. Comparing base (6ed151a) to head (f965c81).

Files with missing lines Patch % Lines
.../pinot/query/service/dispatch/QueryDispatcher.java 83.49% 9 Missing and 8 partials ⚠️
...e/dispatch/AdaptiveRoutingStageClassification.java 74.00% 7 Missing and 6 partials ⚠️
...rvice/dispatch/AdaptiveRoutingUpstreamTimings.java 83.78% 3 Missing and 3 partials ⚠️
...ava/org/apache/pinot/common/datatable/StatMap.java 20.00% 0 Missing and 4 partials ⚠️
...y/runtime/operator/BaseMailboxReceiveOperator.java 91.89% 1 Missing and 2 partials ⚠️
...uery/planner/physical/DispatchablePlanContext.java 83.33% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18646      +/-   ##
============================================
+ Coverage     64.39%   64.45%   +0.06%     
  Complexity     1282     1282              
============================================
  Files          3362     3365       +3     
  Lines        207915   208185     +270     
  Branches      32463    32525      +62     
============================================
+ Hits         133883   134192     +309     
+ Misses        63258    63204      -54     
- Partials      10774    10789      +15     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-21 64.45% <85.38%> (+0.06%) ⬆️
temurin 64.45% <85.38%> (+0.06%) ⬆️
unittests 64.45% <85.38%> (+0.06%) ⬆️
unittests1 56.88% <85.04%> (+0.09%) ⬆️
unittests2 37.12% <10.96%> (-0.02%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@timothy-e
Copy link
Copy Markdown
Contributor Author

Hey @yashmayya, here's part two of adaptive routing for MSE. I've done some pretty extensive testing/baking with it on our QA/prod clusters over the last few weeks (although right prod is stats collection-only, not actually using adaptive routing). Sorry for the big change, I don't think there's a way to break it down smaller while keeping each part testable.

@timothy-e timothy-e marked this pull request as ready for review June 1, 2026 23:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants