From 8c6360ccae1f3d3becb7d829936ec07c8f52444c Mon Sep 17 00:00:00 2001 From: He-Pin Date: Mon, 25 May 2026 15:21:48 +0800 Subject: [PATCH] fix: avoid substream subscription timeout for empty prefixAndTail tails Motivation: prefixAndTail emits (prefix, tailSource) and then waits for the tailSource to be materialized before pulling upstream again. When a downstream consumer intentionally discards the tail (typical for protocol-style splitWhen + prefixAndTail + mapAsync flows where a known-empty substream is ignored), the stage holds the substream open until the configured stream-subscription-timeout fires (default 5 seconds), silently stalling the rest of the pipeline. Modification: Issue a single speculative pull from upstream as soon as openSubstream() has emitted the tail Source. If upstream finishes before any further element arrives, the stage now completes immediately instead of waiting for the substream subscription timer. If upstream pushes an element before the tail Source has been materialized, it is buffered in a one-element slot and delivered on the first downstream pull of the tail, preserving normal back-pressure semantics for late materialization. Result: Discarding a tail Source from prefixAndTail no longer blocks the parent flow for the subscription timeout window when the tail happens to be empty. Existing subscription-timeout behaviour for non-empty unsubscribed tails is unchanged. 
Tests: - sbt "stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.FlowPrefixAndTailSpec" - 18/18 passed - Verified directional test "complete promptly on empty tail without waiting for subscription timeout (akka #20008)" fails against the pre-fix implementation - Full stream-tests/Test/test and +mimaReportBinaryIssues - Not run locally - deferred to CI References: Refs akka/akka#20008 --- .../operators/Source-or-Flow/prefixAndTail.md | 9 +++ .../scaladsl/FlowPrefixAndTailSpec.scala | 55 +++++++++++++++++++ .../stream/impl/fusing/StreamOfStreams.scala | 39 ++++++++++++- 3 files changed, 101 insertions(+), 2 deletions(-) diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/prefixAndTail.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/prefixAndTail.md index 9ab2ea900c0..38334263db3 100644 --- a/docs/src/main/paradox/stream/operators/Source-or-Flow/prefixAndTail.md +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/prefixAndTail.md @@ -15,6 +15,15 @@ Take up to *n* elements from the stream (less than *n* only if the upstream comp Take up to *n* elements from the stream (less than *n* only if the upstream completes before emitting *n* elements) and returns a pair containing a strict sequence of the taken element and a stream representing the remaining elements. +After the prefix has been emitted, `prefixAndTail` issues a single speculative pull on its upstream so that an empty +tail (the upstream completes before producing any further element) is detected immediately. If the emitted tail +`Source` is intentionally discarded and the substream happens to be empty, the stage completes without waiting for +the configured `pekko.stream.materializer.subscription-timeout`. + +If the speculative pull delivers an element before the tail `Source` is materialized, that element is buffered and +forwarded as the first element to whoever materializes the tail. 
Tails that are discarded by downstream while +upstream still has elements continue to be governed by the configured subscription timeout. + ## Reactive Streams semantics @@@div { .callout } diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowPrefixAndTailSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowPrefixAndTailSpec.scala index 1a6cf808529..c9a463115ba 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowPrefixAndTailSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowPrefixAndTailSpec.scala @@ -16,6 +16,7 @@ package org.apache.pekko.stream.scaladsl import scala.annotation.nowarn import scala.collection.immutable import scala.concurrent.Await +import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.control.NoStackTrace @@ -289,6 +290,60 @@ class FlowPrefixAndTailSpec extends StreamSpec(""" sub.expectSubscriptionAndComplete() } + "complete promptly on empty tail without waiting for subscription timeout (akka #20008)" in { + // Reproduces https://github.com/akka/akka/issues/20008. Default StreamSubscriptionTimeout + // is 5 seconds; without the speculative pre-pull in PrefixAndTail this would take + // >10 seconds (two discarded empty tails). The test must finish well under that. + import system.dispatcher + val fut = Source(List("2", "a", "b", "0", "3", "a", "b", "c", "0", "1", "a")) + .splitWhen(_.matches("\\d+")) + .prefixAndTail(1) + .mapAsync(1) { + case (Seq("0"), _) => + // Intentionally discard the tail Source for known-empty groups. 
+ Future.successful("") + case (Seq(_), elements) => + elements.runWith(Sink.seq).map(_.mkString) + } + .concatSubstreams + .runWith(Sink.seq) + + Await.result(fut, 3.seconds.dilated) should ===(Seq("ab", "", "abc", "", "a")) + } + + "deliver buffered tail element when tail is materialized after upstream emitted one" in { + // Directional test for the speculative pre-pull buffer: after the prefix is emitted, + // PrefixAndTail eagerly pulls upstream. If upstream pushes a single element before the + // tail Source is materialized, that element must be buffered and delivered as the + // first element when the tail finally subscribes (akka #20008). + val publisher = TestPublisher.manualProbe[Int]() + val subscriber = TestSubscriber.manualProbe[(immutable.Seq[Int], Source[Int, _])]() + + Source.fromPublisher(publisher).prefixAndTail(1).to(Sink.fromSubscriber(subscriber)).run() + + val upstream = publisher.expectSubscription() + val downstream = subscriber.expectSubscription() + + downstream.request(1) + upstream.expectRequest() + upstream.sendNext(1) + + val (prefix, tail) = subscriber.expectNext() + prefix should ===(Seq(1)) + subscriber.expectComplete() + + // The speculative pre-pull lets the buffered element arrive before the tail is materialized. 
+ upstream.sendNext(2) + upstream.sendComplete() + + val tailSubscriber = TestSubscriber.manualProbe[Int]() + tail.to(Sink.fromSubscriber(tailSubscriber)).run() + val tailSub = tailSubscriber.expectSubscription() + tailSub.request(10) + tailSubscriber.expectNext(2) + tailSubscriber.expectComplete() + } + } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala index bf9dd864336..6cb2fa03be7 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala @@ -285,6 +285,13 @@ import pekko.util.OptionVal builder.sizeHint(left) private var tailSource: SubSourceOutlet[T] = null + // Buffer for at most one element fetched by the speculative pre-pull issued in + // openSubstream(): it lets us discover an empty tail (upstream finishes before + // emitting any further element) without waiting for the substream subscription + // timeout when downstream intentionally discards the tail Source. See akka/akka#20008. 
+ private var pendingElement: T = _ + private var hasPendingElement = false + private var upstreamFinished = false private val SubscriptionTimer = "SubstreamSubscriptionTimer" @@ -313,8 +320,19 @@ import pekko.util.OptionVal override def onPull(): Unit = { setKeepGoing(false) cancelTimer(SubscriptionTimer) - pull(in) tailSource.setHandler(() => pull(in)) + if (hasPendingElement) { + val elem = pendingElement + pendingElement = null.asInstanceOf[T] + hasPendingElement = false + tailSource.push(elem) + if (upstreamFinished) { + tailSource.complete() + completeStage() + } + } else if (!hasBeenPulled(in) && !isClosed(in)) { + pull(in) + } } } @@ -325,12 +343,23 @@ import pekko.util.OptionVal setKeepGoing(true) scheduleOnce(SubscriptionTimer, timeout) builder = null + // Speculative pre-pull: lets us observe an empty tail (upstream finish before any + // further element) immediately, instead of holding the substream open until the + // subscription timeout fires when downstream discards the tail Source. + if (!hasBeenPulled(in) && !isClosed(in)) pull(in) Source.fromGraph(tailSource.source) } override def onPush(): Unit = { if (prefixComplete) { - tailSource.push(grab(in)) + if (tailSource.isAvailable) { + tailSource.push(grab(in)) + } else { + // Speculative pre-pull delivered an element before the SubSource was materialized; + // buffer it until the first subHandler.onPull arrives. + pendingElement = grab(in) + hasPendingElement = true + } } else { builder += grab(in) left -= 1 @@ -353,6 +382,12 @@ import pekko.util.OptionVal val prefix = builder.result(); builder = null // free for GC emit(out, (prefix, Source.empty), () => completeStage()) + } else if (hasPendingElement) { + // Wait for the SubSource to be materialized so we can deliver the buffered element + // and the completion together. If the SubSource never gets materialized the + // subscription timer will eventually close the stage. 
+ upstreamFinished = true + if (tailSource.isClosed) completeStage() } else { if (!tailSource.isClosed) tailSource.complete() completeStage()