Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions core/src/main/scala/ox/channels/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,35 @@ trait Source[+T] extends SourceOps[T] with SourceDrainOps[T]:
/** Create a clause which can be used in [[select]]. The clause will receive a value from the current channel. */
def receiveClause: Receive = Receive(delegate.receiveClause(t => Received(t.asInstanceOf[T])))

/** Attempt to receive a value from the channel if one is immediately available. This method never blocks or suspends the calling thread.
*
* May return [[None]] even when a value is available, due to contention with concurrent operations. Should not be used as a substitute
* for [[receive]] in a spin loop.
*
* @return
* [[Some]] with the received value, or [[None]] if no value is immediately available.
* @throws ChannelClosedException
* When the channel is closed.
*/
def tryReceive(): Option[T] = tryReceiveOrClosed().orThrow

/** Attempt to receive a value from the channel if one is immediately available. This method never blocks or suspends the calling thread.
* Doesn't throw exceptions when the channel is closed, but returns a value.
*
* May return [[None]] even when a value is available, due to contention with concurrent operations. Should not be used as a substitute
* for [[receiveOrClosed]] in a spin loop.
*
* @return
* [[Some]] with the received value, [[None]] if no value is immediately available, or [[ChannelClosed]] when the channel is closed.
*/
def tryReceiveOrClosed(): Option[T] | ChannelClosed =
val r = delegate.tryReceiveOrClosed()
if r == null then None
else
ChannelClosed.fromJoxOrT[T](r.asInstanceOf[AnyRef]) match
case c: ChannelClosed => c
case v: T @unchecked => Some(v)

/** Receive a value from the channel. For a variant which throws exceptions when the channel is closed, use [[receive]].
*
* @return
Expand Down Expand Up @@ -125,6 +154,40 @@ trait Sink[-T]:
*/
def sendClause(t: T): Send = Send(delegate.asInstanceOf[JSink[T]].sendClause(t, () => Sent()))

/** Attempt to send a value to the channel if there's a waiting receiver, or space in the buffer. This method never blocks or suspends the
* calling thread.
*
* May return `false` even when space is available, due to contention with concurrent operations. Should not be used as a substitute for
* [[send]] in a spin loop.
*
* @param t
* The value to send. Not `null`.
* @return
* `true` if the value was sent, `false` otherwise.
* @throws ChannelClosedException
* When the channel is closed.
*/
def trySend(t: T): Boolean = trySendOrClosed(t).orThrow

/** Attempt to send a value to the channel if there's a waiting receiver, or space in the buffer. This method never blocks or suspends the
* calling thread. Doesn't throw exceptions when the channel is closed, but returns a value.
*
* May return `false` even when space is available, due to contention with concurrent operations. Should not be used as a substitute for
* [[sendOrClosed]] in a spin loop.
*
* @param t
* The value to send. Not `null`.
* @return
* `true` if the value was sent, `false` if there's no space/waiting receiver, or [[ChannelClosed]] when the channel is closed.
*/
def trySendOrClosed(t: T): Boolean | ChannelClosed =
val r = delegate.asInstanceOf[JSink[T]].trySendOrClosed(t)
if r == null then true // null = sent
else
ChannelClosed.fromJox(r.asInstanceOf[AnyRef]) match
case c: ChannelClosed => c
case _ => false // sentinel = not sent

