From d00666c96266007e78241fecbff7089e0980bb9c Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 30 Jun 2026 14:36:00 +0200 Subject: [PATCH] 4.x: Add VirtualSchedulerTests extends AbstractSchedulerConcurrencyTests --- .../schedulers/VirtualSchedulerTests.java | 278 ++++++++++++++++++ 1 file changed, 278 insertions(+) create mode 100644 src/test/java/io/reactivex/rxjava4/schedulers/VirtualSchedulerTests.java diff --git a/src/test/java/io/reactivex/rxjava4/schedulers/VirtualSchedulerTests.java b/src/test/java/io/reactivex/rxjava4/schedulers/VirtualSchedulerTests.java new file mode 100644 index 0000000000..6deba8a512 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava4/schedulers/VirtualSchedulerTests.java @@ -0,0 +1,278 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava4.schedulers; + +import static org.junit.jupiter.api.Assertions.*; + +import java.util.HashMap; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.Test; + +import io.reactivex.rxjava4.core.*; +import io.reactivex.rxjava4.core.Scheduler.Worker; +import io.reactivex.rxjava4.disposables.Disposable; +import io.reactivex.rxjava4.internal.schedulers.ComputationScheduler; +import io.reactivex.rxjava4.plugins.RxJavaPlugins; +import io.reactivex.rxjava4.testsupport.SuppressUndeliverable; + +public class VirtualSchedulerTests extends AbstractSchedulerConcurrencyTests { + + @Override + protected Scheduler getScheduler() { + return Schedulers.virtual(); + } + + @Test + public void threadSafetyWhenSchedulerIsHoppingBetweenThreads() { + + final int NUM = 1000000; + final CountDownLatch latch = new CountDownLatch(1); + final HashMap map = new HashMap<>(); + + final Scheduler.Worker inner = Schedulers.computation().createWorker(); + + try { + inner.schedule(new Runnable() /* NFI */ { + + private HashMap statefulMap = map; + int nonThreadSafeCounter; + + @Override + public void run() { + Integer i = statefulMap.get("a"); + if (i == null) { + i = 1; + statefulMap.put("a", i); + statefulMap.put("b", i); + } else { + i++; + statefulMap.put("a", i); + statefulMap.put("b", i); + } + nonThreadSafeCounter++; + statefulMap.put("nonThreadSafeCounter", nonThreadSafeCounter); + if (i < NUM) { + inner.schedule(this); + } else { + latch.countDown(); + } + } + }); + + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + System.out.println("Count A: " + map.get("a")); + System.out.println("Count B: " + map.get("b")); + System.out.println("nonThreadSafeCounter: " + map.get("nonThreadSafeCounter")); + + assertEquals(NUM, map.get("a").intValue()); + assertEquals(NUM, map.get("b").intValue()); + assertEquals(NUM, map.get("nonThreadSafeCounter").intValue()); + } finally { + inner.dispose(); + } + } + + @Test + public final void computationThreadPool1() { + Flowable f1 = Flowable. just(1, 2, 3, 4, 5); + Flowable f2 = Flowable. just(6, 7, 8, 9, 10); + Flowable f = Flowable. mergeArray(f1, f2) + .map(t -> { + assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool")); + return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); + }); + + f.subscribeOn(Schedulers.computation()).blockingForEach(t -> System.out.println("t: " + t)); + } + + @Test + public final void mergeWithExecutorScheduler() { + + final String currentThreadName = Thread.currentThread().getName(); + + Flowable f1 = Flowable. just(1, 2, 3, 4, 5); + Flowable f2 = Flowable. just(6, 7, 8, 9, 10); + Flowable f = Flowable. mergeArray(f1, f2) + .subscribeOn(Schedulers.computation()) + .map(t -> { + assertNotEquals(Thread.currentThread().getName(), currentThreadName); + assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool")); + return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); + }); + + f.blockingForEach(t -> System.out.println("t: " + t)); + } + + @Test + public final void handledErrorIsNotDeliveredToThreadHandler() throws InterruptedException { + SchedulerTestHelper.handledErrorIsNotDeliveredToThreadHandler(getScheduler()); + } + + @Test + public void cancelledTaskRetention() throws InterruptedException { + Worker w = Schedulers.computation().createWorker(); + try { + ExecutorSchedulerTest.cancelledRetention(w, false); + } finally { + w.dispose(); + } + w = Schedulers.computation().createWorker(); + try { + ExecutorSchedulerTest.cancelledRetention(w, true); + } finally { + w.dispose(); + } + } + + @Test + @SuppressUndeliverable + public void shutdownRejects() { + final int[] calls = { 0 }; + + Runnable r = () -> calls[0]++; + + Scheduler s = new ComputationScheduler(); + s.shutdown(); + s.shutdown(); + + assertEquals(Disposable.disposed(), s.scheduleDirect(r)); + + assertEquals(Disposable.disposed(), s.scheduleDirect(r, 1, TimeUnit.SECONDS)); + + assertEquals(Disposable.disposed(), s.schedulePeriodicallyDirect(r, 1, 1, TimeUnit.SECONDS)); + + Worker w = s.createWorker(); + w.dispose(); + + assertTrue(w.isDisposed()); + + assertEquals(Disposable.disposed(), w.schedule(r)); + + assertEquals(Disposable.disposed(), w.schedule(r, 1, TimeUnit.SECONDS)); + + assertEquals(Disposable.disposed(), w.schedulePeriodically(r, 1, 1, TimeUnit.SECONDS)); + + assertEquals(0, calls[0]); + } + + @Test + public void exceptionFromObservableShouldNotBeSwallowed() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + + // #3 thread's uncaught exception handler + Scheduler computationScheduler = new ComputationScheduler(r -> { + Thread t = new Thread(r); + t.setUncaughtExceptionHandler((_, _) -> latch.countDown()); + return t; + }); + + // #2 RxJava exception handler + RxJavaPlugins.setErrorHandler(_ -> latch.countDown()); + + // Exceptions, fatal or not, should be handled by + // #1 observer's onError(), or + // #2 RxJava exception handler, or + // #3 thread's uncaught exception handler, + // and should not be swallowed. + try { + + // #1 observer's onError() + Observable.create(s -> { + + s.onNext(1); + throw new OutOfMemoryError(); + }) + .subscribeOn(computationScheduler) + .subscribe(_ -> { }, + _ -> latch.countDown() + ); + + assertTrue(latch.await(2, TimeUnit.SECONDS)); + } finally { + RxJavaPlugins.reset(); + computationScheduler.shutdown(); + } + } + + @Test + public void exceptionFromObserverShouldNotBeSwallowed() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + + // #3 thread's uncaught exception handler + Scheduler computationScheduler = new ComputationScheduler(r -> { + Thread t = new Thread(r); + t.setUncaughtExceptionHandler((_, _) -> latch.countDown()); + return t; + }); + + // #2 RxJava exception handler + RxJavaPlugins.setErrorHandler(_ -> latch.countDown()); + + // Exceptions, fatal or not, should be handled by + // #1 observer's onError(), or + // #2 RxJava exception handler, or + // #3 thread's uncaught exception handler, + // and should not be swallowed. + try { + + // #1 observer's onError() + Flowable.interval(500, TimeUnit.MILLISECONDS, computationScheduler) + .subscribe(_ -> { + throw new OutOfMemoryError(); + }, _ -> latch.countDown()); + + assertTrue(latch.await(2, TimeUnit.SECONDS)); + } finally { + RxJavaPlugins.reset(); + computationScheduler.shutdown(); + } + } + + @Test + @SuppressUndeliverable + public void periodicTaskShouldStopOnError() throws Exception { + AtomicInteger repeatCount = new AtomicInteger(); + + Schedulers.computation().schedulePeriodicallyDirect(() -> { + repeatCount.incrementAndGet(); + throw new OutOfMemoryError(); + }, 0, 1, TimeUnit.MILLISECONDS); + + Thread.sleep(200); + + assertEquals(1, repeatCount.get()); + } + + @Test + @SuppressUndeliverable + public void periodicTaskShouldStopOnError2() throws Exception { + AtomicInteger repeatCount = new AtomicInteger(); + + Schedulers.computation().schedulePeriodicallyDirect(() -> { + repeatCount.incrementAndGet(); + throw new OutOfMemoryError(); + }, 0, 1, TimeUnit.NANOSECONDS); + + Thread.sleep(200); + + assertEquals(1, repeatCount.get()); + } +}