(scheduler, false));
+ }
+ /**
+ * Asynchronously subscribes and unsubscribes Observers to this Observable on the specified {@link Scheduler}
+ * and allows buffering some events emitted from the source in the time gap between the original and
+ * actual subscription, and any excess events will block the source until the actual subscription happens.
+ *
+ * This overload should help mitigate issues when subscribing to a PublishSubject (and derivatives
+ * such as GroupedObservable in operator groupBy) and events fired between the original and actual subscriptions
+ * are lost.
+ *
+ *
+ *
+ * @param scheduler
+ * the {@link Scheduler} to perform subscription and unsubscription actions on
+ * @param bufferSize the number of events to buffer before blocking the source while in the time gap,
+ * negative value indicates an unlimited buffer
+ * @return the source Observable modified so that its subscriptions and unsubscriptions happen
+ * on the specified {@link Scheduler}
+ * @see RxJava Wiki: subscribeOn()
+ */
+ public final Observable subscribeOn(Scheduler scheduler, int bufferSize) {
+ return nest().lift(new OperatorSubscribeOn(scheduler, true, bufferSize));
}
/**
diff --git a/rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java b/rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java
new file mode 100644
index 0000000000..7526674636
--- /dev/null
+++ b/rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java
@@ -0,0 +1,193 @@
+/**
+ * Copyright 2014 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.operators;
+
+import java.util.LinkedList;
+import java.util.Queue;
+import rx.Subscriber;
+import rx.subscriptions.CompositeSubscription;
+
+/**
+ * Buffers the incoming events until notified, then replays the
+ * buffered events and continues as a simple pass-through subscriber.
+ * @param the streamed value type
+ */
+public class BufferUntilSubscriber extends Subscriber {
+ /** The actual subscriber. */
+ private final Subscriber super T> actual;
+ /** Indicate the pass-through mode. */
+ private volatile boolean passthroughMode;
+ /** Protect mode transition. */
+ private final Object gate = new Object();
+ /** The buffered items. */
+ private final Queue queue = new LinkedList();
+ /** The queue capacity. */
+ private final int capacity;
+ /** Null sentinel (in case queue type is changed). */
+ private static final Object NULL_SENTINEL = new Object();
+ /** Complete sentinel. */
+ private static final Object COMPLETE_SENTINEL = new Object();
+ /**
+ * Container for an onError event.
+ */
+ private static final class ErrorSentinel {
+ final Throwable t;
+
+ public ErrorSentinel(Throwable t) {
+ this.t = t;
+ }
+
+ }
+ /**
+ * Constructor that wraps the actual subscriber and shares its subscription.
+ * @param capacity the queue capacity to accept before blocking, negative value indicates an unbounded queue
+ * @param actual
+ */
+ public BufferUntilSubscriber(int capacity, Subscriber super T> actual) {
+ super(actual);
+ this.actual = actual;
+ this.capacity = capacity;
+ }
+ /**
+ * Constructor that wraps the actual subscriber and uses the given composite
+ * subscription.
+ * @param capacity the queue capacity to accept before blocking, negative value indicates an unbounded queue
+ * @param actual
+ * @param cs
+ */
+ public BufferUntilSubscriber(int capacity, Subscriber super T> actual, CompositeSubscription cs) {
+ super(cs);
+ this.actual = actual;
+ this.capacity = capacity;
+ }
+
+ /**
+ * Call this method to replay the buffered events and continue as a pass-through subscriber.
+ * If already in pass-through mode, this method is a no-op.
+ */
+ public void enterPassthroughMode() {
+ if (!passthroughMode) {
+ synchronized (gate) {
+ if (!passthroughMode) {
+ while (!queue.isEmpty()) {
+ Object o = queue.poll();
+ if (!actual.isUnsubscribed()) {
+ if (o == NULL_SENTINEL) {
+ actual.onNext(null);
+ } else
+ if (o == COMPLETE_SENTINEL) {
+ actual.onCompleted();
+ } else
+ if (o instanceof ErrorSentinel) {
+ actual.onError(((ErrorSentinel)o).t);
+ } else
+ if (o != null) {
+ @SuppressWarnings("unchecked")
+ T v = (T)o;
+ actual.onNext(v);
+ } else {
+ throw new NullPointerException();
+ }
+ }
+ }
+ passthroughMode = true;
+ gate.notifyAll();
+ }
+ }
+ }
+ }
+ @Override
+ public void onNext(T t) {
+ if (!passthroughMode) {
+ synchronized (gate) {
+ if (!passthroughMode) {
+ if (capacity < 0 || queue.size() < capacity) {
+ queue.offer(t != null ? t : NULL_SENTINEL);
+ return;
+ }
+ try {
+ while (!passthroughMode) {
+ gate.wait();
+ }
+ if (actual.isUnsubscribed()) {
+ return;
+ }
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ actual.onError(ex);
+ return;
+ }
+ }
+ }
+ }
+ actual.onNext(t);
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ if (!passthroughMode) {
+ synchronized (gate) {
+ if (!passthroughMode) {
+ if (capacity < 0 || queue.size() < capacity) {
+ queue.offer(new ErrorSentinel(e));
+ return;
+ }
+ try {
+ while (!passthroughMode) {
+ gate.wait();
+ }
+ if (actual.isUnsubscribed()) {
+ return;
+ }
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ actual.onError(ex);
+ return;
+ }
+ }
+ }
+ }
+ actual.onError(e);
+ }
+
+ @Override
+ public void onCompleted() {
+ if (!passthroughMode) {
+ synchronized (gate) {
+ if (!passthroughMode) {
+ if (capacity < 0 || queue.size() < capacity) {
+ queue.offer(COMPLETE_SENTINEL);
+ return;
+ }
+ try {
+ while (!passthroughMode) {
+ gate.wait();
+ }
+ if (actual.isUnsubscribed()) {
+ return;
+ }
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ actual.onError(ex);
+ return;
+ }
+ }
+ }
+ }
+ actual.onCompleted();
+ }
+
+}
diff --git a/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java b/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java
index bdf33d501b..55bef3bfdb 100644
--- a/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java
+++ b/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java
@@ -20,6 +20,8 @@
import rx.Scheduler;
import rx.Scheduler.Inner;
import rx.Subscriber;
+import rx.observables.GroupedObservable;
+import rx.subjects.PublishSubject;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
@@ -33,9 +35,27 @@
public class OperatorSubscribeOn implements Operator> {
private final Scheduler scheduler;
-
- public OperatorSubscribeOn(Scheduler scheduler) {
+ /**
+ * Indicate that events fired between the original subscription time and
+ * the actual subscription time should not get lost.
+ */
+ private final boolean dontLoseEvents;
+ /** The buffer size to avoid flooding. Negative value indicates an unbounded buffer. */
+ private final int bufferSize;
+ public OperatorSubscribeOn(Scheduler scheduler, boolean dontLoseEvents) {
+ this(scheduler, dontLoseEvents, -1);
+ }
+ /**
+ * Construct a SubscribeOn operator.
+ * @param scheduler the target scheduler
+ * @param dontLoseEvents indicate that events should be buffered until the actual subscription happens
+ * @param bufferSize if dontLoseEvents == true, this indicates the buffer size. Filling the buffer will
+ * block the source. -1 indicates an unbounded buffer
+ */
+ public OperatorSubscribeOn(Scheduler scheduler, boolean dontLoseEvents, int bufferSize) {
this.scheduler = scheduler;
+ this.dontLoseEvents = dontLoseEvents;
+ this.bufferSize = bufferSize;
}
@Override
@@ -51,9 +71,38 @@ public void onCompleted() {
public void onError(Throwable e) {
subscriber.onError(e);
}
-
+ boolean checkNeedBuffer(Observable> o) {
+ return dontLoseEvents || ((o instanceof GroupedObservable, ?>)
+ || (o instanceof PublishSubject>)
+ // || (o instanceof BehaviorSubject, ?>)
+ );
+ }
@Override
public void onNext(final Observable o) {
+ if (checkNeedBuffer(o)) {
+ final CompositeSubscription cs = new CompositeSubscription();
+ subscriber.add(cs);
+ final BufferUntilSubscriber bus = new BufferUntilSubscriber(bufferSize, subscriber, new CompositeSubscription());
+ o.subscribe(bus);
+ scheduler.schedule(new Action1() {
+ @Override
+ public void call(final Inner inner) {
+ cs.add(Subscriptions.create(new Action0() {
+ @Override
+ public void call() {
+ inner.schedule(new Action1() {
+ @Override
+ public void call(final Inner inner) {
+ bus.unsubscribe();
+ }
+ });
+ }
+ }));
+ bus.enterPassthroughMode();
+ }
+ });
+ return;
+ }
scheduler.schedule(new Action1() {
@Override
diff --git a/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java b/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java
index a0dfe2e7fc..b05b7724f8 100644
--- a/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java
+++ b/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java
@@ -1,77 +1,92 @@
-/**
- * Copyright 2014 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+ /**
+ * Copyright 2014 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package rx.operators;
+import java.util.ArrayList;
import static org.junit.Assert.*;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import org.junit.Ignore;
import org.junit.Test;
import rx.Observable;
import rx.Observable.OnSubscribe;
+import rx.Scheduler;
+import rx.Scheduler.Inner;
import rx.Subscriber;
import rx.Subscription;
+import rx.observables.GroupedObservable;
import rx.observers.TestObserver;
+import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
+import rx.subjects.PublishSubject;
+import rx.subscriptions.CompositeSubscription;
+import rx.subscriptions.MultipleAssignmentSubscription;
import rx.subscriptions.Subscriptions;
+import rx.util.Timestamped;
import rx.util.functions.Action0;
+import rx.util.functions.Action1;
+import rx.util.functions.Func1;
public class OperatorSubscribeOnTest {
-
+
private class ThreadSubscription implements Subscription {
private volatile Thread thread;
-
+
private final CountDownLatch latch = new CountDownLatch(1);
-
+
private final Subscription s = Subscriptions.create(new Action0() {
-
+
@Override
public void call() {
thread = Thread.currentThread();
latch.countDown();
}
-
+
});
-
+
@Override
public void unsubscribe() {
s.unsubscribe();
}
-
+
@Override
public boolean isUnsubscribed() {
return s.isUnsubscribed();
}
-
+
public Thread getThread() throws InterruptedException {
latch.await();
return thread;
}
}
-
+
@Test
public void testSubscribeOnAndVerifySubscribeAndUnsubscribeThreads()
throws InterruptedException {
final ThreadSubscription subscription = new ThreadSubscription();
final AtomicReference subscribeThread = new AtomicReference();
Observable w = Observable.create(new OnSubscribe() {
-
+
@Override
public void call(Subscriber super Integer> t1) {
subscribeThread.set(Thread.currentThread());
@@ -81,33 +96,33 @@ public void call(Subscriber super Integer> t1) {
t1.onCompleted();
}
});
-
+
TestObserver observer = new TestObserver();
w.subscribeOn(Schedulers.newThread()).subscribe(observer);
-
+
Thread unsubscribeThread = subscription.getThread();
-
+
assertNotNull(unsubscribeThread);
assertNotSame(Thread.currentThread(), unsubscribeThread);
-
+
assertNotNull(subscribeThread.get());
assertNotSame(Thread.currentThread(), subscribeThread.get());
// True for Schedulers.newThread()
assertTrue(unsubscribeThread == subscribeThread.get());
-
+
observer.assertReceivedOnNext(Arrays.asList(1, 2));
observer.assertTerminalEvent();
}
-
+
@Test
public void testIssue813() throws InterruptedException {
// https://github.com/Netflix/RxJava/issues/813
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch doneLatch = new CountDownLatch(1);
-
+
TestObserver observer = new TestObserver();
final ThreadSubscription s = new ThreadSubscription();
-
+
final Subscription subscription = Observable
.create(new Observable.OnSubscribe() {
@Override
@@ -132,7 +147,7 @@ public void call(
}
}
}).subscribeOn(Schedulers.computation()).subscribe(observer);
-
+
subscription.unsubscribe();
// As unsubscribe is called in other thread, we need to wait for it.
s.getThread();
@@ -141,4 +156,218 @@ public void call(
assertEquals(0, observer.getOnErrorEvents().size());
assertEquals(1, observer.getOnCompletedEvents().size());
}
+
+ public static class SlowScheduler extends Scheduler {
+ final Scheduler actual;
+ final long delay;
+ final TimeUnit unit;
+ public SlowScheduler() {
+ this(Schedulers.computation(), 2, TimeUnit.SECONDS);
+ }
+ public SlowScheduler(Scheduler actual, long delay, TimeUnit unit) {
+ this.actual = actual;
+ this.delay = delay;
+ this.unit = unit;
+ }
+
+ @Override
+ public Subscription schedule(final Action1 action) {
+ return actual.schedule(action, delay, unit);
+ }
+
+ @Override
+ public Subscription schedule(final Action1 action, final long delayTime, final TimeUnit delayUnit) {
+ TimeUnit common = delayUnit.compareTo(unit) < 0 ? delayUnit : unit;
+ long t = common.convert(delayTime, delayUnit) + common.convert(delay, unit);
+ return actual.schedule(action, t, common);
+ }
+ }
+
+ @Test
+ public void testSubscribeOnPublishSubjectWithSlowScheduler() {
+ PublishSubject ps = PublishSubject.create();
+ TestSubscriber ts = new TestSubscriber();
+ ps.subscribeOn(new SlowScheduler()).subscribe(ts);
+ ps.onNext(1);
+ ps.onNext(2);
+ ps.onCompleted();
+
+ ts.awaitTerminalEvent();
+ ts.assertReceivedOnNext(Arrays.asList(1, 2));
+ }
+
+ @Test
+ public void testGroupsWithNestedSubscribeOn() throws InterruptedException {
+ final ArrayList results = new ArrayList();
+ Observable.create(new OnSubscribe() {
+
+ @Override
+ public void call(Subscriber super Integer> sub) {
+ sub.onNext(1);
+ sub.onNext(2);
+ sub.onNext(1);
+ sub.onNext(2);
+ sub.onCompleted();
+ }
+
+ }).groupBy(new Func1() {
+
+ @Override
+ public Integer call(Integer t) {
+ return t;
+ }
+
+ }).flatMap(new Func1, Observable>() {
+
+ @Override
+ public Observable call(final GroupedObservable group) {
+ return group.subscribeOn(Schedulers.newThread()).map(new Func1() {
+
+ @Override
+ public String call(Integer t1) {
+ System.out.println("Received: " + t1 + " on group : " + group.getKey());
+ return "first groups: " + t1;
+ }
+
+ });
+ }
+
+ }).toBlockingObservable().forEach(new Action1() {
+
+ @Override
+ public void call(String s) {
+ results.add(s);
+ }
+
+ });
+
+ System.out.println("Results: " + results);
+ assertEquals(4, results.size());
+ }
+
+ @Test
+ public void testFirstGroupsCompleteAndParentSlowToThenEmitFinalGroupsWhichThenSubscribesOnAndDelaysAndThenCompletes() throws InterruptedException {
+ final CountDownLatch first = new CountDownLatch(2); // there are two groups to first complete
+ final ArrayList results = new ArrayList();
+ Observable.create(new OnSubscribe() {
+
+ @Override
+ public void call(Subscriber super Integer> sub) {
+ sub.onNext(1);
+ sub.onNext(2);
+ sub.onNext(1);
+ sub.onNext(2);
+ try {
+ first.await();
+ } catch (InterruptedException e) {
+ sub.onError(e);
+ return;
+ }
+ sub.onNext(3);
+ sub.onNext(3);
+ sub.onCompleted();
+ }
+
+ }).groupBy(new Func1() {
+
+ @Override
+ public Integer call(Integer t) {
+ return t;
+ }
+
+ }).flatMap(new Func1, Observable>() {
+
+ @Override
+ public Observable call(final GroupedObservable group) {
+ if (group.getKey() < 3) {
+ return group.map(new Func1() {
+
+ @Override
+ public String call(Integer t1) {
+ return "first groups: " + t1;
+ }
+
+ })
+ // must take(2) so an onCompleted + unsubscribe happens on these first 2 groups
+ .take(2).doOnCompleted(new Action0() {
+
+ @Override
+ public void call() {
+ first.countDown();
+ }
+
+ });
+ } else {
+ return group.subscribeOn(Schedulers.newThread()).delay(400, TimeUnit.MILLISECONDS).map(new Func1() {
+
+ @Override
+ public String call(Integer t1) {
+ return "last group: " + t1;
+ }
+
+ });
+ }
+ }
+
+ }).toBlockingObservable().forEach(new Action1() {
+
+ @Override
+ public void call(String s) {
+ results.add(s);
+ }
+
+ });
+
+ System.out.println("Results: " + results);
+ assertEquals(6, results.size());
+ }
+ void testBoundedBufferingWithSize(int size) throws Exception {
+ Observable timer = Observable.timer(100, 100, TimeUnit.MILLISECONDS);
+
+ final List deltas = Collections.synchronizedList(new ArrayList());
+
+ Subscription s = timer.timestamp().subscribeOn(
+ new SlowScheduler(Schedulers.computation(), 1, TimeUnit.SECONDS), size).map(new Func1, Long>() {
+ @Override
+ public Long call(Timestamped t1) {
+ long v = System.currentTimeMillis() - t1.getTimestampMillis();
+ return v;
+ }
+ }).doOnNext(new Action1() {
+ @Override
+ public void call(Long t1) {
+ deltas.add(t1);
+ }
+ }).subscribe();
+
+ Thread.sleep(2050);
+
+ s.unsubscribe();
+
+ if (deltas.size() < size + 1) {
+ fail("To few items in deltas: " + deltas);
+ }
+ for (int i = 0; i < size + 1; i++) {
+ if (deltas.get(i) < 500) {
+ fail(i + "th item arrived too early: " + deltas);
+ }
+ }
+ for (int i = size + 1; i < deltas.size(); i++) {
+ if (deltas.get(i) >= 500) {
+ fail(i + "th item arrived too late: " + deltas);
+ }
+ }
+ }
+ @Test(timeout = 5000)
+ public void testBoundedBufferingOfZero() throws Exception {
+ testBoundedBufferingWithSize(0);
+ }
+ @Test(timeout = 5000)
+ public void testBoundedBufferingOfOne() throws Exception {
+ testBoundedBufferingWithSize(1);
+ }
+ @Test(timeout = 5000)
+ public void testBoundedBufferingOfTwo() throws Exception {
+ testBoundedBufferingWithSize(2);
+ }
}