Skip to content

Commit 817fb3a

Browse files
committed
OperatorCombineLatest
1 parent 4e77f8a commit 817fb3a

File tree

5 files changed

+1054
-815
lines changed

5 files changed

+1054
-815
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
import rx.operators.OnSubscribeFromIterable;
5050
import rx.operators.OnSubscribeRange;
5151
import rx.operators.OperationBuffer;
52-
import rx.operators.OperationCombineLatest;
5352
import rx.operators.OperationConcat;
5453
import rx.operators.OperationDebounce;
5554
import rx.operators.OperationDefaultIfEmpty;
@@ -95,6 +94,7 @@
9594
import rx.operators.OperatorAsObservable;
9695
import rx.operators.OperatorCache;
9796
import rx.operators.OperatorCast;
97+
import rx.operators.OperatorCombineLatest;
9898
import rx.operators.OperatorDoOnEach;
9999
import rx.operators.OperatorElementAt;
100100
import rx.operators.OperatorFilter;
@@ -513,8 +513,8 @@ public final static <T> Observable<T> amb(Observable<? extends T> o1, Observable
513513
* Observables by means of the given aggregation function
514514
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-combinelatest">RxJava Wiki: combineLatest()</a>
515515
*/
516-
public final static <T1, T2, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Func2<? super T1, ? super T2, ? extends R> combineFunction) {
517-
return create(OperationCombineLatest.combineLatest(o1, o2, combineFunction));
516+
public static final <T1, T2, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Func2<? super T1, ? super T2, ? extends R> combineFunction) {
517+
return combineLatest(Arrays.asList(o1, o2), Functions.fromFunc(combineFunction));
518518
}
519519

520520
/**
@@ -536,8 +536,8 @@ public final static <T1, T2, R> Observable<R> combineLatest(Observable<? extends
536536
* Observables by means of the given aggregation function
537537
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-combinelatest">RxJava Wiki: combineLatest()</a>
538538
*/
539-
public final static <T1, T2, T3, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Func3<? super T1, ? super T2, ? super T3, ? extends R> combineFunction) {
540-
return create(OperationCombineLatest.combineLatest(o1, o2, o3, combineFunction));
539+
public static final <T1, T2, T3, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Func3<? super T1, ? super T2, ? super T3, ? extends R> combineFunction) {
540+
return combineLatest(Arrays.asList(o1, o2, o3), Functions.fromFunc(combineFunction));
541541
}
542542

543543
/**
@@ -561,9 +561,9 @@ public final static <T1, T2, T3, R> Observable<R> combineLatest(Observable<? ext
561561
* Observables by means of the given aggregation function
562562
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-combinelatest">RxJava Wiki: combineLatest()</a>
563563
*/
564-
public final static <T1, T2, T3, T4, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4,
564+
public static final <T1, T2, T3, T4, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4,
565565
Func4<? super T1, ? super T2, ? super T3, ? super T4, ? extends R> combineFunction) {
566-
return create(OperationCombineLatest.combineLatest(o1, o2, o3, o4, combineFunction));
566+
return combineLatest(Arrays.asList(o1, o2, o3, o4), Functions.fromFunc(combineFunction));
567567
}
568568

569569
/**
@@ -589,9 +589,9 @@ public final static <T1, T2, T3, T4, R> Observable<R> combineLatest(Observable<?
589589
* Observables by means of the given aggregation function
590590
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-combinelatest">RxJava Wiki: combineLatest()</a>
591591
*/
592-
public final static <T1, T2, T3, T4, T5, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5,
592+
public static final <T1, T2, T3, T4, T5, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5,
593593
Func5<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? extends R> combineFunction) {
594-
return create(OperationCombineLatest.combineLatest(o1, o2, o3, o4, o5, combineFunction));
594+
return combineLatest(Arrays.asList(o1, o2, o3, o4, o5), Functions.fromFunc(combineFunction));
595595
}
596596

