Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@ Take up to *n* elements from the stream (less than *n* only if the upstream comp
Take up to *n* elements from the stream (less than *n* only if the upstream completes before emitting *n* elements)
and returns a pair containing a strict sequence of the taken element and a stream representing the remaining elements.

After the prefix has been emitted, `prefixAndTail` issues a single speculative pull on its upstream so that an empty
tail (the upstream completes before producing any further element) is detected immediately. If the emitted tail
`Source` is intentionally discarded and the substream happens to be empty, the stage completes without waiting for
the configured `pekko.stream.materializer.subscription-timeout`.

If the speculative pull delivers an element before the tail `Source` is materialized, that element is buffered and
forwarded as the first element to whoever materializes the tail. Tails that are discarded by downstream while
upstream still has elements continue to be governed by the configured subscription timeout.

## Reactive Streams semantics

@@@div { .callout }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package org.apache.pekko.stream.scaladsl
import scala.annotation.nowarn
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.control.NoStackTrace

Expand Down Expand Up @@ -289,6 +290,60 @@ class FlowPrefixAndTailSpec extends StreamSpec("""
sub.expectSubscriptionAndComplete()
}

"complete promptly on empty tail without waiting for subscription timeout (akka #20008)" in {
// Reproduces https://github.com/akka/akka/issues/20008. Default StreamSubscriptionTimeout
// is 5 seconds; without the speculative pre-pull in PrefixAndTail this would take
// >15 seconds (three discarded empty tails). The test must finish well under that.
import system.dispatcher
val fut = Source(List("2", "a", "b", "0", "3", "a", "b", "c", "0", "1", "a"))
.splitWhen(_.matches("\\d+"))
.prefixAndTail(1)
.mapAsync(1) {
case (Seq("0"), _) =>
// Intentionally discard the tail Source for known-empty groups.
Future.successful("")
case (Seq(_), elements) =>
elements.runWith(Sink.seq).map(_.mkString)
}
.concatSubstreams
.runWith(Sink.seq)

Await.result(fut, 3.seconds.dilated) should ===(Seq("ab", "", "abc", "", "a"))
}

"deliver buffered tail element when tail is materialized after upstream emitted one" in {
// Directional test for the speculative pre-pull buffer: after the prefix is emitted,
// PrefixAndTail eagerly pulls upstream. If upstream pushes a single element before the
// tail Source is materialized, that element must be buffered and delivered as the
// first element when the tail finally subscribes (akka #20008).
val publisher = TestPublisher.manualProbe[Int]()
val subscriber = TestSubscriber.manualProbe[(immutable.Seq[Int], Source[Int, _])]()

Source.fromPublisher(publisher).prefixAndTail(1).to(Sink.fromSubscriber(subscriber)).run()

val upstream = publisher.expectSubscription()
val downstream = subscriber.expectSubscription()

downstream.request(1)
upstream.expectRequest()
upstream.sendNext(1)

val (prefix, tail) = subscriber.expectNext()
prefix should ===(Seq(1))
subscriber.expectComplete()

// The speculative pre-pull lets the buffered element arrive before the tail is materialized.
upstream.sendNext(2)
upstream.sendComplete()

val tailSubscriber = TestSubscriber.manualProbe[Int]()
tail.to(Sink.fromSubscriber(tailSubscriber)).run()
val tailSub = tailSubscriber.expectSubscription()
tailSub.request(10)
tailSubscriber.expectNext(2)
tailSubscriber.expectComplete()
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,13 @@ import pekko.util.OptionVal
builder.sizeHint(left)

private var tailSource: SubSourceOutlet[T] = null
// Buffer for at most one element fetched by the speculative pre-pull issued in
// openSubstream(): it lets us discover an empty tail (upstream finishes before
// emitting any further element) without waiting for the substream subscription
// timeout when downstream intentionally discards the tail Source. See akka/akka#20008.
private var pendingElement: T = _
private var hasPendingElement = false
private var upstreamFinished = false

private val SubscriptionTimer = "SubstreamSubscriptionTimer"

Expand Down Expand Up @@ -313,8 +320,19 @@ import pekko.util.OptionVal
override def onPull(): Unit = {
setKeepGoing(false)
cancelTimer(SubscriptionTimer)
pull(in)
tailSource.setHandler(() => pull(in))
if (hasPendingElement) {
val elem = pendingElement
pendingElement = null.asInstanceOf[T]
hasPendingElement = false
tailSource.push(elem)
if (upstreamFinished) {
tailSource.complete()
completeStage()
}
} else if (!hasBeenPulled(in) && !isClosed(in)) {
pull(in)
}
}
}

Expand All @@ -325,12 +343,23 @@ import pekko.util.OptionVal
setKeepGoing(true)
scheduleOnce(SubscriptionTimer, timeout)
builder = null
// Speculative pre-pull: lets us observe an empty tail (upstream finish before any
// further element) immediately, instead of holding the substream open until the
// subscription timeout fires when downstream discards the tail Source.
if (!hasBeenPulled(in) && !isClosed(in)) pull(in)
Source.fromGraph(tailSource.source)
}

override def onPush(): Unit = {
if (prefixComplete) {
tailSource.push(grab(in))
if (tailSource.isAvailable) {
tailSource.push(grab(in))
} else {
// Speculative pre-pull delivered an element before the SubSource was materialized;
// buffer it until the first subHandler.onPull arrives.
pendingElement = grab(in)
hasPendingElement = true
}
} else {
builder += grab(in)
left -= 1
Expand All @@ -353,6 +382,12 @@ import pekko.util.OptionVal
val prefix = builder.result();
builder = null // free for GC
emit(out, (prefix, Source.empty), () => completeStage())
} else if (hasPendingElement) {
// Wait for the SubSource to be materialized so we can deliver the buffered element
// and the completion together. If the SubSource never gets materialized the
// subscription timer will eventually close the stage.
upstreamFinished = true
if (tailSource.isClosed) completeStage()
} else {
if (!tailSource.isClosed) tailSource.complete()
completeStage()
Expand Down
Loading