Skip to content

Commit bc202f4

Browse files
committed
Operator When
1 parent 95e0636 commit bc202f4

File tree

3 files changed

+25
-25
lines changed

3 files changed

+25
-25
lines changed

rxjava-contrib/rxjava-joins/src/main/java/rx/joins/operators/OperationJoinPatterns.java renamed to rxjava-contrib/rxjava-joins/src/main/java/rx/joins/operators/OperatorJoinPatterns.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,23 +22,22 @@
2222
import java.util.Map;
2323

2424
import rx.Observable;
25-
import rx.Observable.OnSubscribeFunc;
25+
import rx.Observable.OnSubscribe;
2626
import rx.Observer;
27-
import rx.Subscription;
27+
import rx.Subscriber;
2828
import rx.functions.Action1;
2929
import rx.functions.Func1;
3030
import rx.joins.ActivePlan0;
3131
import rx.joins.JoinObserver;
3232
import rx.joins.Pattern1;
3333
import rx.joins.Pattern2;
3434
import rx.joins.Plan0;
35-
import rx.observers.Subscribers;
3635
import rx.subscriptions.CompositeSubscription;
3736

3837
/**
3938
* Join patterns: And, Then, When.
4039
*/
41-
public class OperationJoinPatterns {
40+
public class OperatorJoinPatterns {
4241
/**
4342
* Creates a pattern that matches when both observable sequences have an available element.
4443
*/
@@ -68,7 +67,7 @@ public static <T1, R> Plan0<R> then(/* this */Observable<T1> source, Func1<T1, R
6867
/**
6968
* Joins together the results from several patterns.
7069
*/
71-
public static <R> OnSubscribeFunc<R> when(Plan0<R>... plans) {
70+
public static <R> OnSubscribe<R> when(Plan0<R>... plans) {
7271
if (plans == null) {
7372
throw new NullPointerException("plans");
7473
}
@@ -78,13 +77,13 @@ public static <R> OnSubscribeFunc<R> when(Plan0<R>... plans) {
7877
/**
7978
* Joins together the results from several patterns.
8079
*/
81-
public static <R> OnSubscribeFunc<R> when(final Iterable<? extends Plan0<R>> plans) {
80+
public static <R> OnSubscribe<R> when(final Iterable<? extends Plan0<R>> plans) {
8281
if (plans == null) {
8382
throw new NullPointerException("plans");
8483
}
85-
return new OnSubscribeFunc<R>() {
84+
return new OnSubscribe<R>() {
8685
@Override
87-
public Subscription onSubscribe(final Observer<? super R> t1) {
86+
public void call(final Subscriber<? super R> t1) {
8887
final Map<Object, JoinObserver> externalSubscriptions = new HashMap<Object, JoinObserver>();
8988
final Object gate = new Object();
9089
final List<ActivePlan0> activePlans = new ArrayList<ActivePlan0>();
@@ -122,14 +121,15 @@ public void call(ActivePlan0 activePlan) {
122121
}));
123122
}
124123
} catch (Throwable t) {
125-
return Observable.<R> error(t).unsafeSubscribe(Subscribers.from(t1));
124+
Observable.<R> error(t).unsafeSubscribe(t1);
125+
return;
126126
}
127127
CompositeSubscription group = new CompositeSubscription();
128+
t1.add(group);
128129
for (JoinObserver jo : externalSubscriptions.values()) {
129130
jo.subscribe(gate);
130131
group.add(jo);
131132
}
132-
return group;
133133
}
134134
};
135135
}

rxjava-contrib/rxjava-joins/src/main/java/rx/observables/JoinObservable.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import rx.functions.Func1;
55
import rx.joins.Pattern2;
66
import rx.joins.Plan0;
7-
import rx.joins.operators.OperationJoinPatterns;
7+
import rx.joins.operators.OperatorJoinPatterns;
88

99
public class JoinObservable<T> {
1010

@@ -32,7 +32,7 @@ public static <T> JoinObservable<T> from(Observable<T> o) {
3232
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229153.aspx">MSDN: Observable.And</a>
3333
*/
3434
public final <T2> Pattern2<T, T2> and(Observable<T2> right) {
35-
return OperationJoinPatterns.and(o, right);
35+
return OperatorJoinPatterns.and(o, right);
3636
}
3737

3838
/**
@@ -52,7 +52,7 @@ public final static <R> JoinObservable<R> when(Iterable<? extends Plan0<R>> plan
5252
if (plans == null) {
5353
throw new NullPointerException("plans");
5454
}
55-
return from(Observable.create(OperationJoinPatterns.when(plans)));
55+
return from(Observable.create(OperatorJoinPatterns.when(plans)));
5656
}
5757

5858
/**
@@ -69,7 +69,7 @@ public final static <R> JoinObservable<R> when(Iterable<? extends Plan0<R>> plan
6969
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229889.aspx">MSDN: Observable.When</a>
7070
*/
7171
public final static <R> JoinObservable<R> when(Plan0<R>... plans) {
72-
return from(Observable.create(OperationJoinPatterns.when(plans)));
72+
return from(Observable.create(OperatorJoinPatterns.when(plans)));
7373
}
7474

7575
/**
@@ -85,7 +85,7 @@ public final static <R> JoinObservable<R> when(Plan0<R>... plans) {
8585
*/
8686
@SuppressWarnings("unchecked")
8787
public final static <R> JoinObservable<R> when(Plan0<R> p1) {
88-
return from(Observable.create(OperationJoinPatterns.when(p1)));
88+
return from(Observable.create(OperatorJoinPatterns.when(p1)));
8989
}
9090

9191
/**
@@ -103,7 +103,7 @@ public final static <R> JoinObservable<R> when(Plan0<R> p1) {
103103
*/
104104
@SuppressWarnings("unchecked")
105105
public final static <R> JoinObservable<R> when(Plan0<R> p1, Plan0<R> p2) {
106-
return from(Observable.create(OperationJoinPatterns.when(p1, p2)));
106+
return from(Observable.create(OperatorJoinPatterns.when(p1, p2)));
107107
}
108108

109109
/**
@@ -123,7 +123,7 @@ public final static <R> JoinObservable<R> when(Plan0<R> p1, Plan0<R> p2) {
123123
*/
124124
@SuppressWarnings("unchecked")
125125
public final static <R> JoinObservable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3) {
126-
return from(Observable.create(OperationJoinPatterns.when(p1, p2, p3)));
126+
return from(Observable.create(OperatorJoinPatterns.when(p1, p2, p3)));
127127
}
128128

129129
/**
@@ -145,7 +145,7 @@ public final static <R> JoinObservable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R
145145
*/
146146
@SuppressWarnings("unchecked")
147147
public final static <R> JoinObservable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3, Plan0<R> p4) {
148-
return from(Observable.create(OperationJoinPatterns.when(p1, p2, p3, p4)));
148+
return from(Observable.create(OperatorJoinPatterns.when(p1, p2, p3, p4)));
149149
}
150150

151151
/**
@@ -169,7 +169,7 @@ public final static <R> JoinObservable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R
169169
*/
170170
@SuppressWarnings("unchecked")
171171
public final static <R> JoinObservable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3, Plan0<R> p4, Plan0<R> p5) {
172-
return from(Observable.create(OperationJoinPatterns.when(p1, p2, p3, p4, p5)));
172+
return from(Observable.create(OperatorJoinPatterns.when(p1, p2, p3, p4, p5)));
173173
}
174174

175175
/**
@@ -195,7 +195,7 @@ public final static <R> JoinObservable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R
195195
*/
196196
@SuppressWarnings("unchecked")
197197
public final static <R> JoinObservable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3, Plan0<R> p4, Plan0<R> p5, Plan0<R> p6) {
198-
return from(Observable.create(OperationJoinPatterns.when(p1, p2, p3, p4, p5, p6)));
198+
return from(Observable.create(OperatorJoinPatterns.when(p1, p2, p3, p4, p5, p6)));
199199
}
200200

201201
/**
@@ -223,7 +223,7 @@ public final static <R> JoinObservable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R
223223
*/
224224
@SuppressWarnings("unchecked")
225225
public final static <R> JoinObservable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3, Plan0<R> p4, Plan0<R> p5, Plan0<R> p6, Plan0<R> p7) {
226-
return from(Observable.create(OperationJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7)));
226+
return from(Observable.create(OperatorJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7)));
227227
}
228228

