Avoid substream subscription timeout for empty prefixAndTail tails#2991
Draft
He-Pin wants to merge 1 commit into
Draft
Avoid substream subscription timeout for empty prefixAndTail tails#2991He-Pin wants to merge 1 commit into
He-Pin wants to merge 1 commit into
Conversation
Motivation: prefixAndTail emits (prefix, tailSource) and then waits for the tailSource to be materialized before pulling upstream again. When a downstream consumer intentionally discards the tail (typical for protocol-style splitWhen + prefixAndTail + mapAsync flows where a known-empty substream is ignored), the stage holds the substream open until the configured stream-subscription-timeout fires (default 5 seconds), silently stalling the rest of the pipeline. Modification: Issue a single speculative pull from upstream as soon as openSubstream() has emitted the tail Source. If upstream finishes before any further element arrives, the stage now completes immediately instead of waiting for the substream subscription timer. If upstream pushes an element before the tail Source has been materialized, it is buffered in a one-element slot and delivered on the first downstream pull of the tail, preserving normal back-pressure semantics for late materialization. Result: Discarding a tail Source from prefixAndTail no longer blocks the parent flow for the subscription timeout window when the tail happens to be empty. Existing subscription-timeout behaviour for non-empty unsubscribed tails is unchanged. Tests: - sbt "stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.FlowPrefixAndTailSpec" - 18/18 passed - Verified directional test "complete promptly on empty tail without waiting for subscription timeout (akka #20008)" fails against the pre-fix implementation - Full stream-tests/Test/test and +mimaReportBinaryIssues - Not run locally - deferred to CI References: Refs akka/akka-core#20008
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Motivation
prefixAndTailemits(prefix, tailSource)and then waits for thetailSourceto be materialized before pulling upstream again. When a downstream consumer intentionally discards the tail Source — a common pattern in protocol parsers usingsplitWhen+prefixAndTail+mapAsyncwhere a known-empty substream is skipped — the stage holds the substream open until the configuredpekko.stream.materializer.subscription-timeoutfires (default 5 s), silently stalling the rest of the pipeline.Reproduces the long-standing akka/akka#20008:
Without the fix every discarded empty tail adds a 5 s wait before the next group is emitted.
Modification
PrefixAndTailnow issues a single speculativepull(in)fromopenSubstream()as soon as the tailSourcehas been emitted:Sourcehas been materialized, the element is buffered in a one-element slot and delivered as the first element on the first downstream pull of the tail; upstream finish that arrives while the buffer is occupied is deferred and replayed together with the buffered element when the tail finally subscribes.Operator docs updated to describe the new pre-pull / one-element-buffer semantics.
Result
Discarding a tail
SourcefromprefixAndTailno longer blocks the parent flow for the subscription-timeout window when the tail is empty. The directional test for the issue scenario now completes in ~60 ms instead of ~15 s.Tests
sbt \"stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.FlowPrefixAndTailSpec\"– 18/18 passed locally (incl. new "complete promptly on empty tail without waiting for subscription timeout (akka #20008)" and "deliver buffered tail element when tail is materialized after upstream emitted one").testOnly.scalafmt --list– clean for the changed files.stream-tests/Test/testand+mimaReportBinaryIssues– Not run locally, deferred to CI (Check / Binary Compatibilityshould cover MiMa).References
Refs akka/akka#20008