diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index d83e5a4dbe..639a711d02 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -7060,9 +7060,32 @@ public final Subscription subscribe(Subscriber observer, Scheduler sc * @return the source Observable modified so that its subscriptions and unsubscriptions happen * on the specified {@link Scheduler} * @see RxJava Wiki: subscribeOn() + * @see #subscribeOn(rx.Scheduler, int) */ public final Observable subscribeOn(Scheduler scheduler) { - return nest().lift(new OperatorSubscribeOn(scheduler)); + return nest().lift(new OperatorSubscribeOn(scheduler, false)); + } + /** + * Asynchronously subscribes and unsubscribes Observers to this Observable on the specified {@link Scheduler} + * and allows buffering some events emitted from the source in the time gap between the original and + * actual subscription, and any excess events will block the source until the actual subscription happens. + *

+ * This overload should help mitigate issues when subscribing to a PublishSubject (and derivatives + * such as GroupedObservable in operator groupBy) and events fired between the original and actual subscriptions + * are lost. + *

+ * + * + * @param scheduler + * the {@link Scheduler} to perform subscription and unsubscription actions on + * @param bufferSize the number of events to buffer before blocking the source while in the time gap, + * negative value indicates an unlimited buffer + * @return the source Observable modified so that its subscriptions and unsubscriptions happen + * on the specified {@link Scheduler} + * @see RxJava Wiki: subscribeOn() + */ + public final Observable subscribeOn(Scheduler scheduler, int bufferSize) { + return nest().lift(new OperatorSubscribeOn(scheduler, true, bufferSize)); } /** diff --git a/rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java b/rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java new file mode 100644 index 0000000000..7526674636 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java @@ -0,0 +1,193 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.LinkedList; +import java.util.Queue; +import rx.Subscriber; +import rx.subscriptions.CompositeSubscription; + +/** + * Buffers the incoming events until notified, then replays the + * buffered events and continues as a simple pass-through subscriber. + * @param the streamed value type + */ +public class BufferUntilSubscriber extends Subscriber { + /** The actual subscriber. */ + private final Subscriber actual; + /** Indicate the pass-through mode. */ + private volatile boolean passthroughMode; + /** Protect mode transition. */ + private final Object gate = new Object(); + /** The buffered items. */ + private final Queue queue = new LinkedList(); + /** The queue capacity. */ + private final int capacity; + /** Null sentinel (in case queue type is changed). */ + private static final Object NULL_SENTINEL = new Object(); + /** Complete sentinel. */ + private static final Object COMPLETE_SENTINEL = new Object(); + /** + * Container for an onError event. + */ + private static final class ErrorSentinel { + final Throwable t; + + public ErrorSentinel(Throwable t) { + this.t = t; + } + + } + /** + * Constructor that wraps the actual subscriber and shares its subscription. + * @param capacity the queue capacity to accept before blocking, negative value indicates an unbounded queue + * @param actual + */ + public BufferUntilSubscriber(int capacity, Subscriber actual) { + super(actual); + this.actual = actual; + this.capacity = capacity; + } + /** + * Constructor that wraps the actual subscriber and uses the given composite + * subscription. + * @param capacity the queue capacity to accept before blocking, negative value indicates an unbounded queue + * @param actual + * @param cs + */ + public BufferUntilSubscriber(int capacity, Subscriber actual, CompositeSubscription cs) { + super(cs); + this.actual = actual; + this.capacity = capacity; + } + + /** + * Call this method to replay the buffered events and continue as a pass-through subscriber. + * If already in pass-through mode, this method is a no-op. + */ + public void enterPassthroughMode() { + if (!passthroughMode) { + synchronized (gate) { + if (!passthroughMode) { + while (!queue.isEmpty()) { + Object o = queue.poll(); + if (!actual.isUnsubscribed()) { + if (o == NULL_SENTINEL) { + actual.onNext(null); + } else + if (o == COMPLETE_SENTINEL) { + actual.onCompleted(); + } else + if (o instanceof ErrorSentinel) { + actual.onError(((ErrorSentinel)o).t); + } else + if (o != null) { + @SuppressWarnings("unchecked") + T v = (T)o; + actual.onNext(v); + } else { + throw new NullPointerException(); + } + } + } + passthroughMode = true; + gate.notifyAll(); + } + } + } + } + @Override + public void onNext(T t) { + if (!passthroughMode) { + synchronized (gate) { + if (!passthroughMode) { + if (capacity < 0 || queue.size() < capacity) { + queue.offer(t != null ? t : NULL_SENTINEL); + return; + } + try { + while (!passthroughMode) { + gate.wait(); + } + if (actual.isUnsubscribed()) { + return; + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + actual.onError(ex); + return; + } + } + } + } + actual.onNext(t); + } + + @Override + public void onError(Throwable e) { + if (!passthroughMode) { + synchronized (gate) { + if (!passthroughMode) { + if (capacity < 0 || queue.size() < capacity) { + queue.offer(new ErrorSentinel(e)); + return; + } + try { + while (!passthroughMode) { + gate.wait(); + } + if (actual.isUnsubscribed()) { + return; + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + actual.onError(ex); + return; + } + } + } + } + actual.onError(e); + } + + @Override + public void onCompleted() { + if (!passthroughMode) { + synchronized (gate) { + if (!passthroughMode) { + if (capacity < 0 || queue.size() < capacity) { + queue.offer(COMPLETE_SENTINEL); + return; + } + try { + while (!passthroughMode) { + gate.wait(); + } + if (actual.isUnsubscribed()) { + return; + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + actual.onError(ex); + return; + } + } + } + } + actual.onCompleted(); + } + +} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java b/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java index bdf33d501b..55bef3bfdb 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java @@ -20,6 +20,8 @@ import rx.Scheduler; import rx.Scheduler.Inner; import rx.Subscriber; +import rx.observables.GroupedObservable; +import rx.subjects.PublishSubject; import rx.subscriptions.CompositeSubscription; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; @@ -33,9 +35,27 @@ public class OperatorSubscribeOn implements Operator> { private final Scheduler scheduler; - - public OperatorSubscribeOn(Scheduler scheduler) { + /** + * Indicate that events fired between the original subscription time and + * the actual subscription time should not get lost. + */ + private final boolean dontLoseEvents; + /** The buffer size to avoid flooding. Negative value indicates an unbounded buffer. */ + private final int bufferSize; + public OperatorSubscribeOn(Scheduler scheduler, boolean dontLoseEvents) { + this(scheduler, dontLoseEvents, -1); + } + /** + * Construct a SubscribeOn operator. + * @param scheduler the target scheduler + * @param dontLoseEvents indicate that events should be buffered until the actual subscription happens + * @param bufferSize if dontLoseEvents == true, this indicates the buffer size. Filling the buffer will + * block the source. -1 indicates an unbounded buffer + */ + public OperatorSubscribeOn(Scheduler scheduler, boolean dontLoseEvents, int bufferSize) { this.scheduler = scheduler; + this.dontLoseEvents = dontLoseEvents; + this.bufferSize = bufferSize; } @Override @@ -51,9 +71,38 @@ public void onCompleted() { public void onError(Throwable e) { subscriber.onError(e); } - + boolean checkNeedBuffer(Observable o) { + return dontLoseEvents || ((o instanceof GroupedObservable) + || (o instanceof PublishSubject) + // || (o instanceof BehaviorSubject) + ); + } @Override public void onNext(final Observable o) { + if (checkNeedBuffer(o)) { + final CompositeSubscription cs = new CompositeSubscription(); + subscriber.add(cs); + final BufferUntilSubscriber bus = new BufferUntilSubscriber(bufferSize, subscriber, new CompositeSubscription()); + o.subscribe(bus); + scheduler.schedule(new Action1() { + @Override + public void call(final Inner inner) { + cs.add(Subscriptions.create(new Action0() { + @Override + public void call() { + inner.schedule(new Action1() { + @Override + public void call(final Inner inner) { + bus.unsubscribe(); + } + }); + } + })); + bus.enterPassthroughMode(); + } + }); + return; + } scheduler.schedule(new Action1() { @Override diff --git a/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java b/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java index a0dfe2e7fc..b05b7724f8 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java @@ -1,77 +1,92 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ + /** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.operators; +import java.util.ArrayList; import static org.junit.Assert.*; import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.junit.Ignore; import org.junit.Test; import rx.Observable; import rx.Observable.OnSubscribe; +import rx.Scheduler; +import rx.Scheduler.Inner; import rx.Subscriber; import rx.Subscription; +import rx.observables.GroupedObservable; import rx.observers.TestObserver; +import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; +import rx.subjects.PublishSubject; +import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.MultipleAssignmentSubscription; import rx.subscriptions.Subscriptions; +import rx.util.Timestamped; import rx.util.functions.Action0; +import rx.util.functions.Action1; +import rx.util.functions.Func1; public class OperatorSubscribeOnTest { - + private class ThreadSubscription implements Subscription { private volatile Thread thread; - + private final CountDownLatch latch = new CountDownLatch(1); - + private final Subscription s = Subscriptions.create(new Action0() { - + @Override public void call() { thread = Thread.currentThread(); latch.countDown(); } - + }); - + @Override public void unsubscribe() { s.unsubscribe(); } - + @Override public boolean isUnsubscribed() { return s.isUnsubscribed(); } - + public Thread getThread() throws InterruptedException { latch.await(); return thread; } } - + @Test public void testSubscribeOnAndVerifySubscribeAndUnsubscribeThreads() throws InterruptedException { final ThreadSubscription subscription = new ThreadSubscription(); final AtomicReference subscribeThread = new AtomicReference(); Observable w = Observable.create(new OnSubscribe() { - + @Override public void call(Subscriber t1) { subscribeThread.set(Thread.currentThread()); @@ -81,33 +96,33 @@ public void call(Subscriber t1) { t1.onCompleted(); } }); - + TestObserver observer = new TestObserver(); w.subscribeOn(Schedulers.newThread()).subscribe(observer); - + Thread unsubscribeThread = subscription.getThread(); - + assertNotNull(unsubscribeThread); assertNotSame(Thread.currentThread(), unsubscribeThread); - + assertNotNull(subscribeThread.get()); assertNotSame(Thread.currentThread(), subscribeThread.get()); // True for Schedulers.newThread() assertTrue(unsubscribeThread == subscribeThread.get()); - + observer.assertReceivedOnNext(Arrays.asList(1, 2)); observer.assertTerminalEvent(); } - + @Test public void testIssue813() throws InterruptedException { // https://github.com/Netflix/RxJava/issues/813 final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch doneLatch = new CountDownLatch(1); - + TestObserver observer = new TestObserver(); final ThreadSubscription s = new ThreadSubscription(); - + final Subscription subscription = Observable .create(new Observable.OnSubscribe() { @Override @@ -132,7 +147,7 @@ public void call( } } }).subscribeOn(Schedulers.computation()).subscribe(observer); - + subscription.unsubscribe(); // As unsubscribe is called in other thread, we need to wait for it. s.getThread(); @@ -141,4 +156,218 @@ public void call( assertEquals(0, observer.getOnErrorEvents().size()); assertEquals(1, observer.getOnCompletedEvents().size()); } + + public static class SlowScheduler extends Scheduler { + final Scheduler actual; + final long delay; + final TimeUnit unit; + public SlowScheduler() { + this(Schedulers.computation(), 2, TimeUnit.SECONDS); + } + public SlowScheduler(Scheduler actual, long delay, TimeUnit unit) { + this.actual = actual; + this.delay = delay; + this.unit = unit; + } + + @Override + public Subscription schedule(final Action1 action) { + return actual.schedule(action, delay, unit); + } + + @Override + public Subscription schedule(final Action1 action, final long delayTime, final TimeUnit delayUnit) { + TimeUnit common = delayUnit.compareTo(unit) < 0 ? delayUnit : unit; + long t = common.convert(delayTime, delayUnit) + common.convert(delay, unit); + return actual.schedule(action, t, common); + } + } + + @Test + public void testSubscribeOnPublishSubjectWithSlowScheduler() { + PublishSubject ps = PublishSubject.create(); + TestSubscriber ts = new TestSubscriber(); + ps.subscribeOn(new SlowScheduler()).subscribe(ts); + ps.onNext(1); + ps.onNext(2); + ps.onCompleted(); + + ts.awaitTerminalEvent(); + ts.assertReceivedOnNext(Arrays.asList(1, 2)); + } + + @Test + public void testGroupsWithNestedSubscribeOn() throws InterruptedException { + final ArrayList results = new ArrayList(); + Observable.create(new OnSubscribe() { + + @Override + public void call(Subscriber sub) { + sub.onNext(1); + sub.onNext(2); + sub.onNext(1); + sub.onNext(2); + sub.onCompleted(); + } + + }).groupBy(new Func1() { + + @Override + public Integer call(Integer t) { + return t; + } + + }).flatMap(new Func1, Observable>() { + + @Override + public Observable call(final GroupedObservable group) { + return group.subscribeOn(Schedulers.newThread()).map(new Func1() { + + @Override + public String call(Integer t1) { + System.out.println("Received: " + t1 + " on group : " + group.getKey()); + return "first groups: " + t1; + } + + }); + } + + }).toBlockingObservable().forEach(new Action1() { + + @Override + public void call(String s) { + results.add(s); + } + + }); + + System.out.println("Results: " + results); + assertEquals(4, results.size()); + } + + @Test + public void testFirstGroupsCompleteAndParentSlowToThenEmitFinalGroupsWhichThenSubscribesOnAndDelaysAndThenCompletes() throws InterruptedException { + final CountDownLatch first = new CountDownLatch(2); // there are two groups to first complete + final ArrayList results = new ArrayList(); + Observable.create(new OnSubscribe() { + + @Override + public void call(Subscriber sub) { + sub.onNext(1); + sub.onNext(2); + sub.onNext(1); + sub.onNext(2); + try { + first.await(); + } catch (InterruptedException e) { + sub.onError(e); + return; + } + sub.onNext(3); + sub.onNext(3); + sub.onCompleted(); + } + + }).groupBy(new Func1() { + + @Override + public Integer call(Integer t) { + return t; + } + + }).flatMap(new Func1, Observable>() { + + @Override + public Observable call(final GroupedObservable group) { + if (group.getKey() < 3) { + return group.map(new Func1() { + + @Override + public String call(Integer t1) { + return "first groups: " + t1; + } + + }) + // must take(2) so an onCompleted + unsubscribe happens on these first 2 groups + .take(2).doOnCompleted(new Action0() { + + @Override + public void call() { + first.countDown(); + } + + }); + } else { + return group.subscribeOn(Schedulers.newThread()).delay(400, TimeUnit.MILLISECONDS).map(new Func1() { + + @Override + public String call(Integer t1) { + return "last group: " + t1; + } + + }); + } + } + + }).toBlockingObservable().forEach(new Action1() { + + @Override + public void call(String s) { + results.add(s); + } + + }); + + System.out.println("Results: " + results); + assertEquals(6, results.size()); + } + void testBoundedBufferingWithSize(int size) throws Exception { + Observable timer = Observable.timer(100, 100, TimeUnit.MILLISECONDS); + + final List deltas = Collections.synchronizedList(new ArrayList()); + + Subscription s = timer.timestamp().subscribeOn( + new SlowScheduler(Schedulers.computation(), 1, TimeUnit.SECONDS), size).map(new Func1, Long>() { + @Override + public Long call(Timestamped t1) { + long v = System.currentTimeMillis() - t1.getTimestampMillis(); + return v; + } + }).doOnNext(new Action1() { + @Override + public void call(Long t1) { + deltas.add(t1); + } + }).subscribe(); + + Thread.sleep(2050); + + s.unsubscribe(); + + if (deltas.size() < size + 1) { + fail("To few items in deltas: " + deltas); + } + for (int i = 0; i < size + 1; i++) { + if (deltas.get(i) < 500) { + fail(i + "th item arrived too early: " + deltas); + } + } + for (int i = size + 1; i < deltas.size(); i++) { + if (deltas.get(i) >= 500) { + fail(i + "th item arrived too late: " + deltas); + } + } + } + @Test(timeout = 5000) + public void testBoundedBufferingOfZero() throws Exception { + testBoundedBufferingWithSize(0); + } + @Test(timeout = 5000) + public void testBoundedBufferingOfOne() throws Exception { + testBoundedBufferingWithSize(1); + } + @Test(timeout = 5000) + public void testBoundedBufferingOfTwo() throws Exception { + testBoundedBufferingWithSize(2); + } }