229229
/**
@@ -253,7 +253,7 @@ public final static <R> JoinObservable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R
253253
*/
254254
@SuppressWarnings("unchecked")
255255
public final static <R> JoinObservable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3, Plan0<R> p4, Plan0<R> p5, Plan0<R> p6, Plan0<R> p7, Plan0<R> p8) {
256-
return from(Observable.create(OperationJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7, p8)));
256+
return from(Observable.create(OperatorJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7, p8)));
257257
}
258258

259259
/**
@@ -285,7 +285,7 @@ public final static <R> JoinObservable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R
285285
*/
286286
@SuppressWarnings("unchecked")
287287
public final static <R> JoinObservable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3, Plan0<R> p4, Plan0<R> p5, Plan0<R> p6, Plan0<R> p7, Plan0<R> p8, Plan0<R> p9) {
288-
return from(Observable.create(OperationJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7, p8, p9)));
288+
return from(Observable.create(OperatorJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7, p8, p9)));
289289
}
290290

291291
/**
@@ -303,7 +303,7 @@ public final static <R> JoinObservable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R
303303
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211662.aspx">MSDN: Observable.Then</a>
304304
*/
305305
public final <R> Plan0<R> then(Func1<T, R> selector) {
306-
return OperationJoinPatterns.then(o, selector);
306+
return OperatorJoinPatterns.then(o, selector);
307307
}
308308

309309
public Observable<T> toObservable() {

rxjava-contrib/rxjava-joins/src/test/java/rx/joins/operators/OperationJoinsTest.java renamed to rxjava-contrib/rxjava-joins/src/test/java/rx/joins/operators/OperatorJoinsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
import rx.observers.TestSubscriber;
4141
import rx.subjects.PublishSubject;
4242

43-
public class OperationJoinsTest {
43+
public class OperatorJoinsTest {
4444
@Mock
4545
Observer<Integer> observer;
4646

0 commit comments

Comments
 (0)