diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/prefixAndTail.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/prefixAndTail.md index 9ab2ea900c0..38334263db3 100644 --- a/docs/src/main/paradox/stream/operators/Source-or-Flow/prefixAndTail.md +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/prefixAndTail.md @@ -15,6 +15,15 @@ Take up to *n* elements from the stream (less than *n* only if the upstream comp Take up to *n* elements from the stream (less than *n* only if the upstream completes before emitting *n* elements) and returns a pair containing a strict sequence of the taken element and a stream representing the remaining elements. +After the prefix has been emitted, `prefixAndTail` issues a single speculative pull on its upstream so that an empty +tail (the upstream completes before producing any further element) is detected immediately. If the emitted tail +`Source` is intentionally discarded and the substream happens to be empty, the stage completes without waiting for +the configured `pekko.stream.materializer.subscription-timeout`. + +If the speculative pull delivers an element before the tail `Source` is materialized, that element is buffered and +forwarded as the first element to whoever materializes the tail. Tails that are discarded by downstream while +upstream still has elements continue to be governed by the configured subscription timeout. 
+ ## Reactive Streams semantics @@@div { .callout } diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowPrefixAndTailSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowPrefixAndTailSpec.scala index 1a6cf808529..c9a463115ba 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowPrefixAndTailSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowPrefixAndTailSpec.scala @@ -16,6 +16,7 @@ package org.apache.pekko.stream.scaladsl import scala.annotation.nowarn import scala.collection.immutable import scala.concurrent.Await +import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.control.NoStackTrace @@ -289,6 +290,60 @@ class FlowPrefixAndTailSpec extends StreamSpec(""" sub.expectSubscriptionAndComplete() } + "complete promptly on empty tail without waiting for subscription timeout (akka #20008)" in { + // Reproduces https://github.com/akka/akka/issues/20008. Default StreamSubscriptionTimeout + // is 5 seconds; without the speculative pre-pull in PrefixAndTail this would take + // >10 seconds (two discarded empty tails). The test must finish well under that. + import system.dispatcher + val fut = Source(List("2", "a", "b", "0", "3", "a", "b", "c", "0", "1", "a")) + .splitWhen(_.matches("\\d+")) + .prefixAndTail(1) + .mapAsync(1) { + case (Seq("0"), _) => + // Intentionally discard the tail Source for known-empty groups. + Future.successful("") + case (Seq(_), elements) => + elements.runWith(Sink.seq).map(_.mkString) + } + .concatSubstreams + .runWith(Sink.seq) + + Await.result(fut, 3.seconds.dilated) should ===(Seq("ab", "", "abc", "", "a")) + } + + "deliver buffered tail element when tail is materialized after upstream emitted one" in { + // Directional test for the speculative pre-pull buffer: after the prefix is emitted, + // PrefixAndTail eagerly pulls upstream. 
If upstream pushes a single element before the + // tail Source is materialized, that element must be buffered and delivered as the + // first element when the tail finally subscribes (akka #20008). + val publisher = TestPublisher.manualProbe[Int]() + val subscriber = TestSubscriber.manualProbe[(immutable.Seq[Int], Source[Int, _])]() + + Source.fromPublisher(publisher).prefixAndTail(1).to(Sink.fromSubscriber(subscriber)).run() + + val upstream = publisher.expectSubscription() + val downstream = subscriber.expectSubscription() + + downstream.request(1) + upstream.expectRequest() + upstream.sendNext(1) + + val (prefix, tail) = subscriber.expectNext() + prefix should ===(Seq(1)) + subscriber.expectComplete() + + // The speculative pre-pull lets the buffered element arrive before the tail is materialized. + upstream.sendNext(2) + upstream.sendComplete() + + val tailSubscriber = TestSubscriber.manualProbe[Int]() + tail.to(Sink.fromSubscriber(tailSubscriber)).run() + val tailSub = tailSubscriber.expectSubscription() + tailSub.request(10) + tailSubscriber.expectNext(2) + tailSubscriber.expectComplete() + } + } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala index bf9dd864336..6cb2fa03be7 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala @@ -285,6 +285,13 @@ import pekko.util.OptionVal builder.sizeHint(left) private var tailSource: SubSourceOutlet[T] = null + // Buffer for at most one element fetched by the speculative pre-pull issued in + // openSubstream(): it lets us discover an empty tail (upstream finishes before + // emitting any further element) without waiting for the substream subscription + // timeout when downstream intentionally discards the tail Source. See akka/akka#20008. 
+ private var pendingElement: T = _ + private var hasPendingElement = false + private var upstreamFinished = false private val SubscriptionTimer = "SubstreamSubscriptionTimer" @@ -313,8 +320,19 @@ import pekko.util.OptionVal override def onPull(): Unit = { setKeepGoing(false) cancelTimer(SubscriptionTimer) - pull(in) tailSource.setHandler(() => pull(in)) + if (hasPendingElement) { + val elem = pendingElement + pendingElement = null.asInstanceOf[T] + hasPendingElement = false + tailSource.push(elem) + if (upstreamFinished) { + tailSource.complete() + completeStage() + } + } else if (!hasBeenPulled(in) && !isClosed(in)) { + pull(in) + } } } @@ -325,12 +343,23 @@ import pekko.util.OptionVal setKeepGoing(true) scheduleOnce(SubscriptionTimer, timeout) builder = null + // Speculative pre-pull: lets us observe an empty tail (upstream finish before any + // further element) immediately, instead of holding the substream open until the + // subscription timeout fires when downstream discards the tail Source. + if (!hasBeenPulled(in) && !isClosed(in)) pull(in) Source.fromGraph(tailSource.source) } override def onPush(): Unit = { if (prefixComplete) { - tailSource.push(grab(in)) + if (tailSource.isAvailable) { + tailSource.push(grab(in)) + } else { + // Speculative pre-pull delivered an element before the SubSource was materialized; + // buffer it until the first subHandler.onPull arrives. + pendingElement = grab(in) + hasPendingElement = true + } } else { builder += grab(in) left -= 1 @@ -353,6 +382,12 @@ import pekko.util.OptionVal val prefix = builder.result(); builder = null // free for GC emit(out, (prefix, Source.empty), () => completeStage()) + } else if (hasPendingElement) { + // Wait for the SubSource to be materialized so we can deliver the buffered element + // and the completion together. If the SubSource never gets materialized the + // subscription timer will eventually close the stage. 
+ upstreamFinished = true + if (tailSource.isClosed) completeStage() } else { if (!tailSource.isClosed) tailSource.complete() completeStage()