g) {
+ // Must use observeOn not subscribeOn because we have a single source behind groupBy.
+ // The origin is already subscribed to, we are moving each group on to a new thread
+ // but the origin itself can only be on a single thread.
return f.call(g.observeOn(scheduler));
}
});
diff --git a/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java b/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java
index bdf33d501b..b8e782ef37 100644
--- a/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java
+++ b/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java
@@ -20,27 +20,54 @@
import rx.Scheduler;
import rx.Scheduler.Inner;
import rx.Subscriber;
-import rx.subscriptions.CompositeSubscription;
-import rx.subscriptions.Subscriptions;
-import rx.util.functions.Action0;
import rx.util.functions.Action1;
/**
- * Asynchronously subscribes and unsubscribes Observers on the specified Scheduler.
+ * Subscribes and unsubscribes Observers on the specified Scheduler.
*
+ * Will occur asynchronously except when subscribing to `GroupedObservable`, `PublishSubject` and possibly other "hot" Observables
+ * in which case it will subscribe synchronously and buffer/block onNext calls until the subscribe has occurred.
+ *
+ * See https://github.com/Netflix/RxJava/issues/844 for more information on the "time gap" issue that the synchronous
+ * subscribe is solving.
+ *
*
*/
public class OperatorSubscribeOn implements Operator> {
private final Scheduler scheduler;
+ /**
+ * Indicate that events fired between the original subscription time and
+ * the actual subscription time should not get lost.
+ */
+ private final boolean dontLoseEvents;
+ /** The buffer size to avoid flooding. Negative value indicates an unbounded buffer. */
+ private final int bufferSize;
public OperatorSubscribeOn(Scheduler scheduler) {
this.scheduler = scheduler;
+ this.dontLoseEvents = false;
+ this.bufferSize = -1;
+ }
+
+ /**
+ * Construct a SubscribeOn operator.
+ *
+ * @param scheduler
+ * the target scheduler
+ * @param bufferSize
+ * if dontLoseEvents == true, this indicates the buffer size. Filling the buffer will
+ * block the source. -1 indicates an unbounded buffer
+ */
+ public OperatorSubscribeOn(Scheduler scheduler, int bufferSize) {
+ this.scheduler = scheduler;
+ this.dontLoseEvents = true;
+ this.bufferSize = bufferSize;
}
@Override
public Subscriber super Observable> call(final Subscriber super T> subscriber) {
- return new Subscriber>() {
+ return new Subscriber>(subscriber) {
@Override
public void onCompleted() {
@@ -52,48 +79,33 @@ public void onError(Throwable e) {
subscriber.onError(e);
}
+ boolean checkNeedBuffer(Observable> o) {
+ return dontLoseEvents;
+ }
+
@Override
public void onNext(final Observable o) {
- scheduler.schedule(new Action1() {
-
- @Override
- public void call(final Inner inner) {
- final CompositeSubscription cs = new CompositeSubscription();
- subscriber.add(Subscriptions.create(new Action0() {
-
- @Override
- public void call() {
- inner.schedule(new Action1() {
-
- @Override
- public void call(final Inner inner) {
- cs.unsubscribe();
- }
-
- });
- }
-
- }));
- cs.add(subscriber);
- o.subscribe(new Subscriber(cs) {
-
- @Override
- public void onCompleted() {
- subscriber.onCompleted();
- }
-
- @Override
- public void onError(Throwable e) {
- subscriber.onError(e);
- }
-
- @Override
- public void onNext(T t) {
- subscriber.onNext(t);
- }
- });
- }
- });
+ if (checkNeedBuffer(o)) {
+ // use buffering (possibly blocking) for a possibly synchronous subscribe
+ final BufferUntilSubscriber bus = new BufferUntilSubscriber(bufferSize, subscriber);
+ o.subscribe(bus);
+ subscriber.add(scheduler.schedule(new Action1() {
+ @Override
+ public void call(final Inner inner) {
+ bus.enterPassthroughMode();
+ }
+ }));
+ return;
+ } else {
+ // no buffering (async subscribe)
+ subscriber.add(scheduler.schedule(new Action1() {
+
+ @Override
+ public void call(final Inner inner) {
+ o.subscribe(subscriber);
+ }
+ }));
+ }
}
};
diff --git a/rxjava-core/src/test/java/rx/operators/OperatorGroupByTest.java b/rxjava-core/src/test/java/rx/operators/OperatorGroupByTest.java
index 7fc1207208..2f201dbb1b 100644
--- a/rxjava-core/src/test/java/rx/operators/OperatorGroupByTest.java
+++ b/rxjava-core/src/test/java/rx/operators/OperatorGroupByTest.java
@@ -253,7 +253,6 @@ public void testUnsubscribeOnNestedTakeAndSyncInfiniteStream() throws Interrupte
/*
* We will only take 1 group with 20 events from it and then unsubscribe.
*/
- @Ignore // failing because of subscribeOn time gap issue: https://github.com/Netflix/RxJava/issues/844
@Test
public void testUnsubscribeOnNestedTakeAndAsyncInfiniteStream() throws InterruptedException {
final AtomicInteger subscribeCounter = new AtomicInteger();
@@ -648,7 +647,6 @@ public void call(String s) {
assertEquals(6, results.size());
}
- @Ignore // failing because of subscribeOn time gap issue: https://github.com/Netflix/RxJava/issues/844
@Test
public void testFirstGroupsCompleteAndParentSlowToThenEmitFinalGroupsWhichThenSubscribesOnAndDelaysAndThenCompletes() throws InterruptedException {
final CountDownLatch first = new CountDownLatch(2); // there are two groups to first complete
@@ -702,7 +700,7 @@ public void call() {
});
} else {
- return group.subscribeOn(Schedulers.newThread()).delay(400, TimeUnit.MILLISECONDS).map(new Func1() {
+ return group.nest().lift(new OperatorSubscribeOn(Schedulers.newThread(), 1)).delay(400, TimeUnit.MILLISECONDS).map(new Func1() {
@Override
public String call(Integer t1) {
@@ -803,7 +801,6 @@ public void call(String s) {
assertEquals(6, results.size());
}
- @Ignore // failing because of subscribeOn time gap issue: https://github.com/Netflix/RxJava/issues/844
@Test
public void testGroupsWithNestedSubscribeOn() throws InterruptedException {
final ArrayList results = new ArrayList();
@@ -829,7 +826,7 @@ public Integer call(Integer t) {
@Override
public Observable call(final GroupedObservable group) {
- return group.subscribeOn(Schedulers.newThread()).map(new Func1() {
+ return group.nest().lift(new OperatorSubscribeOn(Schedulers.newThread(), 0)).map(new Func1() {
@Override
public String call(Integer t1) {
diff --git a/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java b/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java
index a0dfe2e7fc..54d348dc3c 100644
--- a/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java
+++ b/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java
@@ -17,24 +17,36 @@
import static org.junit.Assert.*;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
import rx.Observable;
import rx.Observable.OnSubscribe;
+import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
+import rx.observables.GroupedObservable;
import rx.observers.TestObserver;
+import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
+import rx.subjects.PublishSubject;
import rx.subscriptions.Subscriptions;
+import rx.util.Timestamped;
import rx.util.functions.Action0;
+import rx.util.functions.Action1;
+import rx.util.functions.Func1;
public class OperatorSubscribeOnTest {
- private class ThreadSubscription implements Subscription {
+ private static class ThreadSubscription implements Subscription {
private volatile Thread thread;
private final CountDownLatch latch = new CountDownLatch(1);
@@ -99,9 +111,10 @@ public void call(Subscriber super Integer> t1) {
observer.assertTerminalEvent();
}
- @Test
+ @Test(timeout = 2000)
public void testIssue813() throws InterruptedException {
// https://github.com/Netflix/RxJava/issues/813
+ final CountDownLatch scheduled = new CountDownLatch(1);
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch doneLatch = new CountDownLatch(1);
@@ -114,16 +127,14 @@ public void testIssue813() throws InterruptedException {
public void call(
final Subscriber super Integer> subscriber) {
subscriber.add(s);
+ scheduled.countDown();
try {
latch.await();
- // Already called "unsubscribe", "isUnsubscribed"
- // shouble be true
- if (!subscriber.isUnsubscribed()) {
- throw new IllegalStateException(
- "subscriber.isUnsubscribed should be true");
- }
+
+ // this should not run because the await above will be interrupted by the unsubscribe
subscriber.onCompleted();
} catch (InterruptedException e) {
+ System.out.println("Interrupted because it is unsubscribed");
Thread.currentThread().interrupt();
} catch (Throwable e) {
subscriber.onError(e);
@@ -133,12 +144,260 @@ public void call(
}
}).subscribeOn(Schedulers.computation()).subscribe(observer);
+ // wait for scheduling
+ scheduled.await();
+ // trigger unsubscribe
subscription.unsubscribe();
// As unsubscribe is called in other thread, we need to wait for it.
s.getThread();
latch.countDown();
doneLatch.await();
assertEquals(0, observer.getOnErrorEvents().size());
- assertEquals(1, observer.getOnCompletedEvents().size());
+ // 0 because the unsubscribe interrupts and prevents onCompleted from being executed
+ assertEquals(0, observer.getOnCompletedEvents().size());
+ }
+
+ public static class SlowScheduler extends Scheduler {
+ final Scheduler actual;
+ final long delay;
+ final TimeUnit unit;
+
+ public SlowScheduler() {
+ this(Schedulers.computation(), 2, TimeUnit.SECONDS);
+ }
+
+ public SlowScheduler(Scheduler actual, long delay, TimeUnit unit) {
+ this.actual = actual;
+ this.delay = delay;
+ this.unit = unit;
+ }
+
+ @Override
+ public Subscription schedule(final Action1 action) {
+ return actual.schedule(action, delay, unit);
+ }
+
+ @Override
+ public Subscription schedule(final Action1 action, final long delayTime, final TimeUnit delayUnit) {
+ TimeUnit common = delayUnit.compareTo(unit) < 0 ? delayUnit : unit;
+ long t = common.convert(delayTime, delayUnit) + common.convert(delay, unit);
+ return actual.schedule(action, t, common);
+ }
}
+
+ @Test
+ public void testSubscribeOnPublishSubjectWithSlowScheduler() {
+ PublishSubject ps = PublishSubject.create();
+ TestSubscriber ts = new TestSubscriber();
+ ps.nest().lift(new OperatorSubscribeOn(new SlowScheduler(), 0)).subscribe(ts);
+ ps.onNext(1);
+ ps.onNext(2);
+ ps.onCompleted();
+
+ ts.awaitTerminalEvent();
+ ts.assertReceivedOnNext(Arrays.asList(1, 2));
+ }
+
+ @Test
+ public void testGroupsWithNestedSubscribeOn() throws InterruptedException {
+ final ArrayList results = new ArrayList();
+ Observable.create(new OnSubscribe() {
+
+ @Override
+ public void call(Subscriber super Integer> sub) {
+ sub.onNext(1);
+ sub.onNext(2);
+ sub.onNext(1);
+ sub.onNext(2);
+ sub.onCompleted();
+ }
+
+ }).groupBy(new Func1() {
+
+ @Override
+ public Integer call(Integer t) {
+ return t;
+ }
+
+ }).flatMap(new Func1, Observable>() {
+
+ @Override
+ public Observable call(final GroupedObservable group) {
+ return group.nest().lift(new OperatorSubscribeOn(Schedulers.newThread(), 0)).map(new Func1() {
+
+ @Override
+ public String call(Integer t1) {
+ System.out.println("Received: " + t1 + " on group : " + group.getKey());
+ return "first groups: " + t1;
+ }
+
+ });
+ }
+
+ }).toBlockingObservable().forEach(new Action1() {
+
+ @Override
+ public void call(String s) {
+ results.add(s);
+ }
+
+ });
+
+ System.out.println("Results: " + results);
+ assertEquals(4, results.size());
+ }
+
+ @Test
+ public void testFirstGroupsCompleteAndParentSlowToThenEmitFinalGroupsWhichThenSubscribesOnAndDelaysAndThenCompletes() throws InterruptedException {
+ final CountDownLatch first = new CountDownLatch(2); // there are two groups to first complete
+ final ArrayList results = new ArrayList();
+ Observable.create(new OnSubscribe() {
+
+ @Override
+ public void call(Subscriber super Integer> sub) {
+ sub.onNext(1);
+ sub.onNext(2);
+ sub.onNext(1);
+ sub.onNext(2);
+ try {
+ first.await();
+ } catch (InterruptedException e) {
+ sub.onError(e);
+ return;
+ }
+ sub.onNext(3);
+ sub.onNext(3);
+ sub.onCompleted();
+ }
+
+ }).groupBy(new Func1() {
+
+ @Override
+ public Integer call(Integer t) {
+ return t;
+ }
+
+ }).flatMap(new Func1, Observable>() {
+
+ @Override
+ public Observable call(final GroupedObservable group) {
+ if (group.getKey() < 3) {
+ return group.map(new Func1() {
+
+ @Override
+ public String call(Integer t1) {
+ return "first groups: " + t1;
+ }
+
+ })
+ // must take(2) so an onCompleted + unsubscribe happens on these first 2 groups
+ .take(2).doOnCompleted(new Action0() {
+
+ @Override
+ public void call() {
+ first.countDown();
+ }
+
+ });
+ } else {
+ return group.nest().lift(new OperatorSubscribeOn(Schedulers.newThread(), 0))
+ .delay(400, TimeUnit.MILLISECONDS).map(new Func1() {
+
+ @Override
+ public String call(Integer t1) {
+ return "last group: " + t1;
+ }
+
+ });
+ }
+ }
+
+ }).toBlockingObservable().forEach(new Action1() {
+
+ @Override
+ public void call(String s) {
+ results.add(s);
+ }
+
+ });
+
+ System.out.println("Results: " + results);
+ assertEquals(6, results.size());
+ }
+
+ void testBoundedBufferingWithSize(int size) throws Exception {
+ Observable timer = Observable.timer(100, 100, TimeUnit.MILLISECONDS);
+
+ final List deltas = Collections.synchronizedList(new ArrayList());
+
+ Subscription s = timer.timestamp().nest().lift(new OperatorSubscribeOn>(
+ new SlowScheduler(Schedulers.computation(), 1, TimeUnit.SECONDS), size)).map(new Func1, Long>() {
+ @Override
+ public Long call(Timestamped t1) {
+ long v = System.currentTimeMillis() - t1.getTimestampMillis();
+ return v;
+ }
+ }).doOnNext(new Action1() {
+ @Override
+ public void call(Long t1) {
+ deltas.add(t1);
+ }
+ }).subscribe();
+
+ Thread.sleep(2050);
+
+ s.unsubscribe();
+
+ if (deltas.size() < size + 1) {
+ fail("To few items in deltas: " + deltas);
+ }
+ for (int i = 0; i < size + 1; i++) {
+ if (deltas.get(i) < 500) {
+ fail(i + "th item arrived too early: " + deltas);
+ }
+ }
+ for (int i = size + 1; i < deltas.size(); i++) {
+ if (deltas.get(i) >= 500) {
+ fail(i + "th item arrived too late: " + deltas);
+ }
+ }
+ }
+
+ @Test(timeout = 5000)
+ public void testBoundedBufferingOfZero() throws Exception {
+ testBoundedBufferingWithSize(0);
+ }
+
+ @Test(timeout = 5000)
+ public void testBoundedBufferingOfOne() throws Exception {
+ testBoundedBufferingWithSize(1);
+ }
+
+ @Test(timeout = 5000)
+ public void testBoundedBufferingOfTwo() throws Exception {
+ testBoundedBufferingWithSize(2);
+ }
+
+ @Test(timeout = 5000)
+ public void testUnsubscribeInfiniteStream() throws InterruptedException {
+ TestSubscriber ts = new TestSubscriber();
+ final AtomicInteger count = new AtomicInteger();
+ Observable.create(new OnSubscribe() {
+
+ @Override
+ public void call(Subscriber super Integer> sub) {
+ for (int i = 1; !sub.isUnsubscribed(); i++) {
+ count.incrementAndGet();
+ sub.onNext(i);
+ }
+ }
+
+ }).subscribeOn(Schedulers.newThread()).take(10).subscribe(ts);
+
+ ts.awaitTerminalEventAndUnsubscribeOnTimeout(1000, TimeUnit.MILLISECONDS);
+ Thread.sleep(200); // give time for the loop to continue
+ ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
+ assertEquals(10, count.get());
+ }
+
}
diff --git a/rxjava-core/src/test/java/rx/operators/OperatorTimeoutWithSelectorTest.java b/rxjava-core/src/test/java/rx/operators/OperatorTimeoutWithSelectorTest.java
index 2dcb84dc8f..709a82caa3 100644
--- a/rxjava-core/src/test/java/rx/operators/OperatorTimeoutWithSelectorTest.java
+++ b/rxjava-core/src/test/java/rx/operators/OperatorTimeoutWithSelectorTest.java
@@ -15,12 +15,20 @@
*/
package rx.operators;
-import static org.mockito.Matchers.*;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Test;
import org.mockito.InOrder;
@@ -30,7 +38,9 @@
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observer;
+import rx.Scheduler;
import rx.Subscriber;
+import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import rx.subscriptions.Subscriptions;
@@ -328,35 +338,34 @@ public void testTimeoutSelectorWithTimeoutAndOnNextRaceCondition() throws Interr
final CountDownLatch observerReceivedTwo = new CountDownLatch(1);
final CountDownLatch timeoutEmittedOne = new CountDownLatch(1);
final CountDownLatch observerCompleted = new CountDownLatch(1);
+ final CountDownLatch enteredTimeoutOne = new CountDownLatch(1);
+ final AtomicBoolean latchTimeout = new AtomicBoolean(false);
final Func1> timeoutFunc = new Func1>() {
@Override
public Observable call(Integer t1) {
if (t1 == 1) {
// Force "unsubscribe" run on another thread
- return Observable.create(new OnSubscribe(){
+ return Observable.create(new OnSubscribe() {
@Override
public void call(Subscriber super Integer> subscriber) {
- subscriber.add(Subscriptions.create(new Action0(){
- @Override
- public void call() {
- try {
- // emulate "unsubscribe" is busy and finishes after timeout.onNext(1)
- timeoutEmittedOne.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }}));
+ enteredTimeoutOne.countDown();
// force the timeout message be sent after observer.onNext(2)
- try {
- observerReceivedTwo.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- if(!subscriber.isUnsubscribed()) {
- subscriber.onNext(1);
- timeoutEmittedOne.countDown();
+ while (true) {
+ try {
+ if (!observerReceivedTwo.await(30, TimeUnit.SECONDS)) {
+ // CountDownLatch timeout
+ // There should be something wrong
+ latchTimeout.set(true);
+ }
+ break;
+ } catch (InterruptedException e) {
+ // Since we just want to emulate a busy method,
+ // we ignore the interrupt signal from Scheduler.
+ }
}
+ subscriber.onNext(1);
+ timeoutEmittedOne.countDown();
}
}).subscribeOn(Schedulers.newThread());
} else {
@@ -386,16 +395,27 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
}).when(o).onCompleted();
+ final TestSubscriber ts = new TestSubscriber(o);
+
new Thread(new Runnable() {
@Override
public void run() {
PublishSubject source = PublishSubject.create();
- source.timeout(timeoutFunc, Observable.from(3)).subscribe(o);
+ source.timeout(timeoutFunc, Observable.from(3)).subscribe(ts);
source.onNext(1); // start timeout
+ try {
+ if(!enteredTimeoutOne.await(30, TimeUnit.SECONDS)) {
+ latchTimeout.set(true);
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
source.onNext(2); // disable timeout
try {
- timeoutEmittedOne.await();
+ if(!timeoutEmittedOne.await(30, TimeUnit.SECONDS)) {
+ latchTimeout.set(true);
+ }
} catch (InterruptedException e) {
e.printStackTrace();
}
@@ -404,7 +424,11 @@ public void run() {
}).start();
- observerCompleted.await();
+ if(!observerCompleted.await(30, TimeUnit.SECONDS)) {
+ latchTimeout.set(true);
+ }
+
+ assertFalse("CoundDownLatch timeout", latchTimeout.get());
InOrder inOrder = inOrder(o);
inOrder.verify(o).onNext(1);