597597
/**
@@ -619,9 +619,9 @@ public final static <T1, T2, T3, T4, T5, R> Observable<R> combineLatest(Observab
619619
* Observables by means of the given aggregation function
620620
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-combinelatest">RxJava Wiki: combineLatest()</a>
621621
*/
622-
public final static <T1, T2, T3, T4, T5, T6, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6,
622+
public static final <T1, T2, T3, T4, T5, T6, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6,
623623
Func6<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R> combineFunction) {
624-
return create(OperationCombineLatest.combineLatest(o1, o2, o3, o4, o5, o6, combineFunction));
624+
return combineLatest(Arrays.asList(o1, o2, o3, o4, o5, o6), Functions.fromFunc(combineFunction));
625625
}
626626

627627
/**
@@ -651,9 +651,9 @@ public final static <T1, T2, T3, T4, T5, T6, R> Observable<R> combineLatest(Obse
651651
* Observables by means of the given aggregation function
652652
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-combinelatest">RxJava Wiki: combineLatest()</a>
653653
*/
654-
public final static <T1, T2, T3, T4, T5, T6, T7, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7,
654+
public static final <T1, T2, T3, T4, T5, T6, T7, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7,
655655
Func7<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? extends R> combineFunction) {
656-
return create(OperationCombineLatest.combineLatest(o1, o2, o3, o4, o5, o6, o7, combineFunction));
656+
return combineLatest(Arrays.asList(o1, o2, o3, o4, o5, o6, o7), Functions.fromFunc(combineFunction));
657657
}
658658

659659
/**
@@ -685,9 +685,9 @@ public final static <T1, T2, T3, T4, T5, T6, T7, R> Observable<R> combineLatest(
685685
* Observables by means of the given aggregation function
686686
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-combinelatest">RxJava Wiki: combineLatest()</a>
687687
*/
688-
public final static <T1, T2, T3, T4, T5, T6, T7, T8, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7, Observable<? extends T8> o8,
688+
public static final <T1, T2, T3, T4, T5, T6, T7, T8, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7, Observable<? extends T8> o8,
689689
Func8<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? extends R> combineFunction) {
690-
return create(OperationCombineLatest.combineLatest(o1, o2, o3, o4, o5, o6, o7, o8, combineFunction));
690+
return combineLatest(Arrays.asList(o1, o2, o3, o4, o5, o6, o7, o8), Functions.fromFunc(combineFunction));
691691
}
692692

693693
/**
@@ -721,10 +721,26 @@ public final static <T1, T2, T3, T4, T5, T6, T7, T8, R> Observable<R> combineLat
721721
* Observables by means of the given aggregation function
722722
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-combinelatest">RxJava Wiki: combineLatest()</a>
723723
*/
724-
public final static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7, Observable<? extends T8> o8,
724+
public static final <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7, Observable<? extends T8> o8,
725725
Observable<? extends T9> o9,
726726
Func9<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? super T9, ? extends R> combineFunction) {
727-
return create(OperationCombineLatest.combineLatest(o1, o2, o3, o4, o5, o6, o7, o8, o9, combineFunction));
727+
return combineLatest(Arrays.asList(o1, o2, o3, o4, o5, o6, o7, o8, o9), Functions.fromFunc(combineFunction));
728+
}
729+
/**
730+
* Combines nine source Observables by emitting an item that aggregates the latest values of each of the
731+
* source Observables each time an item is received from any of the source Observables, where this
732+
* aggregation is defined by a specified function.
733+
* @param <T> the common base type of source values
734+
* @param <R> the result type
735+
* @param sources the list of observable sources
736+
* @param combineFunction
737+
* the aggregation function used to combine the items emitted by the source Observables
738+
* @return an Observable that emits items that are the result of combining the items emitted by the source
739+
* Observables by means of the given aggregation function
740+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-combinelatest">RxJava Wiki: combineLatest()</a>
741+
*/
742+
public static final <T, R> Observable<R> combineLatest(List<? extends Observable<? extends T>> sources, FuncN<? extends R> combineFunction) {
743+
return create(new OperatorCombineLatest<T, R>(sources, combineFunction));
728744
}
729745

730746
/**

0 commit comments

Comments
 (0)