diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorObserveOn.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorObserveOn.java index 42bad82670..e4932e4982 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorObserveOn.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorObserveOn.java @@ -72,11 +72,13 @@ private static final class ObserveOnSubscriber extends Subscriber { = AtomicLongFieldUpdater.newUpdater(ObserveOnSubscriber.class, "counter"); public ObserveOnSubscriber(Scheduler scheduler, Subscriber subscriber) { - super(subscriber); this.observer = subscriber; this.recursiveScheduler = scheduler.createWorker(); this.scheduledUnsubscribe = new ScheduledUnsubscribe(recursiveScheduler); - subscriber.add(scheduledUnsubscribe); + add(scheduledUnsubscribe); + + subscriber.add(recursiveScheduler); + subscriber.add(this); } @Override diff --git a/rxjava-core/src/test/java/rx/internal/operators/OperatorObserveOnTest.java b/rxjava-core/src/test/java/rx/internal/operators/OperatorObserveOnTest.java index e07af38a10..2bb90a06a8 100644 --- a/rxjava-core/src/test/java/rx/internal/operators/OperatorObserveOnTest.java +++ b/rxjava-core/src/test/java/rx/internal/operators/OperatorObserveOnTest.java @@ -40,6 +40,7 @@ import rx.Observable; import rx.Observer; import rx.Scheduler; +import rx.Subscription; import rx.exceptions.TestException; import rx.functions.Action0; import rx.functions.Action1; @@ -389,4 +390,21 @@ public void testDelayedErrorDeliveryWhenSafeSubscriberUnsubscribes() { inOrder.verify(o, never()).onNext(anyInt()); inOrder.verify(o, never()).onCompleted(); } + + @Test + public void testAfterUnsubscribeCalledThenObserverOnNextNeverCalled() { + final TestScheduler testScheduler = new TestScheduler(); + final Observer observer = mock(Observer.class); + final Subscription subscription = Observable.from(1, 2, 3) + .observeOn(testScheduler) + .subscribe(observer); + subscription.unsubscribe(); + testScheduler.advanceTimeBy(1, TimeUnit.SECONDS); + + final InOrder inOrder = inOrder(observer); + + inOrder.verify(observer, never()).onNext(anyInt()); + inOrder.verify(observer, never()).onError(any(Exception.class)); + inOrder.verify(observer, never()).onCompleted(); + } }