diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index cb5b82fd46..be5b2b37a4 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -7052,11 +7052,12 @@ public final Subscription subscribe(Subscriber observer, Scheduler sc * @return the source Observable modified so that its subscriptions and unsubscriptions happen * on the specified {@link Scheduler} * @see RxJava Wiki: subscribeOn() + * @see #subscribeOn(rx.Scheduler, int) */ public final Observable subscribeOn(Scheduler scheduler) { return nest().lift(new OperatorSubscribeOn(scheduler)); } - + /** * Returns an Observable that extracts a Double from each of the items emitted by the source * Observable via a function you specify, and then emits the sum of these Doubles. diff --git a/rxjava-core/src/main/java/rx/observers/TestSubscriber.java b/rxjava-core/src/main/java/rx/observers/TestSubscriber.java index ff875b2655..bf51a2b5b4 100644 --- a/rxjava-core/src/main/java/rx/observers/TestSubscriber.java +++ b/rxjava-core/src/main/java/rx/observers/TestSubscriber.java @@ -101,6 +101,14 @@ public void awaitTerminalEvent(long timeout, TimeUnit unit) { } } + public void awaitTerminalEventAndUnsubscribeOnTimeout(long timeout, TimeUnit unit) { + try { + awaitTerminalEvent(timeout, unit); + } catch (RuntimeException e) { + unsubscribe(); + } + } + public Thread getLastSeenThread() { return lastSeenThread; } diff --git a/rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java b/rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java new file mode 100644 index 0000000000..7526674636 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java @@ -0,0 +1,193 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.LinkedList; +import java.util.Queue; +import rx.Subscriber; +import rx.subscriptions.CompositeSubscription; + +/** + * Buffers the incoming events until notified, then replays the + * buffered events and continues as a simple pass-through subscriber. + * @param the streamed value type + */ +public class BufferUntilSubscriber extends Subscriber { + /** The actual subscriber. */ + private final Subscriber actual; + /** Indicate the pass-through mode. */ + private volatile boolean passthroughMode; + /** Protect mode transition. */ + private final Object gate = new Object(); + /** The buffered items. */ + private final Queue queue = new LinkedList(); + /** The queue capacity. */ + private final int capacity; + /** Null sentinel (in case queue type is changed). */ + private static final Object NULL_SENTINEL = new Object(); + /** Complete sentinel. */ + private static final Object COMPLETE_SENTINEL = new Object(); + /** + * Container for an onError event. + */ + private static final class ErrorSentinel { + final Throwable t; + + public ErrorSentinel(Throwable t) { + this.t = t; + } + + } + /** + * Constructor that wraps the actual subscriber and shares its subscription. + * @param capacity the queue capacity to accept before blocking, negative value indicates an unbounded queue + * @param actual + */ + public BufferUntilSubscriber(int capacity, Subscriber actual) { + super(actual); + this.actual = actual; + this.capacity = capacity; + } + /** + * Constructor that wraps the actual subscriber and uses the given composite + * subscription. + * @param capacity the queue capacity to accept before blocking, negative value indicates an unbounded queue + * @param actual + * @param cs + */ + public BufferUntilSubscriber(int capacity, Subscriber actual, CompositeSubscription cs) { + super(cs); + this.actual = actual; + this.capacity = capacity; + } + + /** + * Call this method to replay the buffered events and continue as a pass-through subscriber. + * If already in pass-through mode, this method is a no-op. + */ + public void enterPassthroughMode() { + if (!passthroughMode) { + synchronized (gate) { + if (!passthroughMode) { + while (!queue.isEmpty()) { + Object o = queue.poll(); + if (!actual.isUnsubscribed()) { + if (o == NULL_SENTINEL) { + actual.onNext(null); + } else + if (o == COMPLETE_SENTINEL) { + actual.onCompleted(); + } else + if (o instanceof ErrorSentinel) { + actual.onError(((ErrorSentinel)o).t); + } else + if (o != null) { + @SuppressWarnings("unchecked") + T v = (T)o; + actual.onNext(v); + } else { + throw new NullPointerException(); + } + } + } + passthroughMode = true; + gate.notifyAll(); + } + } + } + } + @Override + public void onNext(T t) { + if (!passthroughMode) { + synchronized (gate) { + if (!passthroughMode) { + if (capacity < 0 || queue.size() < capacity) { + queue.offer(t != null ? t : NULL_SENTINEL); + return; + } + try { + while (!passthroughMode) { + gate.wait(); + } + if (actual.isUnsubscribed()) { + return; + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + actual.onError(ex); + return; + } + } + } + } + actual.onNext(t); + } + + @Override + public void onError(Throwable e) { + if (!passthroughMode) { + synchronized (gate) { + if (!passthroughMode) { + if (capacity < 0 || queue.size() < capacity) { + queue.offer(new ErrorSentinel(e)); + return; + } + try { + while (!passthroughMode) { + gate.wait(); + } + if (actual.isUnsubscribed()) { + return; + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + actual.onError(ex); + return; + } + } + } + } + actual.onError(e); + } + + @Override + public void onCompleted() { + if (!passthroughMode) { + synchronized (gate) { + if (!passthroughMode) { + if (capacity < 0 || queue.size() < capacity) { + queue.offer(COMPLETE_SENTINEL); + return; + } + try { + while (!passthroughMode) { + gate.wait(); + } + if (actual.isUnsubscribed()) { + return; + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + actual.onError(ex); + return; + } + } + } + } + actual.onCompleted(); + } + +} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorParallel.java b/rxjava-core/src/main/java/rx/operators/OperatorParallel.java index 9b1c76961d..38559e0500 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorParallel.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorParallel.java @@ -56,6 +56,9 @@ public Integer call(T t) { @Override public Observable call(GroupedObservable g) { + // Must use observeOn not subscribeOn because we have a single source behind groupBy. + // The origin is already subscribed to, we are moving each group on to a new thread + // but the origin itself can only be on a single thread. return f.call(g.observeOn(scheduler)); } }); diff --git a/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java b/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java index bdf33d501b..b8e782ef37 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java @@ -20,27 +20,54 @@ import rx.Scheduler; import rx.Scheduler.Inner; import rx.Subscriber; -import rx.subscriptions.CompositeSubscription; -import rx.subscriptions.Subscriptions; -import rx.util.functions.Action0; import rx.util.functions.Action1; /** - * Asynchronously subscribes and unsubscribes Observers on the specified Scheduler. + * Subscribes and unsubscribes Observers on the specified Scheduler. *

+ * Will occur asynchronously except when subscribing to `GroupedObservable`, `PublishSubject` and possibly other "hot" Observables + * in which case it will subscribe synchronously and buffer/block onNext calls until the subscribe has occurred. + *

+ * See https://github.com/Netflix/RxJava/issues/844 for more information on the "time gap" issue that the synchronous + * subscribe is solving. + * * */ public class OperatorSubscribeOn implements Operator> { private final Scheduler scheduler; + /** + * Indicate that events fired between the original subscription time and + * the actual subscription time should not get lost. + */ + private final boolean dontLoseEvents; + /** The buffer size to avoid flooding. Negative value indicates an unbounded buffer. */ + private final int bufferSize; public OperatorSubscribeOn(Scheduler scheduler) { this.scheduler = scheduler; + this.dontLoseEvents = false; + this.bufferSize = -1; + } + + /** + * Construct a SubscribeOn operator. + * + * @param scheduler + * the target scheduler + * @param bufferSize + * if dontLoseEvents == true, this indicates the buffer size. Filling the buffer will + * block the source. -1 indicates an unbounded buffer + */ + public OperatorSubscribeOn(Scheduler scheduler, int bufferSize) { + this.scheduler = scheduler; + this.dontLoseEvents = true; + this.bufferSize = bufferSize; } @Override public Subscriber> call(final Subscriber subscriber) { - return new Subscriber>() { + return new Subscriber>(subscriber) { @Override public void onCompleted() { @@ -52,48 +79,33 @@ public void onError(Throwable e) { subscriber.onError(e); } + boolean checkNeedBuffer(Observable o) { + return dontLoseEvents; + } + @Override public void onNext(final Observable o) { - scheduler.schedule(new Action1() { - - @Override - public void call(final Inner inner) { - final CompositeSubscription cs = new CompositeSubscription(); - subscriber.add(Subscriptions.create(new Action0() { - - @Override - public void call() { - inner.schedule(new Action1() { - - @Override - public void call(final Inner inner) { - cs.unsubscribe(); - } - - }); - } - - })); - cs.add(subscriber); - o.subscribe(new Subscriber(cs) { - - @Override - public void onCompleted() { - subscriber.onCompleted(); - } - - @Override - public void onError(Throwable e) { - subscriber.onError(e); - } - - @Override - public void onNext(T t) { - subscriber.onNext(t); - } - }); - } - }); + if (checkNeedBuffer(o)) { + // use buffering (possibly blocking) for a possibly synchronous subscribe + final BufferUntilSubscriber bus = new BufferUntilSubscriber(bufferSize, subscriber); + o.subscribe(bus); + subscriber.add(scheduler.schedule(new Action1() { + @Override + public void call(final Inner inner) { + bus.enterPassthroughMode(); + } + })); + return; + } else { + // no buffering (async subscribe) + subscriber.add(scheduler.schedule(new Action1() { + + @Override + public void call(final Inner inner) { + o.subscribe(subscriber); + } + })); + } } }; diff --git a/rxjava-core/src/test/java/rx/operators/OperatorGroupByTest.java b/rxjava-core/src/test/java/rx/operators/OperatorGroupByTest.java index 7fc1207208..2f201dbb1b 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorGroupByTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorGroupByTest.java @@ -253,7 +253,6 @@ public void testUnsubscribeOnNestedTakeAndSyncInfiniteStream() throws Interrupte /* * We will only take 1 group with 20 events from it and then unsubscribe. */ - @Ignore // failing because of subscribeOn time gap issue: https://github.com/Netflix/RxJava/issues/844 @Test public void testUnsubscribeOnNestedTakeAndAsyncInfiniteStream() throws InterruptedException { final AtomicInteger subscribeCounter = new AtomicInteger(); @@ -648,7 +647,6 @@ public void call(String s) { assertEquals(6, results.size()); } - @Ignore // failing because of subscribeOn time gap issue: https://github.com/Netflix/RxJava/issues/844 @Test public void testFirstGroupsCompleteAndParentSlowToThenEmitFinalGroupsWhichThenSubscribesOnAndDelaysAndThenCompletes() throws InterruptedException { final CountDownLatch first = new CountDownLatch(2); // there are two groups to first complete @@ -702,7 +700,7 @@ public void call() { }); } else { - return group.subscribeOn(Schedulers.newThread()).delay(400, TimeUnit.MILLISECONDS).map(new Func1() { + return group.nest().lift(new OperatorSubscribeOn(Schedulers.newThread(), 1)).delay(400, TimeUnit.MILLISECONDS).map(new Func1() { @Override public String call(Integer t1) { @@ -803,7 +801,6 @@ public void call(String s) { assertEquals(6, results.size()); } - @Ignore // failing because of subscribeOn time gap issue: https://github.com/Netflix/RxJava/issues/844 @Test public void testGroupsWithNestedSubscribeOn() throws InterruptedException { final ArrayList results = new ArrayList(); @@ -829,7 +826,7 @@ public Integer call(Integer t) { @Override public Observable call(final GroupedObservable group) { - return group.subscribeOn(Schedulers.newThread()).map(new Func1() { + return group.nest().lift(new OperatorSubscribeOn(Schedulers.newThread(), 0)).map(new Func1() { @Override public String call(Integer t1) { diff --git a/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java b/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java index a0dfe2e7fc..54d348dc3c 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java @@ -17,24 +17,36 @@ import static org.junit.Assert.*; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; import rx.Observable; import rx.Observable.OnSubscribe; +import rx.Scheduler; import rx.Subscriber; import rx.Subscription; +import rx.observables.GroupedObservable; import rx.observers.TestObserver; +import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; +import rx.subjects.PublishSubject; import rx.subscriptions.Subscriptions; +import rx.util.Timestamped; import rx.util.functions.Action0; +import rx.util.functions.Action1; +import rx.util.functions.Func1; public class OperatorSubscribeOnTest { - private class ThreadSubscription implements Subscription { + private static class ThreadSubscription implements Subscription { private volatile Thread thread; private final CountDownLatch latch = new CountDownLatch(1); @@ -99,9 +111,10 @@ public void call(Subscriber t1) { observer.assertTerminalEvent(); } - @Test + @Test(timeout = 2000) public void testIssue813() throws InterruptedException { // https://github.com/Netflix/RxJava/issues/813 + final CountDownLatch scheduled = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch doneLatch = new CountDownLatch(1); @@ -114,16 +127,14 @@ public void testIssue813() throws InterruptedException { public void call( final Subscriber subscriber) { subscriber.add(s); + scheduled.countDown(); try { latch.await(); - // Already called "unsubscribe", "isUnsubscribed" - // shouble be true - if (!subscriber.isUnsubscribed()) { - throw new IllegalStateException( - "subscriber.isUnsubscribed should be true"); - } + + // this should not run because the await above will be interrupted by the unsubscribe subscriber.onCompleted(); } catch (InterruptedException e) { + System.out.println("Interrupted because it is unsubscribed"); Thread.currentThread().interrupt(); } catch (Throwable e) { subscriber.onError(e); @@ -133,12 +144,260 @@ public void call( } }).subscribeOn(Schedulers.computation()).subscribe(observer); + // wait for scheduling + scheduled.await(); + // trigger unsubscribe subscription.unsubscribe(); // As unsubscribe is called in other thread, we need to wait for it. s.getThread(); latch.countDown(); doneLatch.await(); assertEquals(0, observer.getOnErrorEvents().size()); - assertEquals(1, observer.getOnCompletedEvents().size()); + // 0 because the unsubscribe interrupts and prevents onCompleted from being executed + assertEquals(0, observer.getOnCompletedEvents().size()); + } + + public static class SlowScheduler extends Scheduler { + final Scheduler actual; + final long delay; + final TimeUnit unit; + + public SlowScheduler() { + this(Schedulers.computation(), 2, TimeUnit.SECONDS); + } + + public SlowScheduler(Scheduler actual, long delay, TimeUnit unit) { + this.actual = actual; + this.delay = delay; + this.unit = unit; + } + + @Override + public Subscription schedule(final Action1 action) { + return actual.schedule(action, delay, unit); + } + + @Override + public Subscription schedule(final Action1 action, final long delayTime, final TimeUnit delayUnit) { + TimeUnit common = delayUnit.compareTo(unit) < 0 ? delayUnit : unit; + long t = common.convert(delayTime, delayUnit) + common.convert(delay, unit); + return actual.schedule(action, t, common); + } } + + @Test + public void testSubscribeOnPublishSubjectWithSlowScheduler() { + PublishSubject ps = PublishSubject.create(); + TestSubscriber ts = new TestSubscriber(); + ps.nest().lift(new OperatorSubscribeOn(new SlowScheduler(), 0)).subscribe(ts); + ps.onNext(1); + ps.onNext(2); + ps.onCompleted(); + + ts.awaitTerminalEvent(); + ts.assertReceivedOnNext(Arrays.asList(1, 2)); + } + + @Test + public void testGroupsWithNestedSubscribeOn() throws InterruptedException { + final ArrayList results = new ArrayList(); + Observable.create(new OnSubscribe() { + + @Override + public void call(Subscriber sub) { + sub.onNext(1); + sub.onNext(2); + sub.onNext(1); + sub.onNext(2); + sub.onCompleted(); + } + + }).groupBy(new Func1() { + + @Override + public Integer call(Integer t) { + return t; + } + + }).flatMap(new Func1, Observable>() { + + @Override + public Observable call(final GroupedObservable group) { + return group.nest().lift(new OperatorSubscribeOn(Schedulers.newThread(), 0)).map(new Func1() { + + @Override + public String call(Integer t1) { + System.out.println("Received: " + t1 + " on group : " + group.getKey()); + return "first groups: " + t1; + } + + }); + } + + }).toBlockingObservable().forEach(new Action1() { + + @Override + public void call(String s) { + results.add(s); + } + + }); + + System.out.println("Results: " + results); + assertEquals(4, results.size()); + } + + @Test + public void testFirstGroupsCompleteAndParentSlowToThenEmitFinalGroupsWhichThenSubscribesOnAndDelaysAndThenCompletes() throws InterruptedException { + final CountDownLatch first = new CountDownLatch(2); // there are two groups to first complete + final ArrayList results = new ArrayList(); + Observable.create(new OnSubscribe() { + + @Override + public void call(Subscriber sub) { + sub.onNext(1); + sub.onNext(2); + sub.onNext(1); + sub.onNext(2); + try { + first.await(); + } catch (InterruptedException e) { + sub.onError(e); + return; + } + sub.onNext(3); + sub.onNext(3); + sub.onCompleted(); + } + + }).groupBy(new Func1() { + + @Override + public Integer call(Integer t) { + return t; + } + + }).flatMap(new Func1, Observable>() { + + @Override + public Observable call(final GroupedObservable group) { + if (group.getKey() < 3) { + return group.map(new Func1() { + + @Override + public String call(Integer t1) { + return "first groups: " + t1; + } + + }) + // must take(2) so an onCompleted + unsubscribe happens on these first 2 groups + .take(2).doOnCompleted(new Action0() { + + @Override + public void call() { + first.countDown(); + } + + }); + } else { + return group.nest().lift(new OperatorSubscribeOn(Schedulers.newThread(), 0)) + .delay(400, TimeUnit.MILLISECONDS).map(new Func1() { + + @Override + public String call(Integer t1) { + return "last group: " + t1; + } + + }); + } + } + + }).toBlockingObservable().forEach(new Action1() { + + @Override + public void call(String s) { + results.add(s); + } + + }); + + System.out.println("Results: " + results); + assertEquals(6, results.size()); + } + + void testBoundedBufferingWithSize(int size) throws Exception { + Observable timer = Observable.timer(100, 100, TimeUnit.MILLISECONDS); + + final List deltas = Collections.synchronizedList(new ArrayList()); + + Subscription s = timer.timestamp().nest().lift(new OperatorSubscribeOn>( + new SlowScheduler(Schedulers.computation(), 1, TimeUnit.SECONDS), size)).map(new Func1, Long>() { + @Override + public Long call(Timestamped t1) { + long v = System.currentTimeMillis() - t1.getTimestampMillis(); + return v; + } + }).doOnNext(new Action1() { + @Override + public void call(Long t1) { + deltas.add(t1); + } + }).subscribe(); + + Thread.sleep(2050); + + s.unsubscribe(); + + if (deltas.size() < size + 1) { + fail("To few items in deltas: " + deltas); + } + for (int i = 0; i < size + 1; i++) { + if (deltas.get(i) < 500) { + fail(i + "th item arrived too early: " + deltas); + } + } + for (int i = size + 1; i < deltas.size(); i++) { + if (deltas.get(i) >= 500) { + fail(i + "th item arrived too late: " + deltas); + } + } + } + + @Test(timeout = 5000) + public void testBoundedBufferingOfZero() throws Exception { + testBoundedBufferingWithSize(0); + } + + @Test(timeout = 5000) + public void testBoundedBufferingOfOne() throws Exception { + testBoundedBufferingWithSize(1); + } + + @Test(timeout = 5000) + public void testBoundedBufferingOfTwo() throws Exception { + testBoundedBufferingWithSize(2); + } + + @Test(timeout = 5000) + public void testUnsubscribeInfiniteStream() throws InterruptedException { + TestSubscriber ts = new TestSubscriber(); + final AtomicInteger count = new AtomicInteger(); + Observable.create(new OnSubscribe() { + + @Override + public void call(Subscriber sub) { + for (int i = 1; !sub.isUnsubscribed(); i++) { + count.incrementAndGet(); + sub.onNext(i); + } + } + + }).subscribeOn(Schedulers.newThread()).take(10).subscribe(ts); + + ts.awaitTerminalEventAndUnsubscribeOnTimeout(1000, TimeUnit.MILLISECONDS); + Thread.sleep(200); // give time for the loop to continue + ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); + assertEquals(10, count.get()); + } + } diff --git a/rxjava-core/src/test/java/rx/operators/OperatorTimeoutWithSelectorTest.java b/rxjava-core/src/test/java/rx/operators/OperatorTimeoutWithSelectorTest.java index 2dcb84dc8f..709a82caa3 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorTimeoutWithSelectorTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorTimeoutWithSelectorTest.java @@ -15,12 +15,20 @@ */ package rx.operators; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertFalse; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import java.util.Arrays; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Test; import org.mockito.InOrder; @@ -30,7 +38,9 @@ import rx.Observable; import rx.Observable.OnSubscribe; import rx.Observer; +import rx.Scheduler; import rx.Subscriber; +import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; import rx.subjects.PublishSubject; import rx.subscriptions.Subscriptions; @@ -328,35 +338,34 @@ public void testTimeoutSelectorWithTimeoutAndOnNextRaceCondition() throws Interr final CountDownLatch observerReceivedTwo = new CountDownLatch(1); final CountDownLatch timeoutEmittedOne = new CountDownLatch(1); final CountDownLatch observerCompleted = new CountDownLatch(1); + final CountDownLatch enteredTimeoutOne = new CountDownLatch(1); + final AtomicBoolean latchTimeout = new AtomicBoolean(false); final Func1> timeoutFunc = new Func1>() { @Override public Observable call(Integer t1) { if (t1 == 1) { // Force "unsubscribe" run on another thread - return Observable.create(new OnSubscribe(){ + return Observable.create(new OnSubscribe() { @Override public void call(Subscriber subscriber) { - subscriber.add(Subscriptions.create(new Action0(){ - @Override - public void call() { - try { - // emulate "unsubscribe" is busy and finishes after timeout.onNext(1) - timeoutEmittedOne.await(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - }})); + enteredTimeoutOne.countDown(); // force the timeout message be sent after observer.onNext(2) - try { - observerReceivedTwo.await(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - if(!subscriber.isUnsubscribed()) { - subscriber.onNext(1); - timeoutEmittedOne.countDown(); + while (true) { + try { + if (!observerReceivedTwo.await(30, TimeUnit.SECONDS)) { + // CountDownLatch timeout + // There should be something wrong + latchTimeout.set(true); + } + break; + } catch (InterruptedException e) { + // Since we just want to emulate a busy method, + // we ignore the interrupt signal from Scheduler. + } } + subscriber.onNext(1); + timeoutEmittedOne.countDown(); } }).subscribeOn(Schedulers.newThread()); } else { @@ -386,16 +395,27 @@ public Void answer(InvocationOnMock invocation) throws Throwable { }).when(o).onCompleted(); + final TestSubscriber ts = new TestSubscriber(o); + new Thread(new Runnable() { @Override public void run() { PublishSubject source = PublishSubject.create(); - source.timeout(timeoutFunc, Observable.from(3)).subscribe(o); + source.timeout(timeoutFunc, Observable.from(3)).subscribe(ts); source.onNext(1); // start timeout + try { + if(!enteredTimeoutOne.await(30, TimeUnit.SECONDS)) { + latchTimeout.set(true); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } source.onNext(2); // disable timeout try { - timeoutEmittedOne.await(); + if(!timeoutEmittedOne.await(30, TimeUnit.SECONDS)) { + latchTimeout.set(true); + } } catch (InterruptedException e) { e.printStackTrace(); } @@ -404,7 +424,11 @@ public void run() { }).start(); - observerCompleted.await(); + if(!observerCompleted.await(30, TimeUnit.SECONDS)) { + latchTimeout.set(true); + } + + assertFalse("CoundDownLatch timeout", latchTimeout.get()); InOrder inOrder = inOrder(o); inOrder.verify(o).onNext(1);