Skip to content

OperatorSequenceEqual #1120

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 30, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import rx.operators.OperationParallelMerge;
import rx.operators.OperationReplay;
import rx.operators.OperationSample;
import rx.operators.OperationSequenceEqual;
import rx.operators.OperationSkip;
import rx.operators.OperationSkipUntil;
import rx.operators.OperationSwitch;
Expand Down Expand Up @@ -113,6 +112,7 @@
import rx.operators.OperatorRepeat;
import rx.operators.OperatorRetry;
import rx.operators.OperatorScan;
import rx.operators.OperatorSequenceEqual;
import rx.operators.OperatorSerialize;
import rx.operators.OperatorSingle;
import rx.operators.OperatorSkip;
Expand Down Expand Up @@ -2476,7 +2476,7 @@ public final Boolean call(T first, T second) {
* @see <a href="https://github.com/Netflix/RxJava/wiki/Conditional-and-Boolean-Operators#wiki-sequenceequal">RxJava Wiki: sequenceEqual()</a>
*/
public final static <T> Observable<Boolean> sequenceEqual(Observable<? extends T> first, Observable<? extends T> second, Func2<? super T, ? super T, Boolean> equality) {
return OperationSequenceEqual.sequenceEqual(first, second, equality);
return OperatorSequenceEqual.sequenceEqual(first, second, equality);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import static rx.Observable.concat;
import static rx.Observable.from;
import static rx.Observable.zip;
import rx.Notification;
import rx.Observable;
import rx.functions.Func1;
import rx.functions.Func2;
Expand All @@ -28,44 +27,43 @@
* Returns an Observable that emits a Boolean value that indicate whether two
* sequences are equal by comparing the elements pairwise.
*/
public class OperationSequenceEqual {

public static <T> Observable<Boolean> sequenceEqual(
Observable<? extends T> first, Observable<? extends T> second,
final Func2<? super T, ? super T, Boolean> equality) {
Observable<Notification<T>> firstObservable = concat(
first.map(new Func1<T, Notification<T>>() {
public final class OperatorSequenceEqual {
private OperatorSequenceEqual() { throw new IllegalStateException("No instances!"); }
/** NotificationLite doesn't work as zip uses it. */
private static final Object LOCAL_ONCOMPLETED = new Object();
static <T> Observable<Object> materializeLite(Observable<T> source) {
return concat(
source.map(new Func1<T, Object>() {

@Override
public Notification<T> call(T t1) {
return Notification.createOnNext(t1);
public Object call(T t1) {
return t1;
}

}), from(Notification.<T>createOnCompleted()));

Observable<Notification<T>> secondObservable = concat(
second.map(new Func1<T, Notification<T>>() {

@Override
public Notification<T> call(T t1) {
return Notification.createOnNext(t1);
}

}), from(Notification.<T>createOnCompleted()));
}), from(LOCAL_ONCOMPLETED));
}
public static <T> Observable<Boolean> sequenceEqual(
Observable<? extends T> first, Observable<? extends T> second,
final Func2<? super T, ? super T, Boolean> equality) {
Observable<Object> firstObservable = materializeLite(first);
Observable<Object> secondObservable = materializeLite(second);

return zip(firstObservable, secondObservable,
new Func2<Notification<T>, Notification<T>, Boolean>() {
new Func2<Object, Object, Boolean>() {

@Override
public Boolean call(Notification<T> t1, Notification<T> t2) {
if (t1.isOnCompleted() && t2.isOnCompleted()) {
@SuppressWarnings("unchecked")
public Boolean call(Object t1, Object t2) {
boolean c1 = t1 == LOCAL_ONCOMPLETED;
boolean c2 = t2 == LOCAL_ONCOMPLETED;
if (c1 && c2) {
return true;
}
if (t1.isOnCompleted() || t2.isOnCompleted()) {
if (c1 || c2) {
return false;
}
// Now t1 and t2 must be 'onNext'.
return equality.call(t1.getValue(), t2.getValue());
return equality.call((T)t1, (T)t2);
}

}).all(Functions.<Boolean> identity());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import rx.Observer;
import rx.functions.Func2;

public class OperationSequenceEqualTests {
public class OperatorSequenceEqualTest {

@Test
public void test1() {
Expand Down