From 641d4bc31dce250b9de821a0b780ace815131074 Mon Sep 17 00:00:00 2001 From: David Marques Date: Wed, 4 Jun 2014 12:17:52 -0400 Subject: [PATCH 1/4] Ensuring Runnables posted with delay to a Handler are removed when unsubscribed. This patch ensures the delayed runnables posted to a Handler are properly removed when Subscription.unsubscribe() is called on the Observable. The original code returns the subscription from schedule() but is not used by the callers who instead add the Worker itself as a subsciption. Signed-off-by: David Marques --- .../schedulers/HandlerThreadScheduler.java | 26 ++++++---- .../HandlerThreadSchedulerTest.java | 50 +++++++++++++++++++ 2 files changed, 65 insertions(+), 11 deletions(-) diff --git a/rxjava-contrib/rxjava-android/src/main/java/rx/android/schedulers/HandlerThreadScheduler.java b/rxjava-contrib/rxjava-android/src/main/java/rx/android/schedulers/HandlerThreadScheduler.java index 68d1bb43a1..58a4c23099 100644 --- a/rxjava-contrib/rxjava-android/src/main/java/rx/android/schedulers/HandlerThreadScheduler.java +++ b/rxjava-contrib/rxjava-android/src/main/java/rx/android/schedulers/HandlerThreadScheduler.java @@ -1,12 +1,12 @@ /** * Copyright 2014 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -22,6 +22,7 @@ import rx.functions.Action0; import rx.functions.Action1; import rx.subscriptions.BooleanSubscription; +import rx.subscriptions.CompositeSubscription; import rx.subscriptions.Subscriptions; import android.os.Handler; @@ -34,7 +35,7 @@ public class HandlerThreadScheduler extends Scheduler { /** * Constructs a {@link HandlerThreadScheduler} using the given {@link Handler} - * + * * @param handler * {@link Handler} to use when scheduling actions */ @@ -46,11 +47,12 @@ public HandlerThreadScheduler(Handler handler) { public Worker createWorker() { return new InnerHandlerThreadScheduler(handler); } - + private static class InnerHandlerThreadScheduler extends Worker { private final Handler handler; - private BooleanSubscription innerSubscription = new BooleanSubscription(); + + private final CompositeSubscription mCompositeSubscription = new CompositeSubscription(); public InnerHandlerThreadScheduler(Handler handler) { this.handler = handler; @@ -58,12 +60,12 @@ public InnerHandlerThreadScheduler(Handler handler) { @Override public void unsubscribe() { - innerSubscription.unsubscribe(); + mCompositeSubscription.unsubscribe(); } @Override public boolean isUnsubscribed() { - return innerSubscription.isUnsubscribed(); + return mCompositeSubscription.isUnsubscribed(); } @Override @@ -78,15 +80,17 @@ public void run() { } }; handler.postDelayed(runnable, unit.toMillis(delayTime)); - return Subscriptions.create(new Action0() { + final Subscription subscription = Subscriptions.create(new Action0() { @Override public void call() { handler.removeCallbacks(runnable); - + } - }); + mCompositeSubscription.add(subscription); + + return Subscriptions.empty(); } @Override diff --git a/rxjava-contrib/rxjava-android/src/test/java/rx/android/schedulers/HandlerThreadSchedulerTest.java b/rxjava-contrib/rxjava-android/src/test/java/rx/android/schedulers/HandlerThreadSchedulerTest.java index 3afea125f1..e370f0a9c0 100644 --- a/rxjava-contrib/rxjava-android/src/test/java/rx/android/schedulers/HandlerThreadSchedulerTest.java +++ b/rxjava-contrib/rxjava-android/src/test/java/rx/android/schedulers/HandlerThreadSchedulerTest.java @@ -18,18 +18,26 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.util.concurrent.TimeUnit; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; +import org.mockito.Matchers; +import org.robolectric.Robolectric; import org.robolectric.RobolectricTestRunner; import org.robolectric.annotation.Config; +import rx.Observable; import rx.Scheduler; import rx.Scheduler.Worker; +import rx.Subscriber; +import rx.Subscription; import rx.functions.Action0; import rx.functions.Action1; import android.os.Handler; @@ -75,4 +83,46 @@ public void shouldScheduleDelayedActionOnHandlerThread() { runnable.getValue().run(); verify(action).call(); } + + @Test + public void shouldRemoveCallbacksFromHandlerWhenUnsubscribedSubscription() { + final Handler handler = spy(new Handler()); + final Observable.OnSubscribe onSubscribe = mock(Observable.OnSubscribe.class); + final Subscription subscription = Observable.create(onSubscribe).subscribeOn( + new HandlerThreadScheduler(handler)).subscribe(); + + verify(onSubscribe).call(Matchers.any(Subscriber.class)); + + subscription.unsubscribe(); + + verify(handler).removeCallbacks(Matchers.any(Runnable.class)); + } + + @Test + public void shouldNotCallOnSubscribeWhenSubscriptionUnsubscribedBeforeDelay() { + final Observable.OnSubscribe onSubscribe = mock(Observable.OnSubscribe.class); + final Handler handler = spy(new Handler()); + + final Scheduler scheduler = new HandlerThreadScheduler(handler); + final Worker worker = spy(scheduler.createWorker()); + + final Scheduler spyScheduler = spy(scheduler); + when(spyScheduler.createWorker()).thenReturn(worker); + + final Subscription subscription = Observable.create(onSubscribe) + .delaySubscription(1, TimeUnit.MINUTES, spyScheduler) + .subscribe(); + + verify(worker).schedule(Matchers.any(Action0.class), + Matchers.eq(1L), Matchers.eq(TimeUnit.MINUTES)); + verify(handler).postDelayed(Matchers.any(Runnable.class), + Matchers.eq(TimeUnit.MINUTES.toMillis(1L))); + + subscription.unsubscribe(); + + Robolectric.runUiThreadTasksIncludingDelayedTasks(); + + verify(onSubscribe, never()).call(Matchers.any(Subscriber.class)); + verify(handler).removeCallbacks(Matchers.any(Runnable.class)); + } } From 00459308557548fbd05f386048515d255ac53781 Mon Sep 17 00:00:00 2001 From: David Marques Date: Wed, 4 Jun 2014 20:52:28 -0400 Subject: [PATCH 2/4] Using a ScheduledAction in order to ensure correct subscription behaviour. Signed-off-by: David Marques --- .../schedulers/HandlerThreadScheduler.java | 26 +++++++------------ 1 file changed, 9 insertions(+), 17 deletions(-) diff --git a/rxjava-contrib/rxjava-android/src/main/java/rx/android/schedulers/HandlerThreadScheduler.java b/rxjava-contrib/rxjava-android/src/main/java/rx/android/schedulers/HandlerThreadScheduler.java index 58a4c23099..ccd8aeda22 100644 --- a/rxjava-contrib/rxjava-android/src/main/java/rx/android/schedulers/HandlerThreadScheduler.java +++ b/rxjava-contrib/rxjava-android/src/main/java/rx/android/schedulers/HandlerThreadScheduler.java @@ -21,6 +21,7 @@ import rx.Subscription; import rx.functions.Action0; import rx.functions.Action1; +import rx.internal.schedulers.ScheduledAction; import rx.subscriptions.BooleanSubscription; import rx.subscriptions.CompositeSubscription; import rx.subscriptions.Subscriptions; @@ -70,27 +71,18 @@ public boolean isUnsubscribed() { @Override public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) { - final Runnable runnable = new Runnable() { - @Override - public void run() { - if (isUnsubscribed()) { - return; - } - action.call(); - } - }; - handler.postDelayed(runnable, unit.toMillis(delayTime)); - - final Subscription subscription = Subscriptions.create(new Action0() { + final ScheduledAction scheduledAction = new ScheduledAction(action); + scheduledAction.addParent(mCompositeSubscription); + scheduledAction.add(Subscriptions.create(new Action0() { @Override public void call() { - handler.removeCallbacks(runnable); - + handler.removeCallbacks(scheduledAction); } - }); - mCompositeSubscription.add(subscription); + })); + + handler.postDelayed(scheduledAction, unit.toMillis(delayTime)); - return Subscriptions.empty(); + return scheduledAction; } @Override From 09c3b624955a8fded421a2ee325ef9021296f405 Mon Sep 17 00:00:00 2001 From: David Marques Date: Thu, 5 Jun 2014 09:52:36 -0400 Subject: [PATCH 3/4] Adding ScheduledSubscription to parent in order to properly chain unsubscribe. Signed-off-by: David Marques --- .../java/rx/android/schedulers/HandlerThreadScheduler.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rxjava-contrib/rxjava-android/src/main/java/rx/android/schedulers/HandlerThreadScheduler.java b/rxjava-contrib/rxjava-android/src/main/java/rx/android/schedulers/HandlerThreadScheduler.java index ccd8aeda22..fec2eb80df 100644 --- a/rxjava-contrib/rxjava-android/src/main/java/rx/android/schedulers/HandlerThreadScheduler.java +++ b/rxjava-contrib/rxjava-android/src/main/java/rx/android/schedulers/HandlerThreadScheduler.java @@ -72,13 +72,14 @@ public boolean isUnsubscribed() { @Override public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) { final ScheduledAction scheduledAction = new ScheduledAction(action); - scheduledAction.addParent(mCompositeSubscription); scheduledAction.add(Subscriptions.create(new Action0() { @Override public void call() { handler.removeCallbacks(scheduledAction); } })); + scheduledAction.addParent(mCompositeSubscription); + mCompositeSubscription.add(scheduledAction); handler.postDelayed(scheduledAction, unit.toMillis(delayTime)); @@ -91,5 +92,4 @@ public Subscription schedule(final Action0 action) { } } - } From b5655d2619808446ca798ee976974b419bb80238 Mon Sep 17 00:00:00 2001 From: David Marques Date: Sat, 7 Jun 2014 06:38:00 -0400 Subject: [PATCH 4/4] Using the correct code style for variable name and removing unused imports. Signed-off-by: David Marques --- .../android/schedulers/HandlerThreadScheduler.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/rxjava-contrib/rxjava-android/src/main/java/rx/android/schedulers/HandlerThreadScheduler.java b/rxjava-contrib/rxjava-android/src/main/java/rx/android/schedulers/HandlerThreadScheduler.java index fec2eb80df..4e58087023 100644 --- a/rxjava-contrib/rxjava-android/src/main/java/rx/android/schedulers/HandlerThreadScheduler.java +++ b/rxjava-contrib/rxjava-android/src/main/java/rx/android/schedulers/HandlerThreadScheduler.java @@ -20,9 +20,7 @@ import rx.Scheduler; import rx.Subscription; import rx.functions.Action0; -import rx.functions.Action1; import rx.internal.schedulers.ScheduledAction; -import rx.subscriptions.BooleanSubscription; import rx.subscriptions.CompositeSubscription; import rx.subscriptions.Subscriptions; import android.os.Handler; @@ -53,7 +51,7 @@ private static class InnerHandlerThreadScheduler extends Worker { private final Handler handler; - private final CompositeSubscription mCompositeSubscription = new CompositeSubscription(); + private final CompositeSubscription compositeSubscription = new CompositeSubscription(); public InnerHandlerThreadScheduler(Handler handler) { this.handler = handler; @@ -61,12 +59,12 @@ public InnerHandlerThreadScheduler(Handler handler) { @Override public void unsubscribe() { - mCompositeSubscription.unsubscribe(); + compositeSubscription.unsubscribe(); } @Override public boolean isUnsubscribed() { - return mCompositeSubscription.isUnsubscribed(); + return compositeSubscription.isUnsubscribed(); } @Override @@ -78,8 +76,8 @@ public void call() { handler.removeCallbacks(scheduledAction); } })); - scheduledAction.addParent(mCompositeSubscription); - mCompositeSubscription.add(scheduledAction); + scheduledAction.addParent(compositeSubscription); + compositeSubscription.add(scheduledAction); handler.postDelayed(scheduledAction, unit.toMillis(delayTime));