diff --git a/core/src/main/scala/ox/channels/Channel.scala b/core/src/main/scala/ox/channels/Channel.scala index d83590b6..3d75288c 100644 --- a/core/src/main/scala/ox/channels/Channel.scala +++ b/core/src/main/scala/ox/channels/Channel.scala @@ -54,6 +54,35 @@ trait Source[+T] extends SourceOps[T] with SourceDrainOps[T]: /** Create a clause which can be used in [[select]]. The clause will receive a value from the current channel. */ def receiveClause: Receive = Receive(delegate.receiveClause(t => Received(t.asInstanceOf[T]))) + /** Attempt to receive a value from the channel if one is immediately available. This method never blocks or suspends the calling thread. + * + * May return [[None]] even when a value is available, due to contention with concurrent operations. Should not be used as a substitute + * for [[receive]] in a spin loop. + * + * @return + * [[Some]] with the received value, or [[None]] if no value is immediately available. + * @throws ChannelClosedException + * When the channel is closed. + */ + def tryReceive(): Option[T] = tryReceiveOrClosed().orThrow + + /** Attempt to receive a value from the channel if one is immediately available. This method never blocks or suspends the calling thread. + * Doesn't throw exceptions when the channel is closed, but returns a value. + * + * May return [[None]] even when a value is available, due to contention with concurrent operations. Should not be used as a substitute + * for [[receiveOrClosed]] in a spin loop. + * + * @return + * [[Some]] with the received value, [[None]] if no value is immediately available, or [[ChannelClosed]] when the channel is closed. + */ + def tryReceiveOrClosed(): Option[T] | ChannelClosed = + val r = delegate.tryReceiveOrClosed() + if r == null then None + else + ChannelClosed.fromJoxOrT[T](r.asInstanceOf[AnyRef]) match + case c: ChannelClosed => c + case v: T @unchecked => Some(v) + /** Receive a value from the channel. For a variant which throws exceptions when the channel is closed, use [[receive]]. * * @return @@ -125,6 +154,40 @@ trait Sink[-T]: */ def sendClause(t: T): Send = Send(delegate.asInstanceOf[JSink[T]].sendClause(t, () => Sent())) + /** Attempt to send a value to the channel if there's a waiting receiver, or space in the buffer. This method never blocks or suspends the + * calling thread. + * + * May return `false` even when space is available, due to contention with concurrent operations. Should not be used as a substitute for + * [[send]] in a spin loop. + * + * @param t + * The value to send. Not `null`. + * @return + * `true` if the value was sent, `false` otherwise. + * @throws ChannelClosedException + * When the channel is closed. + */ + def trySend(t: T): Boolean = trySendOrClosed(t).orThrow + + /** Attempt to send a value to the channel if there's a waiting receiver, or space in the buffer. This method never blocks or suspends the + * calling thread. Doesn't throw exceptions when the channel is closed, but returns a value. + * + * May return `false` even when space is available, due to contention with concurrent operations. Should not be used as a substitute for + * [[sendOrClosed]] in a spin loop. + * + * @param t + * The value to send. Not `null`. + * @return + * `true` if the value was sent, `false` if there's no space/waiting receiver, or [[ChannelClosed]] when the channel is closed. + */ + def trySendOrClosed(t: T): Boolean | ChannelClosed = + val r = delegate.asInstanceOf[JSink[T]].trySendOrClosed(t) + if r == null then true // null = sent + else + ChannelClosed.fromJox(r.asInstanceOf[AnyRef]) match + case c: ChannelClosed => c + case _ => false // sentinel = not sent + /** Send a value to the channel. For a variant which throws exceptions when the channel is closed, use [[send]]. * * @param t diff --git a/core/src/main/scala/ox/channels/ChannelClosed.scala b/core/src/main/scala/ox/channels/ChannelClosed.scala index d92ba60b..d9028451 100644 --- a/core/src/main/scala/ox/channels/ChannelClosed.scala +++ b/core/src/main/scala/ox/channels/ChannelClosed.scala @@ -16,7 +16,7 @@ object ChannelClosed: private[ox] def fromJoxOrUnit(joxResult: AnyRef): Unit | ChannelClosed = if joxResult == null then () else fromJox(joxResult).asInstanceOf[ChannelClosed] - private def fromJox(joxResult: AnyRef): AnyRef | ChannelClosed = + private[ox] def fromJox(joxResult: AnyRef): AnyRef | ChannelClosed = joxResult match case _: JChannelDone => Done case e: JChannelError => Error(e.cause()) diff --git a/core/src/test/scala/ox/channels/ChannelTryTest.scala b/core/src/test/scala/ox/channels/ChannelTryTest.scala new file mode 100644 index 00000000..e361c1b8 --- /dev/null +++ b/core/src/test/scala/ox/channels/ChannelTryTest.scala @@ -0,0 +1,112 @@ +package ox.channels + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class ChannelTryTest extends AnyFlatSpec with Matchers: + // trySend + + "trySend" should "return true when there is space in the buffer" in: + val c = Channel.buffered[Int](2) + c.trySend(1) shouldBe true + c.trySend(2) shouldBe true + + it should "return false when the buffer is full" in: + val c = Channel.buffered[Int](1) + c.trySend(1) shouldBe true + c.trySend(2) shouldBe false + + it should "return true for an unlimited channel" in: + val c = Channel.unlimited[Int] + c.trySend(1) shouldBe true + c.trySend(2) shouldBe true + + it should "throw ChannelClosedException.Done when the channel is done" in: + val c = Channel.buffered[Int](2) + c.done() + assertThrows[ChannelClosedException.Done]: + c.trySend(1) + + it should "throw ChannelClosedException.Error when the channel is in error" in: + val c = Channel.buffered[Int](2) + c.error(new RuntimeException("test")) + assertThrows[ChannelClosedException.Error]: + c.trySend(1) + + // trySendOrClosed + + "trySendOrClosed" should "return true when there is space in the buffer" in: + val c = Channel.buffered[Int](2) + c.trySendOrClosed(1) shouldBe true + c.trySendOrClosed(2) shouldBe true + + it should "return false when the buffer is full" in: + val c = Channel.buffered[Int](1) + c.trySendOrClosed(1) shouldBe true + c.trySendOrClosed(2) shouldBe false + + it should "return ChannelClosed.Done when the channel is done" in: + val c = Channel.buffered[Int](2) + c.done() + c.trySendOrClosed(1) shouldBe ChannelClosed.Done + + it should "return ChannelClosed.Error when the channel is in error" in: + val c = Channel.buffered[Int](2) + val ex = new RuntimeException("test") + c.error(ex) + c.trySendOrClosed(1) should matchPattern { case ChannelClosed.Error(_) => } + + // tryReceive + + "tryReceive" should "return Some with the value when one is available" in: + val c = Channel.buffered[Int](2) + c.send(1) + c.send(2) + c.tryReceive() shouldBe Some(1) + c.tryReceive() shouldBe Some(2) + + it should "return None when the buffer is empty" in: + val c = Channel.buffered[Int](2) + c.tryReceive() shouldBe None + + it should "throw ChannelClosedException.Done when the channel is done and empty" in: + val c = Channel.buffered[Int](2) + c.done() + assertThrows[ChannelClosedException.Done]: + c.tryReceive() + + it should "throw ChannelClosedException.Error when the channel is in error" in: + val c = Channel.buffered[Int](2) + c.send(1) + c.error(new RuntimeException("test")) + assertThrows[ChannelClosedException.Error]: + c.tryReceive() + + // tryReceiveOrClosed + + "tryReceiveOrClosed" should "return Some with the value when one is available" in: + val c = Channel.buffered[Int](2) + c.send(1) + c.tryReceiveOrClosed() shouldBe Some(1) + + it should "return None when the buffer is empty" in: + val c = Channel.buffered[Int](2) + c.tryReceiveOrClosed() shouldBe None + + it should "return ChannelClosed.Done when the channel is done and empty" in: + val c = Channel.buffered[Int](2) + c.done() + c.tryReceiveOrClosed() shouldBe ChannelClosed.Done + + it should "return ChannelClosed.Error when the channel is in error" in: + val c = Channel.buffered[Int](2) + c.send(1) + c.error(new RuntimeException("test")) + c.tryReceiveOrClosed() should matchPattern { case ChannelClosed.Error(_) => } + + it should "return Some(value) from a done channel that still has buffered values" in: + val c = Channel.buffered[Int](2) + c.send(42) + c.done() + c.tryReceiveOrClosed() shouldBe Some(42) +end ChannelTryTest diff --git a/core/src/test/scala/ox/flow/FlowOpsMapParTest.scala b/core/src/test/scala/ox/flow/FlowOpsMapParTest.scala index 81626286..73382530 100644 --- a/core/src/test/scala/ox/flow/FlowOpsMapParTest.scala +++ b/core/src/test/scala/ox/flow/FlowOpsMapParTest.scala @@ -87,31 +87,33 @@ class FlowOpsMapParTest extends AnyFlatSpec with Matchers with Eventually: it should "cancel other running forks when there's an error" in supervised: // given val trail = Trail() - val flow = Flow.fromIterable(1 to 10) + // Using exactly 2 elements with mapPar(2) so both always start concurrently — the semaphore has + // 2 permits and there are only 2 forks, so both acquire immediately with no competition. + // Element 2 fails after a short delay; element 1 should be cancelled during its long sleep. // when - val s2 = flow + val s2 = Flow + .fromIterable(List(1, 2)) .mapPar(2): i => - if i == 4 then + if i == 2 then sleep(100.millis) trail.add("exception") throw new Exception("boom") else - sleep(200.millis) + sleep(1000.millis) trail.add(s"done") i * 2 .runToChannel() // then - s2.receive() shouldBe 2 - s2.receive() shouldBe 4 s2.receiveOrClosed() should matchPattern { case ChannelClosed.Error(ChannelClosedException.Error(reason)) if reason.getMessage == "boom" => } - // checking if the forks aren't left running + // checking if the forks aren't left running; element 1 runs concurrently with the failing element 2 + // and should be cancelled before its 1s sleep completes (900ms cancellation window) sleep(200.millis) - trail.get shouldBe Vector("done", "done", "exception") // TODO: 3 isn't cancelled because it's already taken off the queue + trail.get shouldBe Vector("exception") // element 1 cancelled (no "done"), element 2 added "exception" // Edge Cases it should "handle empty flow" in supervised: