From cd4f112cdc64715307f3bb950fa108fe467ca627 Mon Sep 17 00:00:00 2001 From: David Marques Date: Fri, 4 Jul 2014 10:25:56 -0400 Subject: [PATCH] Avoiding OperatorObserveOn from calling subscriber.onNext(..) after unsubscribe(). The OperatorObserveOn operator uses a scheduler to cancel subscriptions as well as to deliver the objects passing through it's onNext(..) in the right context. Calling unsubscribe will schedule the actual unsubscription while not making sure that the child subscriber will no longer receive calls to onNext(..) after unsubscribe() returns. This fix makes sure that after unsubscribe() returns no more onNext(..) calls will be made on the child subscribers. Signed-off-by: David Marques --- .../internal/operators/OperatorObserveOn.java | 6 ++++-- .../operators/OperatorObserveOnTest.java | 18 ++++++++++++++++++ 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorObserveOn.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorObserveOn.java index 42bad82670..e4932e4982 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorObserveOn.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorObserveOn.java @@ -72,11 +72,13 @@ private static final class ObserveOnSubscriber extends Subscriber { = AtomicLongFieldUpdater.newUpdater(ObserveOnSubscriber.class, "counter"); public ObserveOnSubscriber(Scheduler scheduler, Subscriber subscriber) { - super(subscriber); this.observer = subscriber; this.recursiveScheduler = scheduler.createWorker(); this.scheduledUnsubscribe = new ScheduledUnsubscribe(recursiveScheduler); - subscriber.add(scheduledUnsubscribe); + add(scheduledUnsubscribe); + + subscriber.add(recursiveScheduler); + subscriber.add(this); } @Override diff --git a/rxjava-core/src/test/java/rx/internal/operators/OperatorObserveOnTest.java b/rxjava-core/src/test/java/rx/internal/operators/OperatorObserveOnTest.java index e07af38a10..2bb90a06a8 100644 --- a/rxjava-core/src/test/java/rx/internal/operators/OperatorObserveOnTest.java +++ b/rxjava-core/src/test/java/rx/internal/operators/OperatorObserveOnTest.java @@ -40,6 +40,7 @@ import rx.Observable; import rx.Observer; import rx.Scheduler; +import rx.Subscription; import rx.exceptions.TestException; import rx.functions.Action0; import rx.functions.Action1; @@ -389,4 +390,21 @@ public void testDelayedErrorDeliveryWhenSafeSubscriberUnsubscribes() { inOrder.verify(o, never()).onNext(anyInt()); inOrder.verify(o, never()).onCompleted(); } + + @Test + public void testAfterUnsubscribeCalledThenObserverOnNextNeverCalled() { + final TestScheduler testScheduler = new TestScheduler(); + final Observer observer = mock(Observer.class); + final Subscription subscription = Observable.from(1, 2, 3) + .observeOn(testScheduler) + .subscribe(observer); + subscription.unsubscribe(); + testScheduler.advanceTimeBy(1, TimeUnit.SECONDS); + + final InOrder inOrder = inOrder(observer); + + inOrder.verify(observer, never()).onNext(anyInt()); + inOrder.verify(observer, never()).onError(any(Exception.class)); + inOrder.verify(observer, never()).onCompleted(); + } }