diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index e5eccfe598..b40864ea6a 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -48,8 +48,6 @@ import rx.observers.SafeSubscriber; import rx.operators.OnSubscribeFromIterable; import rx.operators.OnSubscribeRange; -import rx.operators.OperatorAll; -import rx.operators.OperationAny; import rx.operators.OperationAsObservable; import rx.operators.OperationBuffer; import rx.operators.OperationCombineLatest; @@ -92,7 +90,9 @@ import rx.operators.OperationToObservableFuture; import rx.operators.OperationUsing; import rx.operators.OperationWindow; +import rx.operators.OperatorAll; import rx.operators.OperatorAmb; +import rx.operators.OperatorAny; import rx.operators.OperatorCache; import rx.operators.OperatorCast; import rx.operators.OperatorDoOnEach; @@ -3893,7 +3893,7 @@ public final Observable elementAtOrDefault(int index, T defaultValue) { * @see MSDN: Observable.Any (Note: the description in this page was wrong at the time of this writing) */ public final Observable exists(Func1 predicate) { - return create(OperationAny.exists(this, predicate)); + return lift(new OperatorAny(predicate, false)); } /** @@ -4126,7 +4126,7 @@ public final Observable ignoreElements() { * @see MSDN: Observable.Any */ public final Observable isEmpty() { - return create(OperationAny.isEmpty(this)); + return lift(new OperatorAny(Functions.alwaysTrue(), true)); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationAny.java b/rxjava-core/src/main/java/rx/operators/OperationAny.java deleted file mode 100644 index b64bf419c7..0000000000 --- a/rxjava-core/src/main/java/rx/operators/OperationAny.java +++ /dev/null @@ -1,127 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import static rx.functions.Functions.alwaysTrue; - -import java.util.concurrent.atomic.AtomicBoolean; - -import rx.Observable; -import rx.Observable.OnSubscribeFunc; -import rx.Observer; -import rx.Subscriber; -import rx.Subscription; -import rx.functions.Func1; - -/** - * Returns an {@link Observable} that emits true if any element of - * an observable sequence satisfies a condition, otherwise false. - */ -public final class OperationAny { - - /** - * Returns an {@link Observable} that emits true if the source {@link Observable} is not empty, otherwise false. - * - * @param source - * The source {@link Observable} to check if not empty. - * @return A subscription function for creating the target Observable. - */ - public static OnSubscribeFunc any(Observable source) { - return new Any(source, alwaysTrue(), false); - } - - public static OnSubscribeFunc isEmpty(Observable source) { - return new Any(source, alwaysTrue(), true); - } - - /** - * Returns an {@link Observable} that emits true if any element - * of the source {@link Observable} satisfies the given condition, otherwise - * false. Note: always emit false if the source {@link Observable} is empty. - * - * @param source - * The source {@link Observable} to check if any element - * satisfies the given condition. - * @param predicate - * The condition to test every element. - * @return A subscription function for creating the target Observable. - */ - public static OnSubscribeFunc any(Observable source, Func1 predicate) { - return new Any(source, predicate, false); - } - - public static OnSubscribeFunc exists(Observable source, Func1 predicate) { - return any(source, predicate); - } - - private static class Any implements OnSubscribeFunc { - - private final Observable source; - private final Func1 predicate; - private final boolean returnOnEmpty; - - private Any(Observable source, Func1 predicate, boolean returnOnEmpty) { - this.source = source; - this.predicate = predicate; - this.returnOnEmpty = returnOnEmpty; - } - - @Override - public Subscription onSubscribe(final Observer observer) { - final SafeObservableSubscription subscription = new SafeObservableSubscription(); - return subscription.wrap(source.unsafeSubscribe(new Subscriber() { - - private final AtomicBoolean hasEmitted = new AtomicBoolean(false); - - @Override - public void onNext(T value) { - try { - if (hasEmitted.get() == false) { - if (predicate.call(value) == true - && hasEmitted.getAndSet(true) == false) { - observer.onNext(!returnOnEmpty); - observer.onCompleted(); - // this will work if the sequence is asynchronous, it - // will have no effect on a synchronous observable - subscription.unsubscribe(); - } - } - } catch (Throwable ex) { - observer.onError(ex); - // this will work if the sequence is asynchronous, it - // will have no effect on a synchronous observable - subscription.unsubscribe(); - } - - } - - @Override - public void onError(Throwable ex) { - observer.onError(ex); - } - - @Override - public void onCompleted() { - if (!hasEmitted.get()) { - observer.onNext(returnOnEmpty); - observer.onCompleted(); - } - } - })); - } - - } -} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorAny.java b/rxjava-core/src/main/java/rx/operators/OperatorAny.java new file mode 100644 index 0000000000..ff46549a2f --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorAny.java @@ -0,0 +1,74 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + + +import rx.Observable; +import rx.Observable.Operator; +import rx.Subscriber; +import rx.functions.Func1; + +/** + * Returns an {@link Observable} that emits true if any element of + * an observable sequence satisfies a condition, otherwise false. + */ +public final class OperatorAny implements Operator { + private final Func1 predicate; + private final boolean returnOnEmpty; + + public OperatorAny(Func1 predicate, boolean returnOnEmpty) { + this.predicate = predicate; + this.returnOnEmpty = returnOnEmpty; + } + + @Override + public Subscriber call(final Subscriber child) { + return new Subscriber(child) { + boolean hasElements; + boolean done; + @Override + public void onNext(T t) { + hasElements = true; + boolean result = predicate.call(t); + if (result && !done) { + done = true; + child.onNext(!returnOnEmpty); + child.onCompleted(); + unsubscribe(); + } + } + + @Override + public void onError(Throwable e) { + child.onError(e); + } + + @Override + public void onCompleted() { + if (!done) { + done = true; + if (hasElements) { + child.onNext(false); + } else { + child.onNext(returnOnEmpty); + } + child.onCompleted(); + } + } + + }; + } +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationAnyTest.java b/rxjava-core/src/test/java/rx/operators/OperatorAnyTest.java similarity index 87% rename from rxjava-core/src/test/java/rx/operators/OperationAnyTest.java rename to rxjava-core/src/test/java/rx/operators/OperatorAnyTest.java index d2687a90f7..94c0f76824 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationAnyTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorAnyTest.java @@ -19,22 +19,20 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static rx.operators.OperationAny.any; -import static rx.operators.OperationAny.exists; -import static rx.operators.OperationAny.isEmpty; import org.junit.Test; import rx.Observable; import rx.Observer; import rx.functions.Func1; +import rx.functions.Functions; -public class OperationAnyTest { +public class OperatorAnyTest { @Test public void testAnyWithTwoItems() { Observable w = Observable.from(1, 2); - Observable observable = Observable.create(any(w)); + Observable observable = w.exists(Functions.alwaysTrue()); @SuppressWarnings("unchecked") Observer observer = mock(Observer.class); @@ -48,7 +46,7 @@ public void testAnyWithTwoItems() { @Test public void testIsEmptyWithTwoItems() { Observable w = Observable.from(1, 2); - Observable observable = Observable.create(isEmpty(w)); + Observable observable = w.isEmpty(); @SuppressWarnings("unchecked") Observer observer = mock(Observer.class); @@ -62,7 +60,7 @@ public void testIsEmptyWithTwoItems() { @Test public void testAnyWithOneItem() { Observable w = Observable.from(1); - Observable observable = Observable.create(any(w)); + Observable observable = w.exists(Functions.alwaysTrue()); @SuppressWarnings("unchecked") Observer observer = mock(Observer.class); @@ -76,7 +74,7 @@ public void testAnyWithOneItem() { @Test public void testIsEmptyWithOneItem() { Observable w = Observable.from(1); - Observable observable = Observable.create(isEmpty(w)); + Observable observable = w.isEmpty(); @SuppressWarnings("unchecked") Observer observer = mock(Observer.class); @@ -90,7 +88,7 @@ public void testIsEmptyWithOneItem() { @Test public void testAnyWithEmpty() { Observable w = Observable.empty(); - Observable observable = Observable.create(any(w)); + Observable observable = w.exists(Functions.alwaysTrue()); @SuppressWarnings("unchecked") Observer observer = mock(Observer.class); @@ -104,7 +102,7 @@ public void testAnyWithEmpty() { @Test public void testIsEmptyWithEmpty() { Observable w = Observable.empty(); - Observable observable = Observable.create(isEmpty(w)); + Observable observable = w.isEmpty(); @SuppressWarnings("unchecked") Observer observer = mock(Observer.class); @@ -118,14 +116,14 @@ public void testIsEmptyWithEmpty() { @Test public void testAnyWithPredicate1() { Observable w = Observable.from(1, 2, 3); - Observable observable = Observable.create(any(w, + Observable observable = w.exists( new Func1() { @Override public Boolean call(Integer t1) { return t1 < 2; } - })); + }); @SuppressWarnings("unchecked") Observer observer = mock(Observer.class); @@ -139,14 +137,14 @@ public Boolean call(Integer t1) { @Test public void testExists1() { Observable w = Observable.from(1, 2, 3); - Observable observable = Observable.create(exists(w, + Observable observable = w.exists( new Func1() { @Override public Boolean call(Integer t1) { return t1 < 2; } - })); + }); @SuppressWarnings("unchecked") Observer observer = mock(Observer.class); @@ -160,14 +158,14 @@ public Boolean call(Integer t1) { @Test public void testAnyWithPredicate2() { Observable w = Observable.from(1, 2, 3); - Observable observable = Observable.create(any(w, + Observable observable = w.exists( new Func1() { @Override public Boolean call(Integer t1) { return t1 < 1; } - })); + }); @SuppressWarnings("unchecked") Observer observer = mock(Observer.class); @@ -182,14 +180,14 @@ public Boolean call(Integer t1) { public void testAnyWithEmptyAndPredicate() { // If the source is empty, always output false. Observable w = Observable.empty(); - Observable observable = Observable.create(any(w, + Observable observable = w.exists( new Func1() { @Override public Boolean call(Integer t1) { return true; } - })); + }); @SuppressWarnings("unchecked") Observer observer = mock(Observer.class);