/** Send a value to the channel. For a variant which throws exceptions when the channel is closed, use [[send]].
*
* @param t
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/ox/channels/ChannelClosed.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ object ChannelClosed:
private[ox] def fromJoxOrUnit(joxResult: AnyRef): Unit | ChannelClosed =
if joxResult == null then () else fromJox(joxResult).asInstanceOf[ChannelClosed]

private def fromJox(joxResult: AnyRef): AnyRef | ChannelClosed =
private[ox] def fromJox(joxResult: AnyRef): AnyRef | ChannelClosed =
joxResult match
case _: JChannelDone => Done
case e: JChannelError => Error(e.cause())
Expand Down
112 changes: 112 additions & 0 deletions core/src/test/scala/ox/channels/ChannelTryTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package ox.channels

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class ChannelTryTest extends AnyFlatSpec with Matchers:
// trySend

"trySend" should "return true when there is space in the buffer" in:
val c = Channel.buffered[Int](2)
c.trySend(1) shouldBe true
c.trySend(2) shouldBe true

it should "return false when the buffer is full" in:
val c = Channel.buffered[Int](1)
c.trySend(1) shouldBe true
c.trySend(2) shouldBe false

it should "return true for an unlimited channel" in:
val c = Channel.unlimited[Int]
c.trySend(1) shouldBe true
c.trySend(2) shouldBe true

it should "throw ChannelClosedException.Done when the channel is done" in:
val c = Channel.buffered[Int](2)
c.done()
assertThrows[ChannelClosedException.Done]:
c.trySend(1)

it should "throw ChannelClosedException.Error when the channel is in error" in:
val c = Channel.buffered[Int](2)
c.error(new RuntimeException("test"))
assertThrows[ChannelClosedException.Error]:
c.trySend(1)

// trySendOrClosed

"trySendOrClosed" should "return true when there is space in the buffer" in:
val c = Channel.buffered[Int](2)
c.trySendOrClosed(1) shouldBe true
c.trySendOrClosed(2) shouldBe true

it should "return false when the buffer is full" in:
val c = Channel.buffered[Int](1)
c.trySendOrClosed(1) shouldBe true
c.trySendOrClosed(2) shouldBe false

it should "return ChannelClosed.Done when the channel is done" in:
val c = Channel.buffered[Int](2)
c.done()
c.trySendOrClosed(1) shouldBe ChannelClosed.Done

it should "return ChannelClosed.Error when the channel is in error" in:
val c = Channel.buffered[Int](2)
val ex = new RuntimeException("test")
c.error(ex)
c.trySendOrClosed(1) should matchPattern { case ChannelClosed.Error(_) => }

// tryReceive

"tryReceive" should "return Some with the value when one is available" in:
val c = Channel.buffered[Int](2)
c.send(1)
c.send(2)
c.tryReceive() shouldBe Some(1)
c.tryReceive() shouldBe Some(2)

it should "return None when the buffer is empty" in:
val c = Channel.buffered[Int](2)
c.tryReceive() shouldBe None

it should "throw ChannelClosedException.Done when the channel is done and empty" in:
val c = Channel.buffered[Int](2)
c.done()
assertThrows[ChannelClosedException.Done]:
c.tryReceive()

it should "throw ChannelClosedException.Error when the channel is in error" in:
val c = Channel.buffered[Int](2)
c.send(1)
c.error(new RuntimeException("test"))
assertThrows[ChannelClosedException.Error]:
c.tryReceive()

// tryReceiveOrClosed

"tryReceiveOrClosed" should "return Some with the value when one is available" in:
val c = Channel.buffered[Int](2)
c.send(1)
c.tryReceiveOrClosed() shouldBe Some(1)

it should "return None when the buffer is empty" in:
val c = Channel.buffered[Int](2)
c.tryReceiveOrClosed() shouldBe None

it should "return ChannelClosed.Done when the channel is done and empty" in:
val c = Channel.buffered[Int](2)
c.done()
c.tryReceiveOrClosed() shouldBe ChannelClosed.Done

it should "return ChannelClosed.Error when the channel is in error" in:
val c = Channel.buffered[Int](2)
c.send(1)
c.error(new RuntimeException("test"))
c.tryReceiveOrClosed() should matchPattern { case ChannelClosed.Error(_) => }

it should "return Some(value) from a done channel that still has buffered values" in:
val c = Channel.buffered[Int](2)
c.send(42)
c.done()
c.tryReceiveOrClosed() shouldBe Some(42)
end ChannelTryTest
18 changes: 10 additions & 8 deletions core/src/test/scala/ox/flow/FlowOpsMapParTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -87,31 +87,33 @@ class FlowOpsMapParTest extends AnyFlatSpec with Matchers with Eventually:
it should "cancel other running forks when there's an error" in supervised:
// given
val trail = Trail()
val flow = Flow.fromIterable(1 to 10)
// Using exactly 2 elements with mapPar(2) so both always start concurrently — the semaphore has
// 2 permits and there are only 2 forks, so both acquire immediately with no competition.
// Element 2 fails after a short delay; element 1 should be cancelled during its long sleep.

// when
val s2 = flow
val s2 = Flow
.fromIterable(List(1, 2))
.mapPar(2): i =>
if i == 4 then
if i == 2 then
sleep(100.millis)
trail.add("exception")
throw new Exception("boom")
else
sleep(200.millis)
sleep(1000.millis)
trail.add(s"done")
i * 2
.runToChannel()

// then
s2.receive() shouldBe 2
s2.receive() shouldBe 4
s2.receiveOrClosed() should matchPattern {
case ChannelClosed.Error(ChannelClosedException.Error(reason)) if reason.getMessage == "boom" =>
}

// checking if the forks aren't left running
// checking if the forks aren't left running; element 1 runs concurrently with the failing element 2
// and should be cancelled before its 1s sleep completes (900ms cancellation window)
sleep(200.millis)
trail.get shouldBe Vector("done", "done", "exception") // TODO: 3 isn't cancelled because it's already taken off the queue
trail.get shouldBe Vector("exception") // element 1 cancelled (no "done"), element 2 added "exception"

// Edge Cases
it should "handle empty flow" in supervised:
Expand Down