Skip to content

Fix for SerializedObserverTest #1128

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 107 additions & 95 deletions rxjava-core/src/test/java/rx/observers/SerializedObserverTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ private Observer<String> serializedObserver(Observer<String> o) {
@Test
public void testSingleThreadedBasic() {
Subscription s = mock(Subscription.class);
TestSingleThreadedObservable onSubscribe = new TestSingleThreadedObservable(s, "one", "two", "three");
TestSingleThreadedObservable onSubscribe = new TestSingleThreadedObservable("one", "two", "three");
Observable<String> w = Observable.create(onSubscribe);

Observer<String> aw = serializedObserver(observer);
Expand All @@ -84,8 +84,7 @@ public void testSingleThreadedBasic() {

@Test
public void testMultiThreadedBasic() {
Subscription s = mock(Subscription.class);
TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three");
TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable("one", "two", "three");
Observable<String> w = Observable.create(onSubscribe);

BusyObserver busyObserver = new BusyObserver();
Expand All @@ -109,8 +108,7 @@ public void testMultiThreadedBasic() {

@Test(timeout = 1000)
public void testMultiThreadedWithNPE() throws InterruptedException {
Subscription s = mock(Subscription.class);
TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three", null);
TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable("one", "two", "three", null);
Observable<String> w = Observable.create(onSubscribe);

BusyObserver busyObserver = new BusyObserver();
Expand Down Expand Up @@ -141,40 +139,40 @@ public void testMultiThreadedWithNPE() throws InterruptedException {

@Test
public void testMultiThreadedWithNPEinMiddle() {
Subscription s = mock(Subscription.class);
TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three", null, "four", "five", "six", "seven", "eight", "nine");
Observable<String> w = Observable.create(onSubscribe);

BusyObserver busyObserver = new BusyObserver();
Observer<String> aw = serializedObserver(busyObserver);

w.subscribe(aw);
onSubscribe.waitToFinish();

System.out.println("OnSubscribe maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get() + " Observer maxConcurrentThreads: " + busyObserver.maxConcurrentThreads.get());

// we can have concurrency ...
assertTrue(onSubscribe.maxConcurrentThreads.get() > 1);
// ... but the onNext execution should be single threaded
assertEquals(1, busyObserver.maxConcurrentThreads.get());

// this should not be the full number of items since the error should stop it before it completes all 9
System.out.println("onNext count: " + busyObserver.onNextCount.get());
assertTrue(busyObserver.onNextCount.get() < 9);
assertTrue(busyObserver.onError);
// no onCompleted because onError was invoked
assertFalse(busyObserver.onCompleted);
// non-deterministic because unsubscribe happens after 'waitToFinish' releases
// so commenting out for now as this is not a critical thing to test here
// verify(s, times(1)).unsubscribe();
int n = 10;
for (int i = 0; i < n; i++) {
TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable("one", "two", "three", null,
"four", "five", "six", "seven", "eight", "nine");
Observable<String> w = Observable.create(onSubscribe);

BusyObserver busyObserver = new BusyObserver();
Observer<String> aw = serializedObserver(busyObserver);

w.subscribe(aw);
onSubscribe.waitToFinish();

System.out.println("OnSubscribe maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get() + " Observer maxConcurrentThreads: " + busyObserver.maxConcurrentThreads.get());

// we can have concurrency ...
assertTrue(onSubscribe.maxConcurrentThreads.get() > 1);
// ... but the onNext execution should be single threaded
assertEquals(1, busyObserver.maxConcurrentThreads.get());

// this should not be the full number of items since the error should stop it before it completes all 9
System.out.println("onNext count: " + busyObserver.onNextCount.get());
assertFalse(busyObserver.onCompleted);
assertTrue(busyObserver.onError);
assertTrue(busyObserver.onNextCount.get() < 9);
// no onCompleted because onError was invoked
// non-deterministic because unsubscribe happens after 'waitToFinish' releases
// so commenting out for now as this is not a critical thing to test here
// verify(s, times(1)).unsubscribe();
}
}

/**
* A non-realistic use case that tries to expose thread-safety issues by throwing lots of out-of-order
* events on many threads.
*
* @param w
* @param tw
*/
@Test
public void runOutOfOrderConcurrencyTest() {
Expand Down Expand Up @@ -226,11 +224,6 @@ public void runOutOfOrderConcurrencyTest() {
}
}

/**
*
* @param w
* @param tw
*/
@Test
public void runConcurrencyTest() {
ExecutorService tp = Executors.newFixedThreadPool(20);
Expand Down Expand Up @@ -283,59 +276,68 @@ public void runConcurrencyTest() {
@Test
public void testNotificationDelay() throws InterruptedException {
ExecutorService tp = Executors.newFixedThreadPool(2);
try {
int n = 10;
for (int i = 0; i < n; i++) {
final CountDownLatch firstOnNext = new CountDownLatch(1);
final CountDownLatch onNextCount = new CountDownLatch(2);
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch running = new CountDownLatch(2);

final CountDownLatch firstOnNext = new CountDownLatch(1);
final CountDownLatch onNextCount = new CountDownLatch(2);
final CountDownLatch latch = new CountDownLatch(1);
TestSubscriber<String> to = new TestSubscriber<String>(new Observer<String>() {

TestSubscriber<String> to = new TestSubscriber<String>(new Observer<String>() {
@Override
public void onCompleted() {

@Override
public void onCompleted() {

}
}

@Override
public void onError(Throwable e) {
@Override
public void onError(Throwable e) {

}
}

@Override
public void onNext(String t) {
firstOnNext.countDown();
// force it to take time when delivering so the second one is enqueued
try {
latch.await();
} catch (InterruptedException e) {
}
}
@Override
public void onNext(String t) {
firstOnNext.countDown();
// force it to take time when delivering so the second one is enqueued
try {
latch.await();
} catch (InterruptedException e) {
}
}

});
Observer<String> o = serializedObserver(to);
});
Observer<String> o = serializedObserver(to);

Future<?> f1 = tp.submit(new OnNextThread(o, 1, onNextCount));
Future<?> f2 = tp.submit(new OnNextThread(o, 1, onNextCount));
Future<?> f1 = tp.submit(new OnNextThread(o, 1, onNextCount, running));
Future<?> f2 = tp.submit(new OnNextThread(o, 1, onNextCount, running));

firstOnNext.await();
running.await(); // let one of the OnNextThread actually run before proceeding

firstOnNext.await();

Thread t1 = to.getLastSeenThread();
System.out.println("first onNext on thread: " + t1);
Thread t1 = to.getLastSeenThread();
System.out.println("first onNext on thread: " + t1);

latch.countDown();
latch.countDown();

waitOnThreads(f1, f2);
// not completed yet
waitOnThreads(f1, f2);
// not completed yet

assertEquals(2, to.getOnNextEvents().size());
assertEquals(2, to.getOnNextEvents().size());

Thread t2 = to.getLastSeenThread();
System.out.println("second onNext on thread: " + t2);
Thread t2 = to.getLastSeenThread();
System.out.println("second onNext on thread: " + t2);

assertSame(t1, t2);
assertSame(t1, t2);

System.out.println(to.getOnNextEvents());
o.onCompleted();
System.out.println(to.getOnNextEvents());
System.out.println(to.getOnNextEvents());
o.onCompleted();
System.out.println(to.getOnNextEvents());
}
} finally {
tp.shutdown();
}
}

/**
Expand Down Expand Up @@ -435,20 +437,22 @@ public static class OnNextThread implements Runnable {
private final Observer<String> observer;
private final int numStringsToSend;
final AtomicInteger produced;
private final CountDownLatch running;

OnNextThread(Observer<String> observer, int numStringsToSend, CountDownLatch latch) {
this(observer, numStringsToSend, new AtomicInteger(), latch);
OnNextThread(Observer<String> observer, int numStringsToSend, CountDownLatch latch, CountDownLatch running) {
this(observer, numStringsToSend, new AtomicInteger(), latch, running);
}

OnNextThread(Observer<String> observer, int numStringsToSend, AtomicInteger produced) {
this(observer, numStringsToSend, produced, null);
this(observer, numStringsToSend, produced, null, null);
}

OnNextThread(Observer<String> observer, int numStringsToSend, AtomicInteger produced, CountDownLatch latch) {
OnNextThread(Observer<String> observer, int numStringsToSend, AtomicInteger produced, CountDownLatch latch, CountDownLatch running) {
this.observer = observer;
this.numStringsToSend = numStringsToSend;
this.produced = produced;
this.latch = latch;
this.running = running;
}

OnNextThread(Observer<String> observer, int numStringsToSend) {
Expand All @@ -457,6 +461,9 @@ public static class OnNextThread implements Runnable {

@Override
public void run() {
if (running != null) {
running.countDown();
}
for (int i = 0; i < numStringsToSend; i++) {
observer.onNext(Thread.currentThread().getId() + "-" + i);
if (latch != null) {
Expand Down Expand Up @@ -603,19 +610,18 @@ public int assertEvents(TestConcurrencyObserverEvent expectedEndingEvent) throws
/**
* This spawns a single thread for the subscribe execution
*/
private static class TestSingleThreadedObservable implements Observable.OnSubscribeFunc<String> {
private static class TestSingleThreadedObservable implements Observable.OnSubscribe<String> {

final Subscription s;
final String[] values;
private Thread t = null;

public TestSingleThreadedObservable(final Subscription s, final String... values) {
this.s = s;
public TestSingleThreadedObservable(final String... values) {
this.values = values;

}

public Subscription onSubscribe(final Observer<? super String> observer) {
@Override
public void call(final Subscriber<? super String> observer) {
System.out.println("TestSingleThreadedObservable subscribed to ...");
t = new Thread(new Runnable() {

Expand All @@ -637,7 +643,6 @@ public void run() {
System.out.println("starting TestSingleThreadedObservable thread");
t.start();
System.out.println("done starting TestSingleThreadedObservable thread");
return s;
}

public void waitToFinish() {
Expand All @@ -653,42 +658,48 @@ public void waitToFinish() {
/**
* This spawns a thread for the subscription, then a separate thread for each onNext call.
*/
private static class TestMultiThreadedObservable implements Observable.OnSubscribeFunc<String> {
private static class TestMultiThreadedObservable implements Observable.OnSubscribe<String> {

final Subscription s;
final String[] values;
Thread t = null;
AtomicInteger threadsRunning = new AtomicInteger();
AtomicInteger maxConcurrentThreads = new AtomicInteger();
ExecutorService threadPool;

public TestMultiThreadedObservable(Subscription s, String... values) {
this.s = s;
public TestMultiThreadedObservable(String... values) {
this.values = values;
this.threadPool = Executors.newCachedThreadPool();
}

@Override
public Subscription onSubscribe(final Observer<? super String> observer) {
public void call(final Subscriber<? super String> observer) {
System.out.println("TestMultiThreadedObservable subscribed to ...");
t = new Thread(new Runnable() {

@Override
public void run() {
try {
System.out.println("running TestMultiThreadedObservable thread");
int j = 0;
for (final String s : values) {
final int fj = ++j;
threadPool.execute(new Runnable() {

@Override
public void run() {
threadsRunning.incrementAndGet();
try {
// perform onNext call
System.out.println("TestMultiThreadedObservable onNext: " + s);
System.out.println("TestMultiThreadedObservable onNext: " + s + " on thread " + Thread.currentThread().getName());
if (s == null) {
// force an error
throw new NullPointerException();
} else {
// allow the exception to queue up
int sleep = (fj % 3) * 10;
if (sleep != 0) {
Thread.sleep(sleep);
}
}
observer.onNext(s);
// capture 'maxThreads'
Expand All @@ -714,7 +725,9 @@ public void run() {
// wait until all threads are done, then mark it as COMPLETED
try {
// wait for all the threads to finish
threadPool.awaitTermination(2, TimeUnit.SECONDS);
if (!threadPool.awaitTermination(5, TimeUnit.SECONDS)) {
System.out.println("Threadpool did not terminate in time.");
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Expand All @@ -724,7 +737,6 @@ public void run() {
System.out.println("starting TestMultiThreadedObservable thread");
t.start();
System.out.println("done starting TestMultiThreadedObservable thread");
return s;
}

public void waitToFinish() {
Expand Down Expand Up @@ -776,7 +788,7 @@ public void onNext(String args) {
onNextCount.incrementAndGet();
try {
// simulate doing something computational
Thread.sleep(200);
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
Expand Down