From b1942e174741e7aa103bcace7a08dcfd5a72e58e Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 30 Jun 2026 11:40:53 +0200 Subject: [PATCH] 4.x: Remove Pioneer-RetryTest as it is itself flaky? --- build.gradle | 3 - .../rxjava4/core/RxJavaSelfTest.java | 73 + .../io/reactivex/rxjava4/core/RxJavaTest.java | 26 +- .../reactivex/rxjava4/core/XFlatMapTest.java | 1301 +++++++++-------- 4 files changed, 763 insertions(+), 640 deletions(-) create mode 100644 src/test/java/io/reactivex/rxjava4/core/RxJavaSelfTest.java diff --git a/build.gradle b/build.gradle index 4906896d57..00c64472ac 100644 --- a/build.gradle +++ b/build.gradle @@ -22,7 +22,6 @@ ext { guavaVersion = "33.6.0-jre" jupiterVersion = "6.1.1" jupiterLauncherVersion = "6.1.1" - junitPioneerVersion = "2.3.0" } def releaseTag = System.getenv("BUILD_TAG") @@ -55,8 +54,6 @@ dependencies { testImplementation "org.junit.platform:junit-platform-launcher:$jupiterLauncherVersion" testRuntimeOnly "org.junit.platform:junit-platform-launcher:$jupiterLauncherVersion" // already have this - - testImplementation "org.junit-pioneer:junit-pioneer:$junitPioneerVersion" } // === Experimental JDK handling for Outreach Program === diff --git a/src/test/java/io/reactivex/rxjava4/core/RxJavaSelfTest.java b/src/test/java/io/reactivex/rxjava4/core/RxJavaSelfTest.java new file mode 100644 index 0000000000..bdbf6372f9 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava4/core/RxJavaSelfTest.java @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava4.core; + +import static org.junit.jupiter.api.Assertions.*; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.Test; + +import io.reactivex.rxjava4.exceptions.TestException; + +public final class RxJavaSelfTest extends RxJavaTest { + + @Test + public void withRetrySuccess() { + var counter = new AtomicInteger(); + withRetry(3, () -> { + counter.incrementAndGet(); + }); + + assertEquals(1, counter.get()); + } + + @Test + public void withRetryFailOnce() { + var counter = new AtomicInteger(); + withRetry(3, () -> { + if (counter.getAndIncrement() == 0) { + throw new TestException("Failed index " + counter.get()); + } + }); + + assertEquals(2, counter.get()); + } + + @Test + public void withRetryFailTwice() { + var counter = new AtomicInteger(); + withRetry(3, () -> { + if (counter.getAndIncrement() < 2) { + throw new TestException("Failed index " + counter.get()); + } + }); + + assertEquals(3, counter.get()); + } + + @Test + public void withRetryTruce() { + var counter = new AtomicInteger(); + + assertThrows(AssertionError.class, () -> { + withRetry(3, () -> { + counter.getAndIncrement(); + throw new TestException("Failed index " + counter.get()); + }); + }); + + assertEquals(3, counter.get()); + } +} diff --git a/src/test/java/io/reactivex/rxjava4/core/RxJavaTest.java b/src/test/java/io/reactivex/rxjava4/core/RxJavaTest.java index 0e78c15cb1..b28b9b5e97 100644 --- a/src/test/java/io/reactivex/rxjava4/core/RxJavaTest.java +++ b/src/test/java/io/reactivex/rxjava4/core/RxJavaTest.java @@ -18,8 +18,9 @@ import org.junit.jupiter.api.*; import io.reactivex.rxjava4.exceptions.UndeliverableException; +import io.reactivex.rxjava4.functions.Action; import io.reactivex.rxjava4.plugins.RxJavaPlugins; -import io.reactivex.rxjava4.testsupport.*; +import io.reactivex.rxjava4.testsupport.SuppressUndeliverable; @Timeout(value = 5, unit = TimeUnit.MINUTES) public abstract class RxJavaTest { @@ -53,4 +54,27 @@ public void afterEach(TestInfo info) { RxJavaPlugins.setErrorHandler(null); } + /** + * Wrap your test body into this retry lambda-based callback to retry flaky tests + * that usually depend on Thread.sleep consistency. + * @param count the number of times to retry + * @param code the code to run + */ + public static void withRetry(int count, Action code) { + AssertionError error = null; + while (count-- > 0) { + try { + code.run(); + return; + } catch (Throwable ex) { + if (error == null) { + error = new AssertionError("withRetry failures"); + } + error.addSuppressed(ex); + } + } + if (error != null) { + throw error; + } + } } diff --git a/src/test/java/io/reactivex/rxjava4/core/XFlatMapTest.java b/src/test/java/io/reactivex/rxjava4/core/XFlatMapTest.java index 22cc085a90..1b4feb040a 100644 --- a/src/test/java/io/reactivex/rxjava4/core/XFlatMapTest.java +++ b/src/test/java/io/reactivex/rxjava4/core/XFlatMapTest.java @@ -20,7 +20,6 @@ import java.util.concurrent.Flow.Publisher; import org.junit.jupiter.api.Test; -import org.junitpioneer.jupiter.RetryingTest; import io.reactivex.rxjava4.exceptions.TestException; import io.reactivex.rxjava4.functions.*; @@ -66,919 +65,949 @@ void beforeCancelSleep(TestObserver to) throws Exception { } @Test - @RetryingTest(3) public void flowableFlowable() throws Exception { - List errors = TestHelper.trackPluginErrors(); - try { - TestSubscriber ts = Flowable.just(1) - .subscribeOn(Schedulers.cached()) - .flatMap((Function>) _ -> { - sleep(); - return Flowable.error(new TestException()); - }) - .test(); + withRetry(3, () -> { + List errors = TestHelper.trackPluginErrors(); + try { + TestSubscriber ts = Flowable.just(1) + .subscribeOn(Schedulers.cached()) + .flatMap((Function>) _ -> { + sleep(); + return Flowable.error(new TestException()); + }) + .test(); - cb.await(); + cb.await(); - beforeCancelSleep(ts); + beforeCancelSleep(ts); - ts.cancel(); + ts.cancel(); - Thread.sleep(SLEEP_AFTER_CANCEL); + Thread.sleep(SLEEP_AFTER_CANCEL); - ts.assertEmpty(); + ts.assertEmpty(); - assertTrue(errors.isEmpty(), errors.toString()); - } finally { - RxJavaPlugins.reset(); - } + assertTrue(errors.isEmpty(), errors.toString()); + } finally { + RxJavaPlugins.reset(); + } + }); } @Test - @RetryingTest(3) public void flowableSingle() throws Exception { - List errors = TestHelper.trackPluginErrors(); - try { - TestSubscriber ts = Flowable.just(1) - .subscribeOn(Schedulers.cached()) - .flatMapSingle(_ -> { - sleep(); - return Single.error(new TestException()); - }) - .test(); + withRetry(3, () -> { + List errors = TestHelper.trackPluginErrors(); + try { + TestSubscriber ts = Flowable.just(1) + .subscribeOn(Schedulers.cached()) + .flatMapSingle(_ -> { + sleep(); + return Single.error(new TestException()); + }) + .test(); - cb.await(); + cb.await(); - beforeCancelSleep(ts); + beforeCancelSleep(ts); - ts.cancel(); + ts.cancel(); - Thread.sleep(SLEEP_AFTER_CANCEL); + Thread.sleep(SLEEP_AFTER_CANCEL); - ts.assertEmpty(); + ts.assertEmpty(); - assertTrue(errors.isEmpty(), errors.toString()); - } finally { - RxJavaPlugins.reset(); - } + assertTrue(errors.isEmpty(), errors.toString()); + } finally { + RxJavaPlugins.reset(); + } + }); } @Test - @RetryingTest(3) public void flowableMaybe() throws Exception { - List errors = TestHelper.trackPluginErrors(); - try { - TestSubscriber ts = Flowable.just(1) - .subscribeOn(Schedulers.cached()) - .flatMapMaybe((Function>) _ -> { - sleep(); - return Maybe.error(new TestException()); - }) - .test(); + withRetry(3, () -> { + List errors = TestHelper.trackPluginErrors(); + try { + TestSubscriber ts = Flowable.just(1) + .subscribeOn(Schedulers.cached()) + .flatMapMaybe((Function>) _ -> { + sleep(); + return Maybe.error(new TestException()); + }) + .test(); - cb.await(); + cb.await(); - beforeCancelSleep(ts); + beforeCancelSleep(ts); - ts.cancel(); + ts.cancel(); - Thread.sleep(SLEEP_AFTER_CANCEL); + Thread.sleep(SLEEP_AFTER_CANCEL); - ts.assertEmpty(); + ts.assertEmpty(); - assertTrue(errors.isEmpty(), errors.toString()); - } finally { - RxJavaPlugins.reset(); - } + assertTrue(errors.isEmpty(), errors.toString()); + } finally { + RxJavaPlugins.reset(); + } + }); } @Test - @RetryingTest(3) public void flowableCompletable() throws Exception { - List errors = TestHelper.trackPluginErrors(); - try { - TestObserver to = Flowable.just(1) - .subscribeOn(Schedulers.cached()) - .flatMapCompletable((Function) _ -> { - sleep(); - return Completable.error(new TestException()); - }) - .test(); + withRetry(3, () -> { + List errors = TestHelper.trackPluginErrors(); + try { + TestObserver to = Flowable.just(1) + .subscribeOn(Schedulers.cached()) + .flatMapCompletable((Function) _ -> { + sleep(); + return Completable.error(new TestException()); + }) + .test(); - cb.await(); + cb.await(); - beforeCancelSleep(to); + beforeCancelSleep(to); - to.dispose(); + to.dispose(); - Thread.sleep(SLEEP_AFTER_CANCEL); + Thread.sleep(SLEEP_AFTER_CANCEL); - to.assertEmpty(); + to.assertEmpty(); - assertTrue(errors.isEmpty(), errors.toString()); - } finally { - RxJavaPlugins.reset(); - } + assertTrue(errors.isEmpty(), errors.toString()); + } finally { + RxJavaPlugins.reset(); + } + }); } @Test - @RetryingTest(3) public void flowableCompletable2() throws Exception { - List errors = TestHelper.trackPluginErrors(); - try { - TestSubscriber ts = Flowable.just(1) - .subscribeOn(Schedulers.cached()) - .flatMapCompletable((Function) _ -> { - sleep(); - return Completable.error(new TestException()); - }) - .toFlowable() - .test(); + withRetry(3, () -> { + List errors = TestHelper.trackPluginErrors(); + try { + TestSubscriber ts = Flowable.just(1) + .subscribeOn(Schedulers.cached()) + .flatMapCompletable((Function) _ -> { + sleep(); + return Completable.error(new TestException()); + }) + .toFlowable() + .test(); - cb.await(); + cb.await(); - beforeCancelSleep(ts); + beforeCancelSleep(ts); - ts.cancel(); + ts.cancel(); - Thread.sleep(SLEEP_AFTER_CANCEL); + Thread.sleep(SLEEP_AFTER_CANCEL); - ts.assertEmpty(); + ts.assertEmpty(); - assertTrue(errors.isEmpty(), errors.toString()); - } finally { - RxJavaPlugins.reset(); - } + assertTrue(errors.isEmpty(), errors.toString()); + } finally { + RxJavaPlugins.reset(); + } + }); } @Test - @RetryingTest(3) public void observableObservable() throws Exception { - List errors = TestHelper.trackPluginErrors(); - try { - TestObserver to = Observable.just(1) - .subscribeOn(Schedulers.cached()) - .flatMap((Function>) _ -> { - sleep(); - return Observable.error(new TestException()); - }) - .test(); + withRetry(3, () -> { + List errors = TestHelper.trackPluginErrors(); + try { + TestObserver to = Observable.just(1) + .subscribeOn(Schedulers.cached()) + .flatMap((Function>) _ -> { + sleep(); + return Observable.error(new TestException()); + }) + .test(); - cb.await(); + cb.await(); - beforeCancelSleep(to); + beforeCancelSleep(to); - to.dispose(); + to.dispose(); - Thread.sleep(SLEEP_AFTER_CANCEL); + Thread.sleep(SLEEP_AFTER_CANCEL); - to.assertEmpty(); + to.assertEmpty(); - assertTrue(errors.isEmpty(), errors.toString()); - } finally { - RxJavaPlugins.reset(); - } + assertTrue(errors.isEmpty(), errors.toString()); + } finally { + RxJavaPlugins.reset(); + } + }); } @Test - @RetryingTest(3) public void observerSingle() throws Exception { - List errors = TestHelper.trackPluginErrors(); - try { - TestObserver to = Observable.just(1) - .subscribeOn(Schedulers.cached()) - .flatMapSingle((Function>) _ -> { - sleep(); - return Single.error(new TestException()); - }) - .test(); + withRetry(3, () -> { + List errors = TestHelper.trackPluginErrors(); + try { + TestObserver to = Observable.just(1) + .subscribeOn(Schedulers.cached()) + .flatMapSingle((Function>) _ -> { + sleep(); + return Single.error(new TestException()); + }) + .test(); - cb.await(); + cb.await(); - beforeCancelSleep(to); + beforeCancelSleep(to); - to.dispose(); + to.dispose(); - Thread.sleep(SLEEP_AFTER_CANCEL); + Thread.sleep(SLEEP_AFTER_CANCEL); - to.assertEmpty(); + to.assertEmpty(); - assertTrue(errors.isEmpty(), errors.toString()); - } finally { - RxJavaPlugins.reset(); - } + assertTrue(errors.isEmpty(), errors.toString()); + } finally { + RxJavaPlugins.reset(); + } + }); } @Test - @RetryingTest(3) public void observerMaybe() throws Exception { - List errors = TestHelper.trackPluginErrors(); - try { - TestObserver to = Observable.just(1) - .subscribeOn(Schedulers.cached()) - .flatMapMaybe((Function>) _ -> { - sleep(); - return Maybe.error(new TestException()); - }) - .test(); + withRetry(3, () -> { + List errors = TestHelper.trackPluginErrors(); + try { + TestObserver to = Observable.just(1) + .subscribeOn(Schedulers.cached()) + .flatMapMaybe((Function>) _ -> { + sleep(); + return Maybe.error(new TestException()); + }) + .test(); - cb.await(); + cb.await(); - beforeCancelSleep(to); + beforeCancelSleep(to); - to.dispose(); + to.dispose(); - Thread.sleep(SLEEP_AFTER_CANCEL); + Thread.sleep(SLEEP_AFTER_CANCEL); - to.assertEmpty(); + to.assertEmpty(); - assertTrue(errors.isEmpty(), errors.toString()); - } finally { - RxJavaPlugins.reset(); - } + assertTrue(errors.isEmpty(), errors.toString()); + } finally { + RxJavaPlugins.reset(); + } + }); } @Test - @RetryingTest(3) public void observerCompletable() throws Exception { - List errors = TestHelper.trackPluginErrors(); - try { - TestObserver to = Observable.just(1) - .subscribeOn(Schedulers.cached()) - .flatMapCompletable((Function) _ -> { - sleep(); - return Completable.error(new TestException()); - }) - .test(); + withRetry(3, () -> { + List errors = TestHelper.trackPluginErrors(); + try { + TestObserver to = Observable.just(1) + .subscribeOn(Schedulers.cached()) + .flatMapCompletable((Function) _ -> { + sleep(); + return Completable.error(new TestException()); + }) + .test(); - cb.await(); + cb.await(); - beforeCancelSleep(to); + beforeCancelSleep(to); - to.dispose(); + to.dispose(); - Thread.sleep(SLEEP_AFTER_CANCEL); + Thread.sleep(SLEEP_AFTER_CANCEL); - to.assertEmpty(); + to.assertEmpty(); - assertTrue(errors.isEmpty(), errors.toString()); - } finally { - RxJavaPlugins.reset(); - } + assertTrue(errors.isEmpty(), errors.toString()); + } finally { + RxJavaPlugins.reset(); + } + }); } @Test - @RetryingTest(3) public void observerCompletable2() throws Exception { - List errors = TestHelper.trackPluginErrors(); - try { - TestObserver to = Observable.just(1) - .subscribeOn(Schedulers.cached()) - .flatMapCompletable((Function) _ -> { - sleep(); - return Completable.error(new TestException()); - }) - .toObservable() - .test(); + withRetry(3, () -> { + List errors = TestHelper.trackPluginErrors(); + try { + TestObserver to = Observable.just(1) + .subscribeOn(Schedulers.cached()) + .flatMapCompletable((Function) _ -> { + sleep(); + return Completable.error(new TestException()); + }) + .toObservable() + .test(); - cb.await(); + cb.await(); - beforeCancelSleep(to); + beforeCancelSleep(to); - to.dispose(); + to.dispose(); - Thread.sleep(SLEEP_AFTER_CANCEL); + Thread.sleep(SLEEP_AFTER_CANCEL); - to.assertEmpty(); + to.assertEmpty(); - assertTrue(errors.isEmpty(), errors.toString()); - } finally { - RxJavaPlugins.reset(); - } + assertTrue(errors.isEmpty(), errors.toString()); + } finally { + RxJavaPlugins.reset(); + } + }); } @Test - @RetryingTest(3) public void singleSingle() throws Exception { - List errors = TestHelper.trackPluginErrors(); - try { - TestObserver to = Single.just(1) - .subscribeOn(Schedulers.cached()) - .flatMap((Function>) _ -> { - sleep(); - return Single.error(new TestException()); - }) - .test(); + withRetry(3, () -> { + List errors = TestHelper.trackPluginErrors(); + try { + TestObserver to = Single.just(1) + .subscribeOn(Schedulers.cached()) + .flatMap((Function>) _ -> { + sleep(); + return Single.error(new TestException()); + }) + .test(); - cb.await(); + cb.await(); - beforeCancelSleep(to); + beforeCancelSleep(to); - to.dispose(); + to.dispose(); - Thread.sleep(SLEEP_AFTER_CANCEL); + Thread.sleep(SLEEP_AFTER_CANCEL); - to.assertEmpty(); + to.assertEmpty(); - assertTrue(errors.isEmpty(), errors.toString()); - } finally { - RxJavaPlugins.reset(); - } + assertTrue(errors.isEmpty(), errors.toString()); + } finally { + RxJavaPlugins.reset(); + } + }); } @Test - @RetryingTest(3) public void singleMaybe() throws Exception { - List errors = TestHelper.trackPluginErrors(); - try { - TestObserver to = Single.just(1) - .subscribeOn(Schedulers.cached()) - .flatMapMaybe((Function>) _ -> { - sleep(); - return Maybe.error(new TestException()); - }) - .test(); + withRetry(3, () -> { + List errors = TestHelper.trackPluginErrors(); + try { + TestObserver to = Single.just(1) + .subscribeOn(Schedulers.cached()) + .flatMapMaybe((Function>) _ -> { + sleep(); + return Maybe.error(new TestException()); + }) + .test(); - cb.await(); + cb.await(); - beforeCancelSleep(to); + beforeCancelSleep(to); - to.dispose(); + to.dispose(); - Thread.sleep(SLEEP_AFTER_CANCEL); + Thread.sleep(SLEEP_AFTER_CANCEL); - to.assertEmpty(); + to.assertEmpty(); - assertTrue(errors.isEmpty(), errors.toString()); - } finally { - RxJavaPlugins.reset(); - } + assertTrue(errors.isEmpty(), errors.toString()); + } finally { + RxJavaPlugins.reset(); + } + }); } @Test - @RetryingTest(3) public void singleCompletable() throws Exception { - List errors = TestHelper.trackPluginErrors(); - try { - TestObserver to = Single.just(1) - .subscribeOn(Schedulers.cached()) - .flatMapCompletable((Function) _ -> { - sleep(); - return Completable.error(new TestException()); - }) - .test(); + withRetry(3, () -> { + List errors = TestHelper.trackPluginErrors(); + try { + TestObserver to = Single.just(1) + .subscribeOn(Schedulers.cached()) + .flatMapCompletable((Function) _ -> { + sleep(); + return Completable.error(new TestException()); + }) + .test(); - cb.await(); + cb.await(); - beforeCancelSleep(to); + beforeCancelSleep(to); - to.dispose(); + to.dispose(); - Thread.sleep(SLEEP_AFTER_CANCEL); + Thread.sleep(SLEEP_AFTER_CANCEL); - to.assertEmpty(); + to.assertEmpty(); - assertTrue(errors.isEmpty(), errors.toString()); - } finally { - RxJavaPlugins.reset(); - } + assertTrue(errors.isEmpty(), errors.toString()); + } finally { + RxJavaPlugins.reset(); + } + }); } @Test - @RetryingTest(3) public void singleCompletable2() throws Exception { - List errors = TestHelper.trackPluginErrors(); - try { - TestObserver to = Single.just(1) - .subscribeOn(Schedulers.cached()) - .flatMapCompletable((Function) _ -> { - sleep(); - return Completable.error(new TestException()); - }) - .toSingleDefault(0) - .test(); + withRetry(3, () -> { + List errors = TestHelper.trackPluginErrors(); + try { + TestObserver to = Single.just(1) + .subscribeOn(Schedulers.cached()) + .flatMapCompletable((Function) _ -> { + sleep(); + return Completable.error(new TestException()); + }) + .toSingleDefault(0) + .test(); - cb.await(); + cb.await(); - beforeCancelSleep(to); + beforeCancelSleep(to); - to.dispose(); + to.dispose(); - Thread.sleep(SLEEP_AFTER_CANCEL); + Thread.sleep(SLEEP_AFTER_CANCEL); - to.assertEmpty(); + to.assertEmpty(); - assertTrue(errors.isEmpty(), errors.toString()); - } finally { - RxJavaPlugins.reset(); - } + assertTrue(errors.isEmpty(), errors.toString()); + } finally { + RxJavaPlugins.reset(); + } + }); } @Test - @RetryingTest(3) public void singlePublisher() throws Exception { - List errors = TestHelper.trackPluginErrors(); - try { - TestSubscriber ts = Single.just(1) - .subscribeOn(Schedulers.cached()) - .flatMapPublisher((Function>) _ -> { - sleep(); - return Flowable.error(new TestException()); - }) - .test(); + withRetry(3, () -> { + List errors = TestHelper.trackPluginErrors(); + try { + TestSubscriber ts = Single.just(1) + .subscribeOn(Schedulers.cached()) + .flatMapPublisher((Function>) _ -> { + sleep(); + return Flowable.error(new TestException()); + }) + .test(); - cb.await(); + cb.await(); - beforeCancelSleep(ts); + beforeCancelSleep(ts); - ts.cancel(); + ts.cancel(); - Thread.sleep(SLEEP_AFTER_CANCEL); + Thread.sleep(SLEEP_AFTER_CANCEL); - ts.assertEmpty(); + ts.assertEmpty(); - assertTrue(errors.isEmpty(), errors.toString()); - } finally { - RxJavaPlugins.reset(); - } + assertTrue(errors.isEmpty(), errors.toString()); + } finally { + RxJavaPlugins.reset(); + } + }); } @Test - @RetryingTest(3) public void singleCombiner() throws Exception { - List errors = TestHelper.trackPluginErrors(); - try { - TestObserver to = Single.just(1) - .subscribeOn(Schedulers.cached()) - .flatMap((Function>) _ -> { - sleep(); - return Single.error(new TestException()); - }, Integer::sum) - .test(); + withRetry(3, () -> { + List errors = TestHelper.trackPluginErrors(); + try { + TestObserver to = Single.just(1) + .subscribeOn(Schedulers.cached()) + .flatMap((Function>) _ -> { + sleep(); + return Single.error(new TestException()); + }, Integer::sum) + .test(); - cb.await(); + cb.await(); - beforeCancelSleep(to); + beforeCancelSleep(to); - to.dispose(); + to.dispose(); - Thread.sleep(SLEEP_AFTER_CANCEL); + Thread.sleep(SLEEP_AFTER_CANCEL); - to.assertEmpty(); + to.assertEmpty(); - assertTrue(errors.isEmpty(), errors.toString()); - } finally { - RxJavaPlugins.reset(); - } + assertTrue(errors.isEmpty(), errors.toString()); + } finally { + RxJavaPlugins.reset(); + } + }); } @Test - @RetryingTest(3) public void singleObservable() throws Exception { - List errors = TestHelper.trackPluginErrors(); - try { - TestObserver to = Single.just(1) - .subscribeOn(Schedulers.cached()) - .flatMapObservable((Function>) _ -> { - sleep(); - return Observable.error(new TestException()); - }) - .test(); + withRetry(3, () -> { + List errors = TestHelper.trackPluginErrors(); + try { + TestObserver to = Single.just(1) + .subscribeOn(Schedulers.cached()) + .flatMapObservable((Function>) _ -> { + sleep(); + return Observable.error(new TestException()); + }) + .test(); - cb.await(); + cb.await(); - beforeCancelSleep(to); + beforeCancelSleep(to); - to.dispose(); + to.dispose(); - Thread.sleep(SLEEP_AFTER_CANCEL); + Thread.sleep(SLEEP_AFTER_CANCEL); - to.assertEmpty(); + to.assertEmpty(); - assertTrue(errors.isEmpty(), errors.toString()); - } finally { - RxJavaPlugins.reset(); - } + assertTrue(errors.isEmpty(), errors.toString()); + } finally { + RxJavaPlugins.reset(); + } + }); } @Test - @RetryingTest(3) public void singleNotificationSuccess() throws Exception { - List errors = TestHelper.trackPluginErrors(); - try { - TestObserver to = Single.just(1) - .subscribeOn(Schedulers.cached()) - .flatMap( - (Function>) _ -> { - sleep(); - return Single.error(new TestException()); - }, - (Function>) _ -> { - sleep(); - return Single.error(new TestException()); - } - ) - .test(); + withRetry(3, () -> { + List errors = TestHelper.trackPluginErrors(); + try { + TestObserver to = Single.just(1) + .subscribeOn(Schedulers.cached()) + .flatMap( + (Function>) _ -> { + sleep(); + return Single.error(new TestException()); + }, + (Function>) _ -> { + sleep(); + return Single.error(new TestException()); + } + ) + .test(); - cb.await(); + cb.await(); - beforeCancelSleep(to); + beforeCancelSleep(to); - to.dispose(); + to.dispose(); - Thread.sleep(SLEEP_AFTER_CANCEL); + Thread.sleep(SLEEP_AFTER_CANCEL); - to.assertEmpty(); + to.assertEmpty(); - assertTrue(errors.isEmpty(), errors.toString()); - } finally { - RxJavaPlugins.reset(); - } + assertTrue(errors.isEmpty(), errors.toString()); + } finally { + RxJavaPlugins.reset(); + } + }); } @Test - @RetryingTest(3) public void singleNotificationError() throws Exception { - List errors = TestHelper.trackPluginErrors(); - try { - TestObserver to = Single.error(new TestException()) - .subscribeOn(Schedulers.cached()) - .flatMap( - (Function>) _ -> { - sleep(); - return Single.error(new TestException()); - }, - (Function>) _ -> { - sleep(); - return Single.error(new TestException()); - } - ) - .test(); + withRetry(3, () -> { + List errors = TestHelper.trackPluginErrors(); + try { + TestObserver to = Single.error(new TestException()) + .subscribeOn(Schedulers.cached()) + .flatMap( + (Function>) _ -> { + sleep(); + return Single.error(new TestException()); + }, + (Function>) _ -> { + sleep(); + return Single.error(new TestException()); + } + ) + .test(); - cb.await(); + cb.await(); - beforeCancelSleep(to); + beforeCancelSleep(to); - to.dispose(); + to.dispose(); - Thread.sleep(SLEEP_AFTER_CANCEL); + Thread.sleep(SLEEP_AFTER_CANCEL); - to.assertEmpty(); + to.assertEmpty(); - assertTrue(errors.isEmpty(), errors.toString()); - } finally { - RxJavaPlugins.reset(); - } + assertTrue(errors.isEmpty(), errors.toString()); + } finally { + RxJavaPlugins.reset(); + } + }); } @Test - @RetryingTest(3) public void maybeSingle() throws Exception { - List errors = TestHelper.trackPluginErrors(); - try { - TestObserver to = Maybe.just(1) - .subscribeOn(Schedulers.cached()) - .flatMapSingle((Function>) _ -> { - sleep(); - return Single.error(new TestException()); - }) - .toSingle() - .test(); + withRetry(3, () -> { + List errors = TestHelper.trackPluginErrors(); + try { + TestObserver to = Maybe.just(1) + .subscribeOn(Schedulers.cached()) + .flatMapSingle((Function>) _ -> { + sleep(); + return Single.error(new TestException()); + }) + .toSingle() + .test(); - cb.await(); + cb.await(); - beforeCancelSleep(to); + beforeCancelSleep(to); - to.dispose(); + to.dispose(); - Thread.sleep(SLEEP_AFTER_CANCEL); + Thread.sleep(SLEEP_AFTER_CANCEL); - to.assertEmpty(); + to.assertEmpty(); - assertTrue(errors.isEmpty(), errors.toString()); - } finally { - RxJavaPlugins.reset(); - } + assertTrue(errors.isEmpty(), errors.toString()); + } finally { + RxJavaPlugins.reset(); + } + }); } @Test - @RetryingTest(3) public void maybeSingle2() throws Exception { - List errors = TestHelper.trackPluginErrors(); - try { - TestObserver to = Maybe.just(1) - .subscribeOn(Schedulers.cached()) - .flatMapSingle((Function>) _ -> { - sleep(); - return Single.error(new TestException()); - }) - .test(); + withRetry(3, () -> { + List errors = TestHelper.trackPluginErrors(); + try { + TestObserver to = Maybe.just(1) + .subscribeOn(Schedulers.cached()) + .flatMapSingle((Function>) _ -> { + sleep(); + return Single.error(new TestException()); + }) + .test(); - cb.await(); + cb.await(); - beforeCancelSleep(to); + beforeCancelSleep(to); - to.dispose(); + to.dispose(); - Thread.sleep(SLEEP_AFTER_CANCEL); + Thread.sleep(SLEEP_AFTER_CANCEL); - to.assertEmpty(); + to.assertEmpty(); - assertTrue(errors.isEmpty(), errors.toString()); - } finally { - RxJavaPlugins.reset(); - } + assertTrue(errors.isEmpty(), errors.toString()); + } finally { + RxJavaPlugins.reset(); + } + }); } @Test - @RetryingTest(3) public void maybeMaybe() throws Exception { - List errors = TestHelper.trackPluginErrors(); - try { - TestObserver to = Maybe.just(1) - .subscribeOn(Schedulers.cached()) - .flatMap((Function>) _ -> { - sleep(); - return Maybe.error(new TestException()); - }) - .test(); + withRetry(3, () -> { + List errors = TestHelper.trackPluginErrors(); + try { + TestObserver to = Maybe.just(1) + .subscribeOn(Schedulers.cached()) + .flatMap((Function>) _ -> { + sleep(); + return Maybe.error(new TestException()); + }) + .test(); - cb.await(); + cb.await(); - beforeCancelSleep(to); + beforeCancelSleep(to); - to.dispose(); + to.dispose(); - Thread.sleep(SLEEP_AFTER_CANCEL); + Thread.sleep(SLEEP_AFTER_CANCEL); - to.assertEmpty(); + to.assertEmpty(); - assertTrue(errors.isEmpty(), errors.toString()); - } finally { - RxJavaPlugins.reset(); - } + assertTrue(errors.isEmpty(), errors.toString()); + } finally { + RxJavaPlugins.reset(); + } + }); } @Test - @RetryingTest(3) public void maybePublisher() throws Exception { - List errors = TestHelper.trackPluginErrors(); - try { - TestSubscriber ts = Maybe.just(1) - .subscribeOn(Schedulers.cached()) - .flatMapPublisher((Function>) _ -> { - sleep(); - return Flowable.error(new TestException()); - }) - .test(); + withRetry(3, () -> { + List errors = TestHelper.trackPluginErrors(); + try { + TestSubscriber ts = Maybe.just(1) + .subscribeOn(Schedulers.cached()) + .flatMapPublisher((Function>) _ -> { + sleep(); + return Flowable.error(new TestException()); + }) + .test(); - cb.await(); + cb.await(); - beforeCancelSleep(ts); + beforeCancelSleep(ts); - ts.cancel(); + ts.cancel(); - Thread.sleep(SLEEP_AFTER_CANCEL); + Thread.sleep(SLEEP_AFTER_CANCEL); - ts.assertEmpty(); + ts.assertEmpty(); - assertTrue(errors.isEmpty(), errors.toString()); - } finally { - RxJavaPlugins.reset(); - } + assertTrue(errors.isEmpty(), errors.toString()); + } finally { + RxJavaPlugins.reset(); + } + }); } @Test - @RetryingTest(3) public void maybeObservable() throws Exception { - List errors = TestHelper.trackPluginErrors(); - try { - TestObserver to = Maybe.just(1) - .subscribeOn(Schedulers.cached()) - .flatMapObservable((Function>) _ -> { - sleep(); - return Observable.error(new TestException()); - }) - .test(); + withRetry(3, () -> { + List errors = TestHelper.trackPluginErrors(); + try { + TestObserver to = Maybe.just(1) + .subscribeOn(Schedulers.cached()) + .flatMapObservable((Function>) _ -> { + sleep(); + return Observable.error(new TestException()); + }) + .test(); - cb.await(); + cb.await(); - beforeCancelSleep(to); + beforeCancelSleep(to); - to.dispose(); + to.dispose(); - Thread.sleep(SLEEP_AFTER_CANCEL); + Thread.sleep(SLEEP_AFTER_CANCEL); - to.assertEmpty(); + to.assertEmpty(); - assertTrue(errors.isEmpty(), errors.toString()); - } finally { - RxJavaPlugins.reset(); - } + assertTrue(errors.isEmpty(), errors.toString()); + } finally { + RxJavaPlugins.reset(); + } + }); } @Test - @RetryingTest(3) public void maybeNotificationSuccess() throws Exception { - List errors = TestHelper.trackPluginErrors(); - try { - TestObserver to = Maybe.just(1) - .subscribeOn(Schedulers.cached()) - .flatMap( - (Function>) _ -> { - sleep(); - return Maybe.error(new TestException()); - }, - (Function>) _ -> { - sleep(); - return Maybe.error(new TestException()); - }, - (Supplier>) () -> { - sleep(); - return Maybe.error(new TestException()); - } - ) - .test(); - - cb.await(); - - beforeCancelSleep(to); - - to.dispose(); - - Thread.sleep(SLEEP_AFTER_CANCEL); - - to.assertEmpty(); - - assertTrue(errors.isEmpty(), errors.toString()); - } finally { - RxJavaPlugins.reset(); - } + withRetry(3, () -> { + List errors = TestHelper.trackPluginErrors(); + try { + TestObserver to = Maybe.just(1) + .subscribeOn(Schedulers.cached()) + .flatMap( + (Function>) _ -> { + sleep(); + return Maybe.error(new TestException()); + }, + (Function>) _ -> { + sleep(); + return Maybe.error(new TestException()); + }, + (Supplier>) () -> { + sleep(); + return Maybe.error(new TestException()); + } + ) + .test(); + + cb.await(); + + beforeCancelSleep(to); + + to.dispose(); + + Thread.sleep(SLEEP_AFTER_CANCEL); + + to.assertEmpty(); + + assertTrue(errors.isEmpty(), errors.toString()); + } finally { + RxJavaPlugins.reset(); + } + }); } @Test - @RetryingTest(3) public void maybeNotificationError() throws Exception { - List errors = TestHelper.trackPluginErrors(); - try { - TestObserver to = Maybe.error(new TestException()) - .subscribeOn(Schedulers.cached()) - .flatMap( - (Function>) _ -> { - sleep(); - return Maybe.error(new TestException()); - }, - (Function>) _ -> { - sleep(); - return Maybe.error(new TestException()); - }, - (Supplier>) () -> { - sleep(); - return Maybe.error(new TestException()); - } - ) - .test(); - - cb.await(); - - beforeCancelSleep(to); - - to.dispose(); - - Thread.sleep(SLEEP_AFTER_CANCEL); - - to.assertEmpty(); - - assertTrue(errors.isEmpty(), errors.toString()); - } finally { - RxJavaPlugins.reset(); - } + withRetry(3, () -> { + List errors = TestHelper.trackPluginErrors(); + try { + TestObserver to = Maybe.error(new TestException()) + .subscribeOn(Schedulers.cached()) + .flatMap( + (Function>) _ -> { + sleep(); + return Maybe.error(new TestException()); + }, + (Function>) _ -> { + sleep(); + return Maybe.error(new TestException()); + }, + (Supplier>) () -> { + sleep(); + return Maybe.error(new TestException()); + } + ) + .test(); + + cb.await(); + + beforeCancelSleep(to); + + to.dispose(); + + Thread.sleep(SLEEP_AFTER_CANCEL); + + to.assertEmpty(); + + assertTrue(errors.isEmpty(), errors.toString()); + } finally { + RxJavaPlugins.reset(); + } + }); } @Test - @RetryingTest(3) public void maybeNotificationEmpty() throws Exception { - List errors = TestHelper.trackPluginErrors(); - try { - TestObserver to = Maybe.empty() - .subscribeOn(Schedulers.cached()) - .flatMap( - (Function>) _ -> { - sleep(); - return Maybe.error(new TestException()); - }, - (Function>) _ -> { - sleep(); - return Maybe.error(new TestException()); - }, - (Supplier>) () -> { - sleep(); - return Maybe.error(new TestException()); - } - ) - .test(); - - cb.await(); - - beforeCancelSleep(to); - - to.dispose(); - - Thread.sleep(SLEEP_AFTER_CANCEL); - - to.assertEmpty(); - - assertTrue(errors.isEmpty(), errors.toString()); - } finally { - RxJavaPlugins.reset(); - } + withRetry(3, () -> { + List errors = TestHelper.trackPluginErrors(); + try { + TestObserver to = Maybe.empty() + .subscribeOn(Schedulers.cached()) + .flatMap( + (Function>) _ -> { + sleep(); + return Maybe.error(new TestException()); + }, + (Function>) _ -> { + sleep(); + return Maybe.error(new TestException()); + }, + (Supplier>) () -> { + sleep(); + return Maybe.error(new TestException()); + } + ) + .test(); + + cb.await(); + + beforeCancelSleep(to); + + to.dispose(); + + Thread.sleep(SLEEP_AFTER_CANCEL); + + to.assertEmpty(); + + assertTrue(errors.isEmpty(), errors.toString()); + } finally { + RxJavaPlugins.reset(); + } + }); } @Test - @RetryingTest(3) public void maybeCombiner() throws Exception { - List errors = TestHelper.trackPluginErrors(); - try { - TestObserver to = Maybe.just(1) - .subscribeOn(Schedulers.cached()) - .flatMap((Function>) _ -> { - sleep(); - return Maybe.error(new TestException()); - }, Integer::sum) - .test(); + withRetry(3, () -> { + List errors = TestHelper.trackPluginErrors(); + try { + TestObserver to = Maybe.just(1) + .subscribeOn(Schedulers.cached()) + .flatMap((Function>) _ -> { + sleep(); + return Maybe.error(new TestException()); + }, Integer::sum) + .test(); - cb.await(); + cb.await(); - beforeCancelSleep(to); + beforeCancelSleep(to); - to.dispose(); + to.dispose(); - Thread.sleep(SLEEP_AFTER_CANCEL); + Thread.sleep(SLEEP_AFTER_CANCEL); - to.assertEmpty(); + to.assertEmpty(); - assertTrue(errors.isEmpty(), errors.toString()); - } finally { - RxJavaPlugins.reset(); - } + assertTrue(errors.isEmpty(), errors.toString()); + } finally { + RxJavaPlugins.reset(); + } + }); } @Test - @RetryingTest(3) public void maybeCompletable() throws Exception { - List errors = TestHelper.trackPluginErrors(); - try { - TestObserver to = Maybe.just(1) - .subscribeOn(Schedulers.cached()) - .flatMapCompletable((Function) _ -> { - sleep(); - return Completable.error(new TestException()); - }) - .test(); + withRetry(3, () -> { + List errors = TestHelper.trackPluginErrors(); + try { + TestObserver to = Maybe.just(1) + .subscribeOn(Schedulers.cached()) + .flatMapCompletable((Function) _ -> { + sleep(); + return Completable.error(new TestException()); + }) + .test(); - cb.await(); + cb.await(); - beforeCancelSleep(to); + beforeCancelSleep(to); - to.dispose(); + to.dispose(); - Thread.sleep(SLEEP_AFTER_CANCEL); + Thread.sleep(SLEEP_AFTER_CANCEL); - to.assertEmpty(); + to.assertEmpty(); - assertTrue(errors.isEmpty(), errors.toString()); - } finally { - RxJavaPlugins.reset(); - } + assertTrue(errors.isEmpty(), errors.toString()); + } finally { + RxJavaPlugins.reset(); + } + }); } @Test - @RetryingTest(3) public void maybeCompletable2() throws Exception { - List errors = TestHelper.trackPluginErrors(); - try { - TestObserver to = Maybe.just(1) - .subscribeOn(Schedulers.cached()) - .flatMapCompletable((Function) _ -> { - sleep(); - return Completable.error(new TestException()); - }) - .toMaybe() - .test(); + withRetry(3, () -> { + List errors = TestHelper.trackPluginErrors(); + try { + TestObserver to = Maybe.just(1) + .subscribeOn(Schedulers.cached()) + .flatMapCompletable((Function) _ -> { + sleep(); + return Completable.error(new TestException()); + }) + .toMaybe() + .test(); - cb.await(); + cb.await(); - beforeCancelSleep(to); + beforeCancelSleep(to); - to.dispose(); + to.dispose(); - Thread.sleep(SLEEP_AFTER_CANCEL); + Thread.sleep(SLEEP_AFTER_CANCEL); - to.assertEmpty(); + to.assertEmpty(); - assertTrue(errors.isEmpty(), errors.toString()); - } finally { - RxJavaPlugins.reset(); - } + assertTrue(errors.isEmpty(), errors.toString()); + } finally { + RxJavaPlugins.reset(); + } + }); } }