Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.rxjava4.schedulers;

import static org.junit.jupiter.api.Assertions.*;

import java.util.HashMap;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.jupiter.api.Test;

import io.reactivex.rxjava4.core.*;
import io.reactivex.rxjava4.core.Scheduler.Worker;
import io.reactivex.rxjava4.disposables.Disposable;
import io.reactivex.rxjava4.internal.schedulers.ComputationScheduler;
import io.reactivex.rxjava4.plugins.RxJavaPlugins;
import io.reactivex.rxjava4.testsupport.SuppressUndeliverable;

public class VirtualSchedulerTests extends AbstractSchedulerConcurrencyTests {

@Override
protected Scheduler getScheduler() {
return Schedulers.virtual();
}

@Test
public void threadSafetyWhenSchedulerIsHoppingBetweenThreads() {

final int NUM = 1000000;
final CountDownLatch latch = new CountDownLatch(1);
final HashMap<String, Integer> map = new HashMap<>();

final Scheduler.Worker inner = Schedulers.computation().createWorker();

try {
inner.schedule(new Runnable() /* NFI */ {

private HashMap<String, Integer> statefulMap = map;
int nonThreadSafeCounter;

@Override
public void run() {
Integer i = statefulMap.get("a");
if (i == null) {
i = 1;
statefulMap.put("a", i);
statefulMap.put("b", i);
} else {
i++;
statefulMap.put("a", i);
statefulMap.put("b", i);
}
nonThreadSafeCounter++;
statefulMap.put("nonThreadSafeCounter", nonThreadSafeCounter);
if (i < NUM) {
inner.schedule(this);
} else {
latch.countDown();
}
}
});

try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("Count A: " + map.get("a"));
System.out.println("Count B: " + map.get("b"));
System.out.println("nonThreadSafeCounter: " + map.get("nonThreadSafeCounter"));

assertEquals(NUM, map.get("a").intValue());
assertEquals(NUM, map.get("b").intValue());
assertEquals(NUM, map.get("nonThreadSafeCounter").intValue());
} finally {
inner.dispose();
}
}

@Test
public final void computationThreadPool1() {
Flowable<Integer> f1 = Flowable.<Integer> just(1, 2, 3, 4, 5);
Flowable<Integer> f2 = Flowable.<Integer> just(6, 7, 8, 9, 10);
Flowable<String> f = Flowable.<Integer> mergeArray(f1, f2)
.map(t -> {
assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool"));
return "Value_" + t + "_Thread_" + Thread.currentThread().getName();
});

f.subscribeOn(Schedulers.computation()).blockingForEach(t -> System.out.println("t: " + t));
}

@Test
public final void mergeWithExecutorScheduler() {

final String currentThreadName = Thread.currentThread().getName();

Flowable<Integer> f1 = Flowable.<Integer> just(1, 2, 3, 4, 5);
Flowable<Integer> f2 = Flowable.<Integer> just(6, 7, 8, 9, 10);
Flowable<String> f = Flowable.<Integer> mergeArray(f1, f2)
.subscribeOn(Schedulers.computation())
.map(t -> {
assertNotEquals(Thread.currentThread().getName(), currentThreadName);
assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool"));
return "Value_" + t + "_Thread_" + Thread.currentThread().getName();
});

f.blockingForEach(t -> System.out.println("t: " + t));
}

@Test
public final void handledErrorIsNotDeliveredToThreadHandler() throws InterruptedException {
SchedulerTestHelper.handledErrorIsNotDeliveredToThreadHandler(getScheduler());
}

@Test
public void cancelledTaskRetention() throws InterruptedException {
Worker w = Schedulers.computation().createWorker();
try {
ExecutorSchedulerTest.cancelledRetention(w, false);
} finally {
w.dispose();
}
w = Schedulers.computation().createWorker();
try {
ExecutorSchedulerTest.cancelledRetention(w, true);
} finally {
w.dispose();
}
}

@Test
@SuppressUndeliverable
public void shutdownRejects() {
final int[] calls = { 0 };

Runnable r = () -> calls[0]++;

Scheduler s = new ComputationScheduler();
s.shutdown();
s.shutdown();

assertEquals(Disposable.disposed(), s.scheduleDirect(r));

assertEquals(Disposable.disposed(), s.scheduleDirect(r, 1, TimeUnit.SECONDS));

assertEquals(Disposable.disposed(), s.schedulePeriodicallyDirect(r, 1, 1, TimeUnit.SECONDS));

Worker w = s.createWorker();
w.dispose();

assertTrue(w.isDisposed());

assertEquals(Disposable.disposed(), w.schedule(r));

assertEquals(Disposable.disposed(), w.schedule(r, 1, TimeUnit.SECONDS));

assertEquals(Disposable.disposed(), w.schedulePeriodically(r, 1, 1, TimeUnit.SECONDS));

assertEquals(0, calls[0]);
}

@Test
public void exceptionFromObservableShouldNotBeSwallowed() throws Exception {
CountDownLatch latch = new CountDownLatch(1);

// #3 thread's uncaught exception handler
Scheduler computationScheduler = new ComputationScheduler(r -> {
Thread t = new Thread(r);
t.setUncaughtExceptionHandler((_, _) -> latch.countDown());
return t;
});

// #2 RxJava exception handler
RxJavaPlugins.setErrorHandler(_ -> latch.countDown());

// Exceptions, fatal or not, should be handled by
// #1 observer's onError(), or
// #2 RxJava exception handler, or
// #3 thread's uncaught exception handler,
// and should not be swallowed.
try {

// #1 observer's onError()
Observable.create(s -> {

s.onNext(1);
throw new OutOfMemoryError();
})
.subscribeOn(computationScheduler)
.subscribe(_ -> { },
_ -> latch.countDown()
);

assertTrue(latch.await(2, TimeUnit.SECONDS));
} finally {
RxJavaPlugins.reset();
computationScheduler.shutdown();
}
}

@Test
public void exceptionFromObserverShouldNotBeSwallowed() throws Exception {
CountDownLatch latch = new CountDownLatch(1);

// #3 thread's uncaught exception handler
Scheduler computationScheduler = new ComputationScheduler(r -> {
Thread t = new Thread(r);
t.setUncaughtExceptionHandler((_, _) -> latch.countDown());
return t;
});

// #2 RxJava exception handler
RxJavaPlugins.setErrorHandler(_ -> latch.countDown());

// Exceptions, fatal or not, should be handled by
// #1 observer's onError(), or
// #2 RxJava exception handler, or
// #3 thread's uncaught exception handler,
// and should not be swallowed.
try {

// #1 observer's onError()
Flowable.interval(500, TimeUnit.MILLISECONDS, computationScheduler)
.subscribe(_ -> {
throw new OutOfMemoryError();
}, _ -> latch.countDown());

assertTrue(latch.await(2, TimeUnit.SECONDS));
} finally {
RxJavaPlugins.reset();
computationScheduler.shutdown();
}
}

@Test
@SuppressUndeliverable
public void periodicTaskShouldStopOnError() throws Exception {
AtomicInteger repeatCount = new AtomicInteger();

Schedulers.computation().schedulePeriodicallyDirect(() -> {
repeatCount.incrementAndGet();
throw new OutOfMemoryError();
}, 0, 1, TimeUnit.MILLISECONDS);

Thread.sleep(200);

assertEquals(1, repeatCount.get());
}

@Test
@SuppressUndeliverable
public void periodicTaskShouldStopOnError2() throws Exception {
AtomicInteger repeatCount = new AtomicInteger();

Schedulers.computation().schedulePeriodicallyDirect(() -> {
repeatCount.incrementAndGet();
throw new OutOfMemoryError();
}, 0, 1, TimeUnit.NANOSECONDS);

Thread.sleep(200);

assertEquals(1, repeatCount.get());
}
}
Loading