From 1383d3d76369353695e7869cdad5780648171e63 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 29 Apr 2014 15:56:39 +0200 Subject: [PATCH] Operator TakeTimed --- rxjava-core/src/main/java/rx/Observable.java | 4 +- .../java/rx/operators/OperationTakeTimed.java | 289 ------------------ .../java/rx/operators/OperatorTakeTimed.java | 84 +++++ ...edTest.java => OperatorTakeTimedTest.java} | 2 +- 4 files changed, 87 insertions(+), 292 deletions(-) delete mode 100644 rxjava-core/src/main/java/rx/operators/OperationTakeTimed.java create mode 100644 rxjava-core/src/main/java/rx/operators/OperatorTakeTimed.java rename rxjava-core/src/test/java/rx/operators/{OperationTakeTimedTest.java => OperatorTakeTimedTest.java} (99%) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 746362ca78..868199ebe2 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -68,7 +68,6 @@ import rx.operators.OperationSkipUntil; import rx.operators.OperationSwitch; import rx.operators.OperationTakeLast; -import rx.operators.OperationTakeTimed; import rx.operators.OperationTakeUntil; import rx.operators.OperationTakeWhile; import rx.operators.OperationThrottleFirst; @@ -121,6 +120,7 @@ import rx.operators.OperatorSkipWhile; import rx.operators.OperatorSubscribeOn; import rx.operators.OperatorTake; +import rx.operators.OperatorTakeTimed; import rx.operators.OperatorTimeout; import rx.operators.OperatorTimeoutWithSelector; import rx.operators.OperatorTimestamp; @@ -6447,7 +6447,7 @@ public final Observable take(long time, TimeUnit unit) { * @see RxJava Wiki: take() */ public final Observable take(long time, TimeUnit unit, Scheduler scheduler) { - return create(new OperationTakeTimed.TakeTimed(this, time, unit, scheduler)); + return lift(new OperatorTakeTimed(time, unit, scheduler)); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeTimed.java b/rxjava-core/src/main/java/rx/operators/OperationTakeTimed.java deleted file mode 100644 index dd51c9e9e7..0000000000 --- a/rxjava-core/src/main/java/rx/operators/OperationTakeTimed.java +++ /dev/null @@ -1,289 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import rx.Observable; -import rx.Observable.OnSubscribeFunc; -import rx.Observer; -import rx.Scheduler; -import rx.Scheduler.Worker; -import rx.Subscriber; -import rx.Subscription; -import rx.functions.Action0; -import rx.subscriptions.CompositeSubscription; -import rx.subscriptions.Subscriptions; - -/** - * Returns an Observable that emits the first num items emitted by the source - * Observable. - *

- * - *

- * You can choose to pay attention only to the first num items emitted by an - * Observable by using the take operation. This operation returns an Observable that will invoke a - * subscribing Observer's onNext function a maximum of num times before - * invoking onCompleted. - */ -public final class OperationTakeTimed { - - //TODO this has not been migrated to use bind yet - - /** - * Returns a specified number of contiguous values from the start of an observable sequence. - * - * @param items - * @param num - * @return the specified number of contiguous values from the start of the given observable sequence - */ - public static OnSubscribeFunc take(final Observable items, final int num) { - // wrap in a Func so that if a chain is built up, then asynchronously subscribed to twice we will have 2 instances of Take rather than 1 handing both, which is not thread-safe. - return new OnSubscribeFunc() { - - @Override - public Subscription onSubscribe(Observer observer) { - return new Take(items, num).onSubscribe(observer); - } - - }; - } - - /** - * This class is NOT thread-safe if invoked and referenced multiple times. In other words, don't subscribe to it multiple times from different threads. - *

- * It IS thread-safe from within it while receiving onNext events from multiple threads. - *

- * This should all be fine as long as it's kept as a private class and a new instance created from static factory method above. - *

- * Note how the take() factory method above protects us from a single instance being exposed with the Observable wrapper handling the subscribe flow. - * - * @param - */ - private static class Take implements OnSubscribeFunc { - private final Observable items; - private final int num; - private final SafeObservableSubscription subscription = new SafeObservableSubscription(); - - private Take(Observable items, int num) { - this.items = items; - this.num = num; - } - - @Override - public Subscription onSubscribe(Observer observer) { - if (num < 1) { - items.unsafeSubscribe(new Subscriber() - { - @Override - public void onCompleted() - { - } - - @Override - public void onError(Throwable e) - { - } - - @Override - public void onNext(T args) - { - } - }).unsubscribe(); - observer.onCompleted(); - return Subscriptions.empty(); - } - - return subscription.wrap(items.unsafeSubscribe(new ItemObserver(observer))); - } - - private class ItemObserver extends Subscriber { - private final Observer observer; - - private final AtomicInteger counter = new AtomicInteger(); - private volatile boolean hasEmitedError = false; - - public ItemObserver(Observer observer) { - this.observer = observer; - } - - @Override - public void onCompleted() { - if (hasEmitedError) { - return; - } - if (counter.getAndSet(num) < num) { - observer.onCompleted(); - } - } - - @Override - public void onError(Throwable e) { - if (hasEmitedError) { - return; - } - if (counter.getAndSet(num) < num) { - observer.onError(e); - } - } - - @Override - public void onNext(T args) { - if (hasEmitedError) { - return; - } - final int count = counter.incrementAndGet(); - if (count <= num) { - try { - observer.onNext(args); - } catch (Throwable ex) { - hasEmitedError = true; - observer.onError(ex); - subscription.unsubscribe(); - return; - } - if (count == num) { - observer.onCompleted(); - } - } - if (count >= num) { - // this will work if the sequence is asynchronous, it will have no effect on a synchronous observable - subscription.unsubscribe(); - } - } - - } - - } - - /** - * Takes values from the source until a timer fires. - * - * @param - * the result value type - */ - public static final class TakeTimed implements OnSubscribeFunc { - final Observable source; - final long time; - final TimeUnit unit; - final Scheduler scheduler; - - public TakeTimed(Observable source, long time, TimeUnit unit, Scheduler scheduler) { - this.source = source; - this.time = time; - this.unit = unit; - this.scheduler = scheduler; - } - - @Override - public Subscription onSubscribe(Observer t1) { - Worker inner = scheduler.createWorker(); - CompositeSubscription csub = new CompositeSubscription(inner); - final SourceObserver so = new SourceObserver(t1, csub); - csub.add(so); - source.unsafeSubscribe(so); - if (!so.isUnsubscribed()) { - inner.schedule(so, time, unit); - } - - return csub; - } - - /** - * Observes the source and relays its values until gate turns into false. - * - * @param - * the observed value type - */ - private static final class SourceObserver extends Subscriber implements Action0 { - final Observer observer; - final Subscription cancel; - final AtomicInteger state = new AtomicInteger(); - static final int ACTIVE = 0; - static final int NEXT = 1; - static final int DONE = 2; - - public SourceObserver(Observer observer, - Subscription cancel) { - this.observer = observer; - this.cancel = cancel; - } - - @Override - public void onNext(T args) { - do { - int s = state.get(); - if (s == DONE) { - return; - } - if (state.compareAndSet(s, NEXT)) { - try { - observer.onNext(args); - } finally { - state.set(ACTIVE); - return; - } - } - } while (true); - } - - @Override - public void onError(Throwable e) { - do { - int s = state.get(); - if (s == DONE) { - return; - } else if (s == NEXT) { - continue; - } else if (state.compareAndSet(s, DONE)) { - try { - observer.onError(e); - } finally { - cancel.unsubscribe(); - } - return; - } - } while (true); - } - - @Override - public void onCompleted() { - do { - int s = state.get(); - if (s == DONE) { - return; - } else if (s == NEXT) { - continue; - } else if (state.compareAndSet(s, DONE)) { - try { - observer.onCompleted(); - } finally { - cancel.unsubscribe(); - } - return; - } - } while (true); - } - - @Override - public void call() { - onCompleted(); - } - - } - } -} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorTakeTimed.java b/rxjava-core/src/main/java/rx/operators/OperatorTakeTimed.java new file mode 100644 index 0000000000..33cc48404d --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorTakeTimed.java @@ -0,0 +1,84 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.operators; + +import java.util.concurrent.TimeUnit; +import rx.Observable.Operator; +import rx.Scheduler; +import rx.Scheduler.Worker; +import rx.Subscriber; +import rx.functions.Action0; +import rx.observers.SerializedSubscriber; + +/** + * Takes values from the source until the specific time ellapses. + * + * @param + * the result value type + */ +public final class OperatorTakeTimed implements Operator { + final long time; + final TimeUnit unit; + final Scheduler scheduler; + + public OperatorTakeTimed(long time, TimeUnit unit, Scheduler scheduler) { + this.time = time; + this.unit = unit; + this.scheduler = scheduler; + } + + @Override + public Subscriber call(Subscriber child) { + Worker worker = scheduler.createWorker(); + child.add(worker); + + TakeSubscriber ts = new TakeSubscriber(new SerializedSubscriber(child)); + worker.schedule(ts, time, unit); + return ts; + } + /** Subscribed to source and scheduled on a worker. */ + static final class TakeSubscriber extends Subscriber implements Action0 { + final Subscriber child; + public TakeSubscriber(Subscriber child) { + super(child); + this.child = child; + } + + @Override + public void onNext(T t) { + child.onNext(t); + } + + @Override + public void onError(Throwable e) { + child.onError(e); + unsubscribe(); + } + + @Override + public void onCompleted() { + child.onCompleted(); + unsubscribe(); + } + + @Override + public void call() { + onCompleted(); + } + + + } +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationTakeTimedTest.java b/rxjava-core/src/test/java/rx/operators/OperatorTakeTimedTest.java similarity index 99% rename from rxjava-core/src/test/java/rx/operators/OperationTakeTimedTest.java rename to rxjava-core/src/test/java/rx/operators/OperatorTakeTimedTest.java index 856aef6ea3..49877e1c7e 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationTakeTimedTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorTakeTimedTest.java @@ -32,7 +32,7 @@ import rx.schedulers.TestScheduler; import rx.subjects.PublishSubject; -public class OperationTakeTimedTest { +public class OperatorTakeTimedTest { @Test public void testTakeTimed() {