Skip to content

Commit b16727c

Browse files
committed
Fix for SerializedObserverTest
1 parent 95e0636 commit b16727c

File tree

1 file changed

+107
-95
lines changed

1 file changed

+107
-95
lines changed

rxjava-core/src/test/java/rx/observers/SerializedObserverTest.java

Lines changed: 107 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ private Observer<String> serializedObserver(Observer<String> o) {
6464
@Test
6565
public void testSingleThreadedBasic() {
6666
Subscription s = mock(Subscription.class);
67-
TestSingleThreadedObservable onSubscribe = new TestSingleThreadedObservable(s, "one", "two", "three");
67+
TestSingleThreadedObservable onSubscribe = new TestSingleThreadedObservable("one", "two", "three");
6868
Observable<String> w = Observable.create(onSubscribe);
6969

7070
Observer<String> aw = serializedObserver(observer);
@@ -84,8 +84,7 @@ public void testSingleThreadedBasic() {
8484

8585
@Test
8686
public void testMultiThreadedBasic() {
87-
Subscription s = mock(Subscription.class);
88-
TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three");
87+
TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable("one", "two", "three");
8988
Observable<String> w = Observable.create(onSubscribe);
9089

9190
BusyObserver busyObserver = new BusyObserver();
@@ -109,8 +108,7 @@ public void testMultiThreadedBasic() {
109108

110109
@Test(timeout = 1000)
111110
public void testMultiThreadedWithNPE() throws InterruptedException {
112-
Subscription s = mock(Subscription.class);
113-
TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three", null);
111+
TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable("one", "two", "three", null);
114112
Observable<String> w = Observable.create(onSubscribe);
115113

116114
BusyObserver busyObserver = new BusyObserver();
@@ -141,40 +139,40 @@ public void testMultiThreadedWithNPE() throws InterruptedException {
141139

142140
@Test
143141
public void testMultiThreadedWithNPEinMiddle() {
144-
Subscription s = mock(Subscription.class);
145-
TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three", null, "four", "five", "six", "seven", "eight", "nine");
146-
Observable<String> w = Observable.create(onSubscribe);
147-
148-
BusyObserver busyObserver = new BusyObserver();
149-
Observer<String> aw = serializedObserver(busyObserver);
150-
151-
w.subscribe(aw);
152-
onSubscribe.waitToFinish();
153-
154-
System.out.println("OnSubscribe maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get() + " Observer maxConcurrentThreads: " + busyObserver.maxConcurrentThreads.get());
155-
156-
// we can have concurrency ...
157-
assertTrue(onSubscribe.maxConcurrentThreads.get() > 1);
158-
// ... but the onNext execution should be single threaded
159-
assertEquals(1, busyObserver.maxConcurrentThreads.get());
160-
161-
// this should not be the full number of items since the error should stop it before it completes all 9
162-
System.out.println("onNext count: " + busyObserver.onNextCount.get());
163-
assertTrue(busyObserver.onNextCount.get() < 9);
164-
assertTrue(busyObserver.onError);
165-
// no onCompleted because onError was invoked
166-
assertFalse(busyObserver.onCompleted);
167-
// non-deterministic because unsubscribe happens after 'waitToFinish' releases
168-
// so commenting out for now as this is not a critical thing to test here
169-
// verify(s, times(1)).unsubscribe();
142+
int n = 10;
143+
for (int i = 0; i < n; i++) {
144+
TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable("one", "two", "three", null,
145+
"four", "five", "six", "seven", "eight", "nine");
146+
Observable<String> w = Observable.create(onSubscribe);
147+
148+
BusyObserver busyObserver = new BusyObserver();
149+
Observer<String> aw = serializedObserver(busyObserver);
150+
151+
w.subscribe(aw);
152+
onSubscribe.waitToFinish();
153+
154+
System.out.println("OnSubscribe maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get() + " Observer maxConcurrentThreads: " + busyObserver.maxConcurrentThreads.get());
155+
156+
// we can have concurrency ...
157+
assertTrue(onSubscribe.maxConcurrentThreads.get() > 1);
158+
// ... but the onNext execution should be single threaded
159+
assertEquals(1, busyObserver.maxConcurrentThreads.get());
160+
161+
// this should not be the full number of items since the error should stop it before it completes all 9
162+
System.out.println("onNext count: " + busyObserver.onNextCount.get());
163+
assertFalse(busyObserver.onCompleted);
164+
assertTrue(busyObserver.onError);
165+
assertTrue(busyObserver.onNextCount.get() < 9);
166+
// no onCompleted because onError was invoked
167+
// non-deterministic because unsubscribe happens after 'waitToFinish' releases
168+
// so commenting out for now as this is not a critical thing to test here
169+
// verify(s, times(1)).unsubscribe();
170+
}
170171
}
171172

172173
/**
173174
* A non-realistic use case that tries to expose thread-safety issues by throwing lots of out-of-order
174175
* events on many threads.
175-
*
176-
* @param w
177-
* @param tw
178176
*/
179177
@Test
180178
public void runOutOfOrderConcurrencyTest() {
@@ -226,11 +224,6 @@ public void runOutOfOrderConcurrencyTest() {
226224
}
227225
}
228226

229-
/**
230-
*
231-
* @param w
232-
* @param tw
233-
*/
234227
@Test
235228
public void runConcurrencyTest() {
236229
ExecutorService tp = Executors.newFixedThreadPool(20);
@@ -283,59 +276,68 @@ public void runConcurrencyTest() {
283276
@Test
284277
public void testNotificationDelay() throws InterruptedException {
285278
ExecutorService tp = Executors.newFixedThreadPool(2);
279+
try {
280+
int n = 10;
281+
for (int i = 0; i < n; i++) {
282+
final CountDownLatch firstOnNext = new CountDownLatch(1);
283+
final CountDownLatch onNextCount = new CountDownLatch(2);
284+
final CountDownLatch latch = new CountDownLatch(1);
285+
final CountDownLatch running = new CountDownLatch(2);
286286

287-
final CountDownLatch firstOnNext = new CountDownLatch(1);
288-
final CountDownLatch onNextCount = new CountDownLatch(2);
289-
final CountDownLatch latch = new CountDownLatch(1);
287+
TestSubscriber<String> to = new TestSubscriber<String>(new Observer<String>() {
290288

291-
TestSubscriber<String> to = new TestSubscriber<String>(new Observer<String>() {
289+
@Override
290+
public void onCompleted() {
292291

293-
@Override
294-
public void onCompleted() {
295-
296-
}
292+
}
297293

298-
@Override
299-
public void onError(Throwable e) {
294+
@Override
295+
public void onError(Throwable e) {
300296

301-
}
297+
}
302298

303-
@Override
304-
public void onNext(String t) {
305-
firstOnNext.countDown();
306-
// force it to take time when delivering so the second one is enqueued
307-
try {
308-
latch.await();
309-
} catch (InterruptedException e) {
310-
}
311-
}
299+
@Override
300+
public void onNext(String t) {
301+
firstOnNext.countDown();
302+
// force it to take time when delivering so the second one is enqueued
303+
try {
304+
latch.await();
305+
} catch (InterruptedException e) {
306+
}
307+
}
312308

313-
});
314-
Observer<String> o = serializedObserver(to);
309+
});
310+
Observer<String> o = serializedObserver(to);
315311

316-
Future<?> f1 = tp.submit(new OnNextThread(o, 1, onNextCount));
317-
Future<?> f2 = tp.submit(new OnNextThread(o, 1, onNextCount));
312+
Future<?> f1 = tp.submit(new OnNextThread(o, 1, onNextCount, running));
313+
Future<?> f2 = tp.submit(new OnNextThread(o, 1, onNextCount, running));
318314

319-
firstOnNext.await();
315+
running.await(); // let one of the OnNextThread actually run before proceeding
316+
317+
firstOnNext.await();
320318

321-
Thread t1 = to.getLastSeenThread();
322-
System.out.println("first onNext on thread: " + t1);
319+
Thread t1 = to.getLastSeenThread();
320+
System.out.println("first onNext on thread: " + t1);
323321

324-
latch.countDown();
322+
latch.countDown();
325323

326-
waitOnThreads(f1, f2);
327-
// not completed yet
324+
waitOnThreads(f1, f2);
325+
// not completed yet
328326

329-
assertEquals(2, to.getOnNextEvents().size());
327+
assertEquals(2, to.getOnNextEvents().size());
330328

331-
Thread t2 = to.getLastSeenThread();
332-
System.out.println("second onNext on thread: " + t2);
329+
Thread t2 = to.getLastSeenThread();
330+
System.out.println("second onNext on thread: " + t2);
333331

334-
assertSame(t1, t2);
332+
assertSame(t1, t2);
335333

336-
System.out.println(to.getOnNextEvents());
337-
o.onCompleted();
338-
System.out.println(to.getOnNextEvents());
334+
System.out.println(to.getOnNextEvents());
335+
o.onCompleted();
336+
System.out.println(to.getOnNextEvents());
337+
}
338+
} finally {
339+
tp.shutdown();
340+
}
339341
}
340342

341343
/**
@@ -435,20 +437,22 @@ public static class OnNextThread implements Runnable {
435437
private final Observer<String> observer;
436438
private final int numStringsToSend;
437439
final AtomicInteger produced;
440+
private final CountDownLatch running;
438441

439-
OnNextThread(Observer<String> observer, int numStringsToSend, CountDownLatch latch) {
440-
this(observer, numStringsToSend, new AtomicInteger(), latch);
442+
OnNextThread(Observer<String> observer, int numStringsToSend, CountDownLatch latch, CountDownLatch running) {
443+
this(observer, numStringsToSend, new AtomicInteger(), latch, running);
441444
}
442445

443446
OnNextThread(Observer<String> observer, int numStringsToSend, AtomicInteger produced) {
444-
this(observer, numStringsToSend, produced, null);
447+
this(observer, numStringsToSend, produced, null, null);
445448
}
446449

447-
OnNextThread(Observer<String> observer, int numStringsToSend, AtomicInteger produced, CountDownLatch latch) {
450+
OnNextThread(Observer<String> observer, int numStringsToSend, AtomicInteger produced, CountDownLatch latch, CountDownLatch running) {
448451
this.observer = observer;
449452
this.numStringsToSend = numStringsToSend;
450453
this.produced = produced;
451454
this.latch = latch;
455+
this.running = running;
452456
}
453457

454458
OnNextThread(Observer<String> observer, int numStringsToSend) {
@@ -457,6 +461,9 @@ public static class OnNextThread implements Runnable {
457461

458462
@Override
459463
public void run() {
464+
if (running != null) {
465+
running.countDown();
466+
}
460467
for (int i = 0; i < numStringsToSend; i++) {
461468
observer.onNext(Thread.currentThread().getId() + "-" + i);
462469
if (latch != null) {
@@ -603,19 +610,18 @@ public int assertEvents(TestConcurrencyObserverEvent expectedEndingEvent) throws
603610
/**
604611
* This spawns a single thread for the subscribe execution
605612
*/
606-
private static class TestSingleThreadedObservable implements Observable.OnSubscribeFunc<String> {
613+
private static class TestSingleThreadedObservable implements Observable.OnSubscribe<String> {
607614

608-
final Subscription s;
609615
final String[] values;
610616
private Thread t = null;
611617

612-
public TestSingleThreadedObservable(final Subscription s, final String... values) {
613-
this.s = s;
618+
public TestSingleThreadedObservable(final String... values) {
614619
this.values = values;
615620

616621
}
617622

618-
public Subscription onSubscribe(final Observer<? super String> observer) {
623+
@Override
624+
public void call(final Subscriber<? super String> observer) {
619625
System.out.println("TestSingleThreadedObservable subscribed to ...");
620626
t = new Thread(new Runnable() {
621627

@@ -637,7 +643,6 @@ public void run() {
637643
System.out.println("starting TestSingleThreadedObservable thread");
638644
t.start();
639645
System.out.println("done starting TestSingleThreadedObservable thread");
640-
return s;
641646
}
642647

643648
public void waitToFinish() {
@@ -653,42 +658,48 @@ public void waitToFinish() {
653658
/**
654659
* This spawns a thread for the subscription, then a separate thread for each onNext call.
655660
*/
656-
private static class TestMultiThreadedObservable implements Observable.OnSubscribeFunc<String> {
661+
private static class TestMultiThreadedObservable implements Observable.OnSubscribe<String> {
657662

658-
final Subscription s;
659663
final String[] values;
660664
Thread t = null;
661665
AtomicInteger threadsRunning = new AtomicInteger();
662666
AtomicInteger maxConcurrentThreads = new AtomicInteger();
663667
ExecutorService threadPool;
664668

665-
public TestMultiThreadedObservable(Subscription s, String... values) {
666-
this.s = s;
669+
public TestMultiThreadedObservable(String... values) {
667670
this.values = values;
668671
this.threadPool = Executors.newCachedThreadPool();
669672
}
670673

671674
@Override
672-
public Subscription onSubscribe(final Observer<? super String> observer) {
675+
public void call(final Subscriber<? super String> observer) {
673676
System.out.println("TestMultiThreadedObservable subscribed to ...");
674677
t = new Thread(new Runnable() {
675678

676679
@Override
677680
public void run() {
678681
try {
679682
System.out.println("running TestMultiThreadedObservable thread");
683+
int j = 0;
680684
for (final String s : values) {
685+
final int fj = ++j;
681686
threadPool.execute(new Runnable() {
682687

683688
@Override
684689
public void run() {
685690
threadsRunning.incrementAndGet();
686691
try {
687692
// perform onNext call
688-
System.out.println("TestMultiThreadedObservable onNext: " + s);
693+
System.out.println("TestMultiThreadedObservable onNext: " + s + " on thread " + Thread.currentThread().getName());
689694
if (s == null) {
690695
// force an error
691696
throw new NullPointerException();
697+
} else {
698+
// allow the exception to queue up
699+
int sleep = (fj % 3) * 10;
700+
if (sleep != 0) {
701+
Thread.sleep(sleep);
702+
}
692703
}
693704
observer.onNext(s);
694705
// capture 'maxThreads'
@@ -714,7 +725,9 @@ public void run() {
714725
// wait until all threads are done, then mark it as COMPLETED
715726
try {
716727
// wait for all the threads to finish
717-
threadPool.awaitTermination(2, TimeUnit.SECONDS);
728+
if (!threadPool.awaitTermination(5, TimeUnit.SECONDS)) {
729+
System.out.println("Threadpool did not terminate in time.");
730+
}
718731
} catch (InterruptedException e) {
719732
throw new RuntimeException(e);
720733
}
@@ -724,7 +737,6 @@ public void run() {
724737
System.out.println("starting TestMultiThreadedObservable thread");
725738
t.start();
726739
System.out.println("done starting TestMultiThreadedObservable thread");
727-
return s;
728740
}
729741

730742
public void waitToFinish() {
@@ -776,7 +788,7 @@ public void onNext(String args) {
776788
onNextCount.incrementAndGet();
777789
try {
778790
// simulate doing something computational
779-
Thread.sleep(200);
791+
Thread.sleep(100);
780792
} catch (InterruptedException e) {
781793
e.printStackTrace();
782794
}

0 commit comments

Comments
 (0)