From a9bd8cf85e34a63dc9e2105429ac40ac36263ad9 Mon Sep 17 00:00:00 2001 From: Adam Warski Date: Mon, 2 Mar 2026 17:38:49 +0000 Subject: [PATCH 1/6] Add non-blocking trySend and tryReceive methods from jox 1.1.2 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Expose jox 1.1.2's new non-blocking try* methods on ox's Source and Sink traits. All four variants follow the existing orClosed convention: - Sink.trySend(t): Boolean — true=sent, throws when closed - Sink.trySendOrClosed(t): Boolean | ChannelClosed - Source.tryReceive(): Option[T] — Some=received, None=not available, throws when closed - Source.tryReceiveOrClosed(): Option[T] | ChannelClosed Co-Authored-By: Claude Sonnet 4.6 --- core/src/main/scala/ox/channels/Channel.scala | 57 ++++++++ .../scala/ox/channels/ChannelClosed.scala | 20 +++ .../scala/ox/channels/ChannelTryTest.scala | 134 ++++++++++++++++++ 3 files changed, 211 insertions(+) create mode 100644 core/src/test/scala/ox/channels/ChannelTryTest.scala diff --git a/core/src/main/scala/ox/channels/Channel.scala b/core/src/main/scala/ox/channels/Channel.scala index d83590b6..6b38c4b5 100644 --- a/core/src/main/scala/ox/channels/Channel.scala +++ b/core/src/main/scala/ox/channels/Channel.scala @@ -54,6 +54,32 @@ trait Source[+T] extends SourceOps[T] with SourceDrainOps[T]: /** Create a clause which can be used in [[select]]. The clause will receive a value from the current channel. */ def receiveClause: Receive = Receive(delegate.receiveClause(t => Received(t.asInstanceOf[T]))) + /** Attempt to receive a value from the channel if one is immediately available. This method never blocks or suspends the calling thread. + * + * May return [[None]] even when a value is available, due to contention with concurrent operations. Should not be used as a substitute + * for [[receive]] in a spin loop. + * + * @return + * [[Some]] with the received value, or [[None]] if no value is immediately available. + * @throws ChannelClosedException + * When the channel is closed. + */ + def tryReceive(): Option[T] = tryReceiveOrClosed() match + case c: ChannelClosed => throw c.toThrowable + case opt: Option[T @unchecked] => opt + + /** Attempt to receive a value from the channel if one is immediately available. This method never blocks or suspends the calling thread. + * Doesn't throw exceptions when the channel is closed, but returns a value. + * + * May return [[None]] even when a value is available, due to contention with concurrent operations. Should not be used as a substitute + * for [[receiveOrClosed]] in a spin loop. + * + * @return + * [[Some]] with the received value, [[None]] if no value is immediately available, or [[ChannelClosed]] when the channel is closed. + */ + def tryReceiveOrClosed(): Option[T] | ChannelClosed = + ChannelClosed.fromJoxTryReceiveOrClosed(delegate.tryReceiveOrClosed()) + /** Receive a value from the channel. For a variant which throws exceptions when the channel is closed, use [[receive]]. * * @return @@ -125,6 +151,37 @@ trait Sink[-T]: */ def sendClause(t: T): Send = Send(delegate.asInstanceOf[JSink[T]].sendClause(t, () => Sent())) + /** Attempt to send a value to the channel if there's a waiting receiver, or space in the buffer. This method never blocks or suspends the + * calling thread. + * + * May return `false` even when space is available, due to contention with concurrent operations. Should not be used as a substitute for + * [[send]] in a spin loop. + * + * @param t + * The value to send. Not `null`. + * @return + * `true` if the value was sent, `false` otherwise. + * @throws ChannelClosedException + * When the channel is closed. + */ + def trySend(t: T): Boolean = trySendOrClosed(t) match + case c: ChannelClosed => throw c.toThrowable + case b: Boolean => b + + /** Attempt to send a value to the channel if there's a waiting receiver, or space in the buffer. This method never blocks or suspends the + * calling thread. Doesn't throw exceptions when the channel is closed, but returns a value. + * + * May return `false` even when space is available, due to contention with concurrent operations. Should not be used as a substitute for + * [[sendOrClosed]] in a spin loop. + * + * @param t + * The value to send. Not `null`. + * @return + * `true` if the value was sent, `false` if there's no space/waiting receiver, or [[ChannelClosed]] when the channel is closed. + */ + def trySendOrClosed(t: T): Boolean | ChannelClosed = + ChannelClosed.fromJoxTrySendOrClosed(delegate.asInstanceOf[JSink[T]].trySendOrClosed(t)) + /** Send a value to the channel. For a variant which throws exceptions when the channel is closed, use [[send]]. * * @param t diff --git a/core/src/main/scala/ox/channels/ChannelClosed.scala b/core/src/main/scala/ox/channels/ChannelClosed.scala index d92ba60b..a4144da5 100644 --- a/core/src/main/scala/ox/channels/ChannelClosed.scala +++ b/core/src/main/scala/ox/channels/ChannelClosed.scala @@ -16,6 +16,26 @@ object ChannelClosed: private[ox] def fromJoxOrUnit(joxResult: AnyRef): Unit | ChannelClosed = if joxResult == null then () else fromJox(joxResult).asInstanceOf[ChannelClosed] + /** Converts the result of jox's `tryReceiveOrClosed()` (which returns `T | JChannelClosed | null`) to `Option[T] | ChannelClosed`. `null` + * means no value was immediately available. + */ + private[ox] def fromJoxTryReceiveOrClosed[T](joxResult: AnyRef): Option[T] | ChannelClosed = + joxResult match + case null => None + case _: JChannelDone => Done + case e: JChannelError => Error(e.cause()) + case v => Some(v.asInstanceOf[T]) + + /** Converts the result of jox's `trySendOrClosed()` (which returns `null | JChannelClosed | sentinel`) to `Boolean | ChannelClosed`. + * `null` means the value was sent, any other non-ChannelClosed value (sentinel) means it was not sent. + */ + private[ox] def fromJoxTrySendOrClosed(joxResult: AnyRef): Boolean | ChannelClosed = + joxResult match + case null => true + case _: JChannelDone => Done + case e: JChannelError => Error(e.cause()) + case _ => false + private def fromJox(joxResult: AnyRef): AnyRef | ChannelClosed = joxResult match case _: JChannelDone => Done diff --git a/core/src/test/scala/ox/channels/ChannelTryTest.scala b/core/src/test/scala/ox/channels/ChannelTryTest.scala new file mode 100644 index 00000000..8ffb51c7 --- /dev/null +++ b/core/src/test/scala/ox/channels/ChannelTryTest.scala @@ -0,0 +1,134 @@ +package ox.channels + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class ChannelTryTest extends AnyFlatSpec with Matchers: + // trySend + + "trySend" should "return true when there is space in the buffer" in { + val c = Channel.buffered[Int](2) + c.trySend(1) shouldBe true + c.trySend(2) shouldBe true + } + + it should "return false when the buffer is full" in { + val c = Channel.buffered[Int](1) + c.trySend(1) shouldBe true + c.trySend(2) shouldBe false + } + + it should "return true for an unlimited channel" in { + val c = Channel.unlimited[Int] + c.trySend(1) shouldBe true + c.trySend(2) shouldBe true + } + + it should "throw ChannelClosedException.Done when the channel is done" in { + val c = Channel.buffered[Int](2) + c.done() + assertThrows[ChannelClosedException.Done] { + c.trySend(1) + } + } + + it should "throw ChannelClosedException.Error when the channel is in error" in { + val c = Channel.buffered[Int](2) + c.error(new RuntimeException("test")) + assertThrows[ChannelClosedException.Error] { + c.trySend(1) + } + } + + // trySendOrClosed + + "trySendOrClosed" should "return true when there is space in the buffer" in { + val c = Channel.buffered[Int](2) + c.trySendOrClosed(1) shouldBe true + c.trySendOrClosed(2) shouldBe true + } + + it should "return false when the buffer is full" in { + val c = Channel.buffered[Int](1) + c.trySendOrClosed(1) shouldBe true + c.trySendOrClosed(2) shouldBe false + } + + it should "return ChannelClosed.Done when the channel is done" in { + val c = Channel.buffered[Int](2) + c.done() + c.trySendOrClosed(1) shouldBe ChannelClosed.Done + } + + it should "return ChannelClosed.Error when the channel is in error" in { + val c = Channel.buffered[Int](2) + val ex = new RuntimeException("test") + c.error(ex) + c.trySendOrClosed(1) should matchPattern { case ChannelClosed.Error(_) => } + } + + // tryReceive + + "tryReceive" should "return Some with the value when one is available" in { + val c = Channel.buffered[Int](2) + c.send(1) + c.send(2) + c.tryReceive() shouldBe Some(1) + c.tryReceive() shouldBe Some(2) + } + + it should "return None when the buffer is empty" in { + val c = Channel.buffered[Int](2) + c.tryReceive() shouldBe None + } + + it should "throw ChannelClosedException.Done when the channel is done and empty" in { + val c = Channel.buffered[Int](2) + c.done() + assertThrows[ChannelClosedException.Done] { + c.tryReceive() + } + } + + it should "throw ChannelClosedException.Error when the channel is in error" in { + val c = Channel.buffered[Int](2) + c.send(1) + c.error(new RuntimeException("test")) + assertThrows[ChannelClosedException.Error] { + c.tryReceive() + } + } + + // tryReceiveOrClosed + + "tryReceiveOrClosed" should "return Some with the value when one is available" in { + val c = Channel.buffered[Int](2) + c.send(1) + c.tryReceiveOrClosed() shouldBe Some(1) + } + + it should "return None when the buffer is empty" in { + val c = Channel.buffered[Int](2) + c.tryReceiveOrClosed() shouldBe None + } + + it should "return ChannelClosed.Done when the channel is done and empty" in { + val c = Channel.buffered[Int](2) + c.done() + c.tryReceiveOrClosed() shouldBe ChannelClosed.Done + } + + it should "return ChannelClosed.Error when the channel is in error" in { + val c = Channel.buffered[Int](2) + c.send(1) + c.error(new RuntimeException("test")) + c.tryReceiveOrClosed() should matchPattern { case ChannelClosed.Error(_) => } + } + + it should "return Some(value) from a done channel that still has buffered values" in { + val c = Channel.buffered[Int](2) + c.send(42) + c.done() + c.tryReceiveOrClosed() shouldBe Some(42) + } +end ChannelTryTest From ccfec2c7d838023b0300ae2a79d666274cb607c0 Mon Sep 17 00:00:00 2001 From: Adam Warski Date: Mon, 2 Mar 2026 17:48:23 +0000 Subject: [PATCH 2/6] Use orThrow in trySend/tryReceive; fix scaladoc backtick - trySend and tryReceive now delegate to orThrow, consistent with send/receive - Fix stray backtick in ChannelClosed scaladoc Co-Authored-By: Claude Sonnet 4.6 --- core/src/main/scala/ox/channels/Channel.scala | 8 ++------ core/src/main/scala/ox/channels/ChannelClosed.scala | 2 +- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/ox/channels/Channel.scala b/core/src/main/scala/ox/channels/Channel.scala index 6b38c4b5..b9d9e810 100644 --- a/core/src/main/scala/ox/channels/Channel.scala +++ b/core/src/main/scala/ox/channels/Channel.scala @@ -64,9 +64,7 @@ trait Source[+T] extends SourceOps[T] with SourceDrainOps[T]: * @throws ChannelClosedException * When the channel is closed. */ - def tryReceive(): Option[T] = tryReceiveOrClosed() match - case c: ChannelClosed => throw c.toThrowable - case opt: Option[T @unchecked] => opt + def tryReceive(): Option[T] = tryReceiveOrClosed().orThrow /** Attempt to receive a value from the channel if one is immediately available. This method never blocks or suspends the calling thread. * Doesn't throw exceptions when the channel is closed, but returns a value. @@ -164,9 +162,7 @@ trait Sink[-T]: * @throws ChannelClosedException * When the channel is closed. */ - def trySend(t: T): Boolean = trySendOrClosed(t) match - case c: ChannelClosed => throw c.toThrowable - case b: Boolean => b + def trySend(t: T): Boolean = trySendOrClosed(t).orThrow /** Attempt to send a value to the channel if there's a waiting receiver, or space in the buffer. This method never blocks or suspends the * calling thread. Doesn't throw exceptions when the channel is closed, but returns a value. diff --git a/core/src/main/scala/ox/channels/ChannelClosed.scala b/core/src/main/scala/ox/channels/ChannelClosed.scala index a4144da5..d2a64090 100644 --- a/core/src/main/scala/ox/channels/ChannelClosed.scala +++ b/core/src/main/scala/ox/channels/ChannelClosed.scala @@ -27,7 +27,7 @@ object ChannelClosed: case v => Some(v.asInstanceOf[T]) /** Converts the result of jox's `trySendOrClosed()` (which returns `null | JChannelClosed | sentinel`) to `Boolean | ChannelClosed`. - * `null` means the value was sent, any other non-ChannelClosed value (sentinel) means it was not sent. + * `null` means the value was sent, any other non-`ChannelClosed` value (sentinel) means it was not sent. */ private[ox] def fromJoxTrySendOrClosed(joxResult: AnyRef): Boolean | ChannelClosed = joxResult match From ce0ddb9a30266cb875d024cacbd082a9f3f1d0f5 Mon Sep 17 00:00:00 2001 From: Adam Warski Date: Mon, 2 Mar 2026 18:05:04 +0000 Subject: [PATCH 3/6] Inline try* helpers; fix CircuitBreakerTest flakiness; use in: style - Remove fromJoxTryReceiveOrClosed/fromJoxTrySendOrClosed helper methods and inline their logic in tryReceiveOrClosed/trySendOrClosed, delegating to the existing fromJoxOrT and fromJox methods. Expose fromJox as private[ox] to enable this. - Update ChannelTryTest to use Scala 3 in: syntax (no braces) - Fix CircuitBreakerTest flakiness by wrapping transition assertions in eventually{}, giving a 1s window for actor-dispatched state changes to propagate under JVM load Co-Authored-By: Claude Sonnet 4.6 --- core/src/main/scala/ox/channels/Channel.scala | 14 +++- .../scala/ox/channels/ChannelClosed.scala | 22 +------ .../scala/ox/channels/ChannelTryTest.scala | 66 +++++++------------ .../ox/resilience/CircuitBreakerTest.scala | 21 +++--- 4 files changed, 47 insertions(+), 76 deletions(-) diff --git a/core/src/main/scala/ox/channels/Channel.scala b/core/src/main/scala/ox/channels/Channel.scala index b9d9e810..3d75288c 100644 --- a/core/src/main/scala/ox/channels/Channel.scala +++ b/core/src/main/scala/ox/channels/Channel.scala @@ -76,7 +76,12 @@ trait Source[+T] extends SourceOps[T] with SourceDrainOps[T]: * [[Some]] with the received value, [[None]] if no value is immediately available, or [[ChannelClosed]] when the channel is closed. */ def tryReceiveOrClosed(): Option[T] | ChannelClosed = - ChannelClosed.fromJoxTryReceiveOrClosed(delegate.tryReceiveOrClosed()) + val r = delegate.tryReceiveOrClosed() + if r == null then None + else + ChannelClosed.fromJoxOrT[T](r.asInstanceOf[AnyRef]) match + case c: ChannelClosed => c + case v: T @unchecked => Some(v) /** Receive a value from the channel. For a variant which throws exceptions when the channel is closed, use [[receive]]. * @@ -176,7 +181,12 @@ trait Sink[-T]: * `true` if the value was sent, `false` if there's no space/waiting receiver, or [[ChannelClosed]] when the channel is closed. */ def trySendOrClosed(t: T): Boolean | ChannelClosed = - ChannelClosed.fromJoxTrySendOrClosed(delegate.asInstanceOf[JSink[T]].trySendOrClosed(t)) + val r = delegate.asInstanceOf[JSink[T]].trySendOrClosed(t) + if r == null then true // null = sent + else + ChannelClosed.fromJox(r.asInstanceOf[AnyRef]) match + case c: ChannelClosed => c + case _ => false // sentinel = not sent /** Send a value to the channel. For a variant which throws exceptions when the channel is closed, use [[send]]. * diff --git a/core/src/main/scala/ox/channels/ChannelClosed.scala b/core/src/main/scala/ox/channels/ChannelClosed.scala index d2a64090..d9028451 100644 --- a/core/src/main/scala/ox/channels/ChannelClosed.scala +++ b/core/src/main/scala/ox/channels/ChannelClosed.scala @@ -16,27 +16,7 @@ object ChannelClosed: private[ox] def fromJoxOrUnit(joxResult: AnyRef): Unit | ChannelClosed = if joxResult == null then () else fromJox(joxResult).asInstanceOf[ChannelClosed] - /** Converts the result of jox's `tryReceiveOrClosed()` (which returns `T | JChannelClosed | null`) to `Option[T] | ChannelClosed`. `null` - * means no value was immediately available. - */ - private[ox] def fromJoxTryReceiveOrClosed[T](joxResult: AnyRef): Option[T] | ChannelClosed = - joxResult match - case null => None - case _: JChannelDone => Done - case e: JChannelError => Error(e.cause()) - case v => Some(v.asInstanceOf[T]) - - /** Converts the result of jox's `trySendOrClosed()` (which returns `null | JChannelClosed | sentinel`) to `Boolean | ChannelClosed`. - * `null` means the value was sent, any other non-`ChannelClosed` value (sentinel) means it was not sent. - */ - private[ox] def fromJoxTrySendOrClosed(joxResult: AnyRef): Boolean | ChannelClosed = - joxResult match - case null => true - case _: JChannelDone => Done - case e: JChannelError => Error(e.cause()) - case _ => false - - private def fromJox(joxResult: AnyRef): AnyRef | ChannelClosed = + private[ox] def fromJox(joxResult: AnyRef): AnyRef | ChannelClosed = joxResult match case _: JChannelDone => Done case e: JChannelError => Error(e.cause()) diff --git a/core/src/test/scala/ox/channels/ChannelTryTest.scala b/core/src/test/scala/ox/channels/ChannelTryTest.scala index 8ffb51c7..e361c1b8 100644 --- a/core/src/test/scala/ox/channels/ChannelTryTest.scala +++ b/core/src/test/scala/ox/channels/ChannelTryTest.scala @@ -6,129 +6,107 @@ import org.scalatest.matchers.should.Matchers class ChannelTryTest extends AnyFlatSpec with Matchers: // trySend - "trySend" should "return true when there is space in the buffer" in { + "trySend" should "return true when there is space in the buffer" in: val c = Channel.buffered[Int](2) c.trySend(1) shouldBe true c.trySend(2) shouldBe true - } - it should "return false when the buffer is full" in { + it should "return false when the buffer is full" in: val c = Channel.buffered[Int](1) c.trySend(1) shouldBe true c.trySend(2) shouldBe false - } - it should "return true for an unlimited channel" in { + it should "return true for an unlimited channel" in: val c = Channel.unlimited[Int] c.trySend(1) shouldBe true c.trySend(2) shouldBe true - } - it should "throw ChannelClosedException.Done when the channel is done" in { + it should "throw ChannelClosedException.Done when the channel is done" in: val c = Channel.buffered[Int](2) c.done() - assertThrows[ChannelClosedException.Done] { + assertThrows[ChannelClosedException.Done]: c.trySend(1) - } - } - it should "throw ChannelClosedException.Error when the channel is in error" in { + it should "throw ChannelClosedException.Error when the channel is in error" in: val c = Channel.buffered[Int](2) c.error(new RuntimeException("test")) - assertThrows[ChannelClosedException.Error] { + assertThrows[ChannelClosedException.Error]: c.trySend(1) - } - } // trySendOrClosed - "trySendOrClosed" should "return true when there is space in the buffer" in { + "trySendOrClosed" should "return true when there is space in the buffer" in: val c = Channel.buffered[Int](2) c.trySendOrClosed(1) shouldBe true c.trySendOrClosed(2) shouldBe true - } - it should "return false when the buffer is full" in { + it should "return false when the buffer is full" in: val c = Channel.buffered[Int](1) c.trySendOrClosed(1) shouldBe true c.trySendOrClosed(2) shouldBe false - } - it should "return ChannelClosed.Done when the channel is done" in { + it should "return ChannelClosed.Done when the channel is done" in: val c = Channel.buffered[Int](2) c.done() c.trySendOrClosed(1) shouldBe ChannelClosed.Done - } - it should "return ChannelClosed.Error when the channel is in error" in { + it should "return ChannelClosed.Error when the channel is in error" in: val c = Channel.buffered[Int](2) val ex = new RuntimeException("test") c.error(ex) c.trySendOrClosed(1) should matchPattern { case ChannelClosed.Error(_) => } - } // tryReceive - "tryReceive" should "return Some with the value when one is available" in { + "tryReceive" should "return Some with the value when one is available" in: val c = Channel.buffered[Int](2) c.send(1) c.send(2) c.tryReceive() shouldBe Some(1) c.tryReceive() shouldBe Some(2) - } - it should "return None when the buffer is empty" in { + it should "return None when the buffer is empty" in: val c = Channel.buffered[Int](2) c.tryReceive() shouldBe None - } - it should "throw ChannelClosedException.Done when the channel is done and empty" in { + it should "throw ChannelClosedException.Done when the channel is done and empty" in: val c = Channel.buffered[Int](2) c.done() - assertThrows[ChannelClosedException.Done] { + assertThrows[ChannelClosedException.Done]: c.tryReceive() - } - } - it should "throw ChannelClosedException.Error when the channel is in error" in { + it should "throw ChannelClosedException.Error when the channel is in error" in: val c = Channel.buffered[Int](2) c.send(1) c.error(new RuntimeException("test")) - assertThrows[ChannelClosedException.Error] { + assertThrows[ChannelClosedException.Error]: c.tryReceive() - } - } // tryReceiveOrClosed - "tryReceiveOrClosed" should "return Some with the value when one is available" in { + "tryReceiveOrClosed" should "return Some with the value when one is available" in: val c = Channel.buffered[Int](2) c.send(1) c.tryReceiveOrClosed() shouldBe Some(1) - } - it should "return None when the buffer is empty" in { + it should "return None when the buffer is empty" in: val c = Channel.buffered[Int](2) c.tryReceiveOrClosed() shouldBe None - } - it should "return ChannelClosed.Done when the channel is done and empty" in { + it should "return ChannelClosed.Done when the channel is done and empty" in: val c = Channel.buffered[Int](2) c.done() c.tryReceiveOrClosed() shouldBe ChannelClosed.Done - } - it should "return ChannelClosed.Error when the channel is in error" in { + it should "return ChannelClosed.Error when the channel is in error" in: val c = Channel.buffered[Int](2) c.send(1) c.error(new RuntimeException("test")) c.tryReceiveOrClosed() should matchPattern { case ChannelClosed.Error(_) => } - } - it should "return Some(value) from a done channel that still has buffered values" in { + it should "return Some(value) from a done channel that still has buffered values" in: val c = Channel.buffered[Int](2) c.send(42) c.done() c.tryReceiveOrClosed() shouldBe Some(42) - } end ChannelTryTest diff --git a/core/src/test/scala/ox/resilience/CircuitBreakerTest.scala b/core/src/test/scala/ox/resilience/CircuitBreakerTest.scala index 7ebc5c31..ac84bb82 100644 --- a/core/src/test/scala/ox/resilience/CircuitBreakerTest.scala +++ b/core/src/test/scala/ox/resilience/CircuitBreakerTest.scala @@ -3,11 +3,15 @@ package ox.resilience import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import org.scalatest.OptionValues +import org.scalatest.concurrent.Eventually +import org.scalatest.time.{Millis, Seconds, Span} import scala.concurrent.duration.* import ox.* import org.scalatest.EitherValues -class CircuitBreakerTest extends AnyFlatSpec with Matchers with OptionValues with EitherValues: +class CircuitBreakerTest extends AnyFlatSpec with Matchers with OptionValues with EitherValues with Eventually: + implicit override val patienceConfig: PatienceConfig = + PatienceConfig(timeout = Span(1, Seconds), interval = Span(50, Millis)) behavior of "Circuit Breaker run operations" it should "run operation when metrics are not exceeded" in supervised { @@ -190,11 +194,11 @@ class CircuitBreakerTest extends AnyFlatSpec with Matchers with OptionValues wit // 750ms: the first operation failed, should be open sleep(500.millis) - circuitBreaker.stateMachine.state shouldBe a[CircuitBreakerState.Open] + eventually { circuitBreaker.stateMachine.state shouldBe a[CircuitBreakerState.Open] } // 1750ms: first operation failed more than 1s ago, second operation failed less than 1s ago and was ignored sleep(1.second) - circuitBreaker.stateMachine.state shouldBe a[CircuitBreakerState.HalfOpen] + eventually { circuitBreaker.stateMachine.state shouldBe a[CircuitBreakerState.HalfOpen] } // 2250ms: more than 1s after the last failing operation, should be now half-open sleep(500.millis) @@ -202,11 +206,11 @@ class CircuitBreakerTest extends AnyFlatSpec with Matchers with OptionValues wit // 3250ms: at 2500ms 1 sec timeout on halfOpen state passes, we go back to open sleep(1.second) - circuitBreaker.stateMachine.state shouldBe a[CircuitBreakerState.Open] + eventually { circuitBreaker.stateMachine.state shouldBe a[CircuitBreakerState.Open] } // 3750ms: at 3500ms we go to halfOpen again sleep(1000.millis) - circuitBreaker.stateMachine.state shouldBe a[CircuitBreakerState.HalfOpen] + eventually { circuitBreaker.stateMachine.state shouldBe a[CircuitBreakerState.HalfOpen] } } it should "correctly calculate metrics when results come in after state change" in supervised { @@ -246,18 +250,17 @@ class CircuitBreakerTest extends AnyFlatSpec with Matchers with OptionValues wit // 750ms: the first operation failed, should be open sleep(500.millis) - circuitBreaker.stateMachine.state shouldBe a[CircuitBreakerState.Open] + eventually { circuitBreaker.stateMachine.state shouldBe a[CircuitBreakerState.Open] } // 1750ms: first operation failed more than 1s ago, second operation failed less than 1s ago and was ignored sleep(1.second) - circuitBreaker.stateMachine.state shouldBe a[CircuitBreakerState.HalfOpen] + eventually { circuitBreaker.stateMachine.state shouldBe a[CircuitBreakerState.HalfOpen] } // 2250ms: complete enough operations for halfOpen state - since success should switch back to Closed sleep(500.millis) circuitBreaker.runOrDropEither(Right("c")).discard - sleep(100.millis) // wait for state to register // Should go back to closed, we have one successful operation - circuitBreaker.stateMachine.state shouldBe a[CircuitBreakerState.Closed] + eventually { circuitBreaker.stateMachine.state shouldBe a[CircuitBreakerState.Closed] } } end CircuitBreakerTest From dd1c71b523b9c3e4a490310e2f3c858707d96806 Mon Sep 17 00:00:00 2001 From: Adam Warski Date: Mon, 2 Mar 2026 18:08:04 +0000 Subject: [PATCH 4/6] Revert CircuitBreakerTest changes Co-Authored-By: Claude Sonnet 4.6 --- .../ox/resilience/CircuitBreakerTest.scala | 21 ++++++++----------- 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/core/src/test/scala/ox/resilience/CircuitBreakerTest.scala b/core/src/test/scala/ox/resilience/CircuitBreakerTest.scala index ac84bb82..7ebc5c31 100644 --- a/core/src/test/scala/ox/resilience/CircuitBreakerTest.scala +++ b/core/src/test/scala/ox/resilience/CircuitBreakerTest.scala @@ -3,15 +3,11 @@ package ox.resilience import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import org.scalatest.OptionValues -import org.scalatest.concurrent.Eventually -import org.scalatest.time.{Millis, Seconds, Span} import scala.concurrent.duration.* import ox.* import org.scalatest.EitherValues -class CircuitBreakerTest extends AnyFlatSpec with Matchers with OptionValues with EitherValues with Eventually: - implicit override val patienceConfig: PatienceConfig = - PatienceConfig(timeout = Span(1, Seconds), interval = Span(50, Millis)) +class CircuitBreakerTest extends AnyFlatSpec with Matchers with OptionValues with EitherValues: behavior of "Circuit Breaker run operations" it should "run operation when metrics are not exceeded" in supervised { @@ -194,11 +190,11 @@ class CircuitBreakerTest extends AnyFlatSpec with Matchers with OptionValues wit // 750ms: the first operation failed, should be open sleep(500.millis) - eventually { circuitBreaker.stateMachine.state shouldBe a[CircuitBreakerState.Open] } + circuitBreaker.stateMachine.state shouldBe a[CircuitBreakerState.Open] // 1750ms: first operation failed more than 1s ago, second operation failed less than 1s ago and was ignored sleep(1.second) - eventually { circuitBreaker.stateMachine.state shouldBe a[CircuitBreakerState.HalfOpen] } + circuitBreaker.stateMachine.state shouldBe a[CircuitBreakerState.HalfOpen] // 2250ms: more than 1s after the last failing operation, should be now half-open sleep(500.millis) @@ -206,11 +202,11 @@ class CircuitBreakerTest extends AnyFlatSpec with Matchers with OptionValues wit // 3250ms: at 2500ms 1 sec timeout on halfOpen state passes, we go back to open sleep(1.second) - eventually { circuitBreaker.stateMachine.state shouldBe a[CircuitBreakerState.Open] } + circuitBreaker.stateMachine.state shouldBe a[CircuitBreakerState.Open] // 3750ms: at 3500ms we go to halfOpen again sleep(1000.millis) - eventually { circuitBreaker.stateMachine.state shouldBe a[CircuitBreakerState.HalfOpen] } + circuitBreaker.stateMachine.state shouldBe a[CircuitBreakerState.HalfOpen] } it should "correctly calculate metrics when results come in after state change" in supervised { @@ -250,17 +246,18 @@ class CircuitBreakerTest extends AnyFlatSpec with Matchers with OptionValues wit // 750ms: the first operation failed, should be open sleep(500.millis) - eventually { circuitBreaker.stateMachine.state shouldBe a[CircuitBreakerState.Open] } + circuitBreaker.stateMachine.state shouldBe a[CircuitBreakerState.Open] // 1750ms: first operation failed more than 1s ago, second operation failed less than 1s ago and was ignored sleep(1.second) - eventually { circuitBreaker.stateMachine.state shouldBe a[CircuitBreakerState.HalfOpen] } + circuitBreaker.stateMachine.state shouldBe a[CircuitBreakerState.HalfOpen] // 2250ms: complete enough operations for halfOpen state - since success should switch back to Closed sleep(500.millis) circuitBreaker.runOrDropEither(Right("c")).discard + sleep(100.millis) // wait for state to register // Should go back to closed, we have one successful operation - eventually { circuitBreaker.stateMachine.state shouldBe a[CircuitBreakerState.Closed] } + circuitBreaker.stateMachine.state shouldBe a[CircuitBreakerState.Closed] } end CircuitBreakerTest From 7e07fff939bc709f68a9d4d05e3545deb4cfa434 Mon Sep 17 00:00:00 2001 From: Adam Warski Date: Mon, 2 Mar 2026 21:50:04 +0000 Subject: [PATCH 5/6] Fix flaky FlowOpsMapParTest: reliably test fork cancellation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The test relied on a non-fair semaphore giving permits to specific elements (3 and 4) in a predictable order, which wasn't guaranteed. With the inProgress buffer holding all forks, any of the waiting forks could acquire the semaphore — including element 4 before elements 1 and 2 run — causing failures. Redesign: use exactly 2 elements with mapPar(2). Since there are only 2 forks competing for 2 permits, both always start concurrently with no non-determinism. Element 2 fails after 100ms, giving a 900ms window to cancel element 1 (sleeping 1s). Co-Authored-By: Claude Sonnet 4.6 --- .../test/scala/ox/flow/FlowOpsMapParTest.scala | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/core/src/test/scala/ox/flow/FlowOpsMapParTest.scala b/core/src/test/scala/ox/flow/FlowOpsMapParTest.scala index 81626286..da8fa326 100644 --- a/core/src/test/scala/ox/flow/FlowOpsMapParTest.scala +++ b/core/src/test/scala/ox/flow/FlowOpsMapParTest.scala @@ -87,31 +87,32 @@ class FlowOpsMapParTest extends AnyFlatSpec with Matchers with Eventually: it should "cancel other running forks when there's an error" in supervised: // given val trail = Trail() - val flow = Flow.fromIterable(1 to 10) + // Using exactly 2 elements with mapPar(2) so both always start concurrently — the semaphore has + // 2 permits and there are only 2 forks, so both acquire immediately with no competition. + // Element 2 fails after a short delay; element 1 should be cancelled during its long sleep. // when - val s2 = flow + val s2 = Flow.fromIterable(List(1, 2)) .mapPar(2): i => - if i == 4 then + if i == 2 then sleep(100.millis) trail.add("exception") throw new Exception("boom") else - sleep(200.millis) + sleep(1000.millis) trail.add(s"done") i * 2 .runToChannel() // then - s2.receive() shouldBe 2 - s2.receive() shouldBe 4 s2.receiveOrClosed() should matchPattern { case ChannelClosed.Error(ChannelClosedException.Error(reason)) if reason.getMessage == "boom" => } - // checking if the forks aren't left running + // checking if the forks aren't left running; element 1 runs concurrently with the failing element 2 + // and should be cancelled before its 1s sleep completes (900ms cancellation window) sleep(200.millis) - trail.get shouldBe Vector("done", "done", "exception") // TODO: 3 isn't cancelled because it's already taken off the queue + trail.get shouldBe Vector("exception") // element 1 cancelled (no "done"), element 2 added "exception" // Edge Cases it should "handle empty flow" in supervised: From 24c0531d4aece4bebda6cf67da9b818e186fa4a4 Mon Sep 17 00:00:00 2001 From: Adam Warski Date: Tue, 3 Mar 2026 10:43:23 +0000 Subject: [PATCH 6/6] scalafmt Co-Authored-By: Claude Sonnet 4.6 --- core/src/test/scala/ox/flow/FlowOpsMapParTest.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/ox/flow/FlowOpsMapParTest.scala b/core/src/test/scala/ox/flow/FlowOpsMapParTest.scala index da8fa326..73382530 100644 --- a/core/src/test/scala/ox/flow/FlowOpsMapParTest.scala +++ b/core/src/test/scala/ox/flow/FlowOpsMapParTest.scala @@ -92,7 +92,8 @@ class FlowOpsMapParTest extends AnyFlatSpec with Matchers with Eventually: // Element 2 fails after a short delay; element 1 should be cancelled during its long sleep. // when - val s2 = Flow.fromIterable(List(1, 2)) + val s2 = Flow + .fromIterable(List(1, 2)) .mapPar(2): i => if i == 2 then sleep(100.millis)