Skip to content

Commit 3da9135

Browse files
committed
OperatorAsync
1 parent 95e0636 commit 3da9135

File tree

12 files changed

+59
-142
lines changed

12 files changed

+59
-142
lines changed

rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/Async.java

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright 2013 Netflix, Inc.
2+
* Copyright 2014 Netflix, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
55
* use this file except in compliance with the License. You may obtain a copy of
@@ -54,10 +54,10 @@
5454
import rx.subjects.Subject;
5555
import rx.subscriptions.SerialSubscription;
5656
import rx.util.async.operators.Functionals;
57-
import rx.util.async.operators.OperationDeferFuture;
58-
import rx.util.async.operators.OperationForEachFuture;
59-
import rx.util.async.operators.OperationFromFunctionals;
60-
import rx.util.async.operators.OperationStartFuture;
57+
import rx.util.async.operators.OperatorDeferFuture;
58+
import rx.util.async.operators.OperatorForEachFuture;
59+
import rx.util.async.operators.OperatorFromFunctionals;
60+
import rx.util.async.operators.OperatorStartFuture;
6161

6262
/**
6363
* Utility methods to convert functions and actions into asynchronous operations
@@ -1377,7 +1377,7 @@ public static <R> FuncN<Observable<R>> asyncFunc(final FuncN<? extends R> func,
13771377
* @see <a href="https://github.com/Netflix/RxJava/wiki/Async-Operators#startfuture">RxJava Wiki: startFuture()</a>
13781378
*/
13791379
public static <T> Observable<T> startFuture(Func0<? extends Future<? extends T>> functionAsync) {
1380-
return OperationStartFuture.startFuture(functionAsync);
1380+
return OperatorStartFuture.startFuture(functionAsync);
13811381
}
13821382

13831383
/**
@@ -1395,7 +1395,7 @@ public static <T> Observable<T> startFuture(Func0<? extends Future<? extends T>>
13951395
*/
13961396
public static <T> Observable<T> startFuture(Func0<? extends Future<? extends T>> functionAsync,
13971397
Scheduler scheduler) {
1398-
return OperationStartFuture.startFuture(functionAsync, scheduler);
1398+
return OperatorStartFuture.startFuture(functionAsync, scheduler);
13991399
}
14001400

14011401
/**
@@ -1416,7 +1416,7 @@ public static <T> Observable<T> startFuture(Func0<? extends Future<? extends T>>
14161416
* @see <a href="https://github.com/Netflix/RxJava/wiki/Async-Operators#deferfuture">RxJava Wiki: deferFuture()</a>
14171417
*/
14181418
public static <T> Observable<T> deferFuture(Func0<? extends Future<? extends Observable<? extends T>>> observableFactoryAsync) {
1419-
return OperationDeferFuture.deferFuture(observableFactoryAsync);
1419+
return OperatorDeferFuture.deferFuture(observableFactoryAsync);
14201420
}
14211421

14221422
/**
@@ -1437,7 +1437,7 @@ public static <T> Observable<T> deferFuture(Func0<? extends Future<? extends Obs
14371437
public static <T> Observable<T> deferFuture(
14381438
Func0<? extends Future<? extends Observable<? extends T>>> observableFactoryAsync,
14391439
Scheduler scheduler) {
1440-
return OperationDeferFuture.deferFuture(observableFactoryAsync, scheduler);
1440+
return OperatorDeferFuture.deferFuture(observableFactoryAsync, scheduler);
14411441
}
14421442

14431443
/**
@@ -1453,13 +1453,13 @@ public static <T> Observable<T> deferFuture(
14531453
* @param source the source Observable
14541454
* @param onNext the action to call with each emitted element
14551455
* @return the Future representing the entire for-each operation
1456-
* @see #forEachFuture(rx.functions.Action1, rx.Scheduler)
1456+
* @see #forEachFuture(rx.Observable, rx.functions.Action1, rx.Scheduler)
14571457
* @see <a href="https://github.com/Netflix/RxJava/wiki/Async-Operators#foreachfuture">RxJava Wiki: forEachFuture()</a>
14581458
*/
14591459
public static <T> FutureTask<Void> forEachFuture(
14601460
Observable<? extends T> source,
14611461
Action1<? super T> onNext) {
1462-
return OperationForEachFuture.forEachFuture(source, onNext);
1462+
return OperatorForEachFuture.forEachFuture(source, onNext);
14631463
}
14641464

14651465

@@ -1477,14 +1477,14 @@ public static <T> FutureTask<Void> forEachFuture(
14771477
* @param onNext the action to call with each emitted element
14781478
* @param onError the action to call when an exception is emitted
14791479
* @return the Future representing the entire for-each operation
1480-
* @see #forEachFuture(rx.functions.Action1, rx.functions.Action1, rx.Scheduler)
1480+
* @see #forEachFuture(rx.Observable, rx.functions.Action1, rx.functions.Action1, rx.Scheduler)
14811481
* @see <a href="https://github.com/Netflix/RxJava/wiki/Async-Operators#foreachfuture">RxJava Wiki: forEachFuture()</a>
14821482
*/
14831483
public static <T> FutureTask<Void> forEachFuture(
14841484
Observable<? extends T> source,
14851485
Action1<? super T> onNext,
14861486
Action1<? super Throwable> onError) {
1487-
return OperationForEachFuture.forEachFuture(source, onNext, onError);
1487+
return OperatorForEachFuture.forEachFuture(source, onNext, onError);
14881488
}
14891489

14901490

@@ -1503,15 +1503,15 @@ public static <T> FutureTask<Void> forEachFuture(
15031503
* @param onError the action to call when an exception is emitted
15041504
* @param onCompleted the action to call when the source completes
15051505
* @return the Future representing the entire for-each operation
1506-
* @see #forEachFuture(rx.functions.Action1, rx.functions.Action1, rx.functions.Action0, rx.Scheduler)
1506+
* @see #forEachFuture(rx.Observable, rx.functions.Action1, rx.functions.Action1, rx.functions.Action0, rx.Scheduler)
15071507
* @see <a href="https://github.com/Netflix/RxJava/wiki/Async-Operators#foreachfuture">RxJava Wiki: forEachFuture()</a>
15081508
*/
15091509
public static <T> FutureTask<Void> forEachFuture(
15101510
Observable<? extends T> source,
15111511
Action1<? super T> onNext,
15121512
Action1<? super Throwable> onError,
15131513
Action0 onCompleted) {
1514-
return OperationForEachFuture.forEachFuture(source, onNext, onError, onCompleted);
1514+
return OperatorForEachFuture.forEachFuture(source, onNext, onError, onCompleted);
15151515
}
15161516

15171517

@@ -1534,7 +1534,7 @@ public static <T> FutureTask<Void> forEachFuture(
15341534
Observable<? extends T> source,
15351535
Action1<? super T> onNext,
15361536
Scheduler scheduler) {
1537-
FutureTask<Void> task = OperationForEachFuture.forEachFuture(source, onNext);
1537+
FutureTask<Void> task = OperatorForEachFuture.forEachFuture(source, onNext);
15381538
final Worker inner = scheduler.createWorker();
15391539
inner.schedule(Functionals.fromRunnable(task, inner));
15401540
return task;
@@ -1562,7 +1562,7 @@ public static <T> FutureTask<Void> forEachFuture(
15621562
Action1<? super T> onNext,
15631563
Action1<? super Throwable> onError,
15641564
Scheduler scheduler) {
1565-
FutureTask<Void> task = OperationForEachFuture.forEachFuture(source, onNext, onError);
1565+
FutureTask<Void> task = OperatorForEachFuture.forEachFuture(source, onNext, onError);
15661566
final Worker inner = scheduler.createWorker();
15671567
inner.schedule(Functionals.fromRunnable(task, inner));
15681568
return task;
@@ -1592,7 +1592,7 @@ public static <T> FutureTask<Void> forEachFuture(
15921592
Action1<? super Throwable> onError,
15931593
Action0 onCompleted,
15941594
Scheduler scheduler) {
1595-
FutureTask<Void> task = OperationForEachFuture.forEachFuture(source, onNext, onError, onCompleted);
1595+
FutureTask<Void> task = OperatorForEachFuture.forEachFuture(source, onNext, onError, onCompleted);
15961596
final Worker inner = scheduler.createWorker();
15971597
inner.schedule(Functionals.fromRunnable(task, inner));
15981598
return task;
@@ -1696,7 +1696,7 @@ public static <R> Observable<R> fromRunnable(final Runnable run, final R result)
16961696
* @see <a href="https://github.com/Netflix/RxJava/wiki/Async-Operators#fromaction">RxJava Wiki: fromAction()</a>
16971697
*/
16981698
public static <R> Observable<R> fromAction(Action0 action, R result, Scheduler scheduler) {
1699-
return Observable.create(OperationFromFunctionals.fromAction(action, result)).subscribeOn(scheduler);
1699+
return Observable.create(OperatorFromFunctionals.fromAction(action, result)).subscribeOn(scheduler);
17001700
}
17011701

17021702
/**
@@ -1740,7 +1740,7 @@ public static <R> Observable<R> fromFunc0(Func0<? extends R> function, Scheduler
17401740
* @see <a href="https://github.com/Netflix/RxJava/wiki/Async-Operators#fromcallable">RxJava Wiki: fromCallable()</a>
17411741
*/
17421742
public static <R> Observable<R> fromCallable(Callable<? extends R> callable, Scheduler scheduler) {
1743-
return Observable.create(OperationFromFunctionals.fromCallable(callable)).subscribeOn(scheduler);
1743+
return Observable.create(OperatorFromFunctionals.fromCallable(callable)).subscribeOn(scheduler);
17441744
}
17451745

17461746
/**
@@ -1759,7 +1759,7 @@ public static <R> Observable<R> fromCallable(Callable<? extends R> callable, Sch
17591759
* @see <a href="https://github.com/Netflix/RxJava/wiki/Async-Operators#fromrunnable">RxJava Wiki: fromRunnable()</a>
17601760
*/
17611761
public static <R> Observable<R> fromRunnable(final Runnable run, final R result, Scheduler scheduler) {
1762-
return Observable.create(OperationFromFunctionals.fromRunnable(run, result)).subscribeOn(scheduler);
1762+
return Observable.create(OperatorFromFunctionals.fromRunnable(run, result)).subscribeOn(scheduler);
17631763
}
17641764
/**
17651765
* Runs the provided action on the given scheduler and allows propagation

rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/Functionals.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright 2013 Netflix, Inc.
2+
* Copyright 2014 Netflix, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/LatchedObserver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright 2013 Netflix, Inc.
2+
* Copyright 2014 Netflix, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/OperationDeferFuture.java renamed to rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/OperatorDeferFuture.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright 2013 Netflix, Inc.
2+
* Copyright 2014 Netflix, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,9 +24,9 @@
2424
/**
2525
* Defer the execution of a factory method which produces an observable sequence.
2626
*/
27-
public final class OperationDeferFuture {
27+
public final class OperatorDeferFuture {
2828
/** Utility class. */
29-
private OperationDeferFuture() { throw new IllegalStateException("No instances!"); }
29+
private OperatorDeferFuture() { throw new IllegalStateException("No instances!"); }
3030

3131
/**
3232
* Returns an observable sequence that starts the specified asynchronous
@@ -49,7 +49,7 @@ public DeferFutureFunc0(Func0<? extends Future<? extends Observable<? extends T>
4949

5050
@Override
5151
public Observable<T> call() {
52-
return Observable.merge(OperationStartFuture.startFuture(observableFactoryAsync));
52+
return Observable.merge(OperatorStartFuture.startFuture(observableFactoryAsync));
5353
}
5454

5555
}
@@ -81,7 +81,7 @@ public DeferFutureFunc0Scheduled(Func0<? extends Future<? extends Observable<? e
8181

8282
@Override
8383
public Observable<T> call() {
84-
return Observable.merge(OperationStartFuture.startFuture(observableFactoryAsync, scheduler));
84+
return Observable.merge(OperatorStartFuture.startFuture(observableFactoryAsync, scheduler));
8585
}
8686

8787
}

rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/OperationForEachFuture.java renamed to rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/OperatorForEachFuture.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright 2013 Netflix, Inc.
2+
* Copyright 2014 Netflix, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -29,9 +29,9 @@
2929
* <p>
3030
* Remark: the cancellation token version's behavior is in doubt, so left out.
3131
*/
32-
public final class OperationForEachFuture {
32+
public final class OperatorForEachFuture {
3333
/** Utility class. */
34-
private OperationForEachFuture() { throw new IllegalStateException("No instances!"); }
34+
private OperatorForEachFuture() { throw new IllegalStateException("No instances!"); }
3535

3636
/**
3737
* Subscribes to the given source and calls the callback for each emitted item,
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright 2013 Netflix, Inc.
2+
* Copyright 2014 Netflix, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,48 +17,35 @@
1717

1818
import java.util.concurrent.Callable;
1919

20-
import rx.Observable.OnSubscribeFunc;
21-
import rx.Observer;
22-
import rx.Subscription;
20+
import rx.Observable.OnSubscribe;
21+
import rx.Subscriber;
2322
import rx.functions.Action0;
2423
import rx.functions.Actions;
2524
import rx.functions.Func0;
26-
import rx.subscriptions.Subscriptions;
2725

2826
/**
2927
* Operators that invoke a function or action if
3028
* an observer subscribes.
3129
* Asynchrony can be achieved by using subscribeOn or observeOn.
3230
*/
33-
public final class OperationFromFunctionals {
31+
public final class OperatorFromFunctionals {
3432
/** Utility class. */
35-
private OperationFromFunctionals() { throw new IllegalStateException("No instances!"); }
33+
private OperatorFromFunctionals() { throw new IllegalStateException("No instances!"); }
3634

3735
/** Subscriber function that invokes an action and returns the given result. */
38-
public static <R> OnSubscribeFunc<R> fromAction(Action0 action, R result) {
36+
public static <R> OnSubscribe<R> fromAction(Action0 action, R result) {
3937
return new InvokeAsync<R>(Actions.toFunc(action, result));
4038
}
4139

42-
/**
43-
* Subscriber function that invokes a function and returns its value.
44-
*
45-
* @deprecated Unnecessary now that Func0 extends Callable. Just call
46-
* {@link #fromCallable(Callable)} instead.
47-
*/
48-
@Deprecated
49-
public static <R> OnSubscribeFunc<R> fromFunc0(Func0<? extends R> function) {
50-
return fromCallable(function);
51-
}
52-
5340
/**
5441
* Subscriber function that invokes the callable and returns its value or
5542
* propagates its checked exception.
5643
*/
57-
public static <R> OnSubscribeFunc<R> fromCallable(Callable<? extends R> callable) {
44+
public static <R> OnSubscribe<R> fromCallable(Callable<? extends R> callable) {
5845
return new InvokeAsync<R>(callable);
5946
}
6047
/** Subscriber function that invokes a runnable and returns the given result. */
61-
public static <R> OnSubscribeFunc<R> fromRunnable(final Runnable run, final R result) {
48+
public static <R> OnSubscribe<R> fromRunnable(final Runnable run, final R result) {
6249
return new InvokeAsync<R>(new Func0<R>() {
6350
@Override
6451
public R call() {
@@ -72,7 +59,7 @@ public R call() {
7259
* Invokes a java.util.concurrent.Callable when an observer subscribes.
7360
* @param <R> the return type
7461
*/
75-
static final class InvokeAsync<R> implements OnSubscribeFunc<R> {
62+
static final class InvokeAsync<R> implements OnSubscribe<R> {
7663
final Callable<? extends R> callable;
7764
public InvokeAsync(Callable<? extends R> callable) {
7865
if (callable == null) {
@@ -81,16 +68,14 @@ public InvokeAsync(Callable<? extends R> callable) {
8168
this.callable = callable;
8269
}
8370
@Override
84-
public Subscription onSubscribe(Observer<? super R> t1) {
85-
Subscription s = Subscriptions.empty();
71+
public void call(Subscriber<? super R> t1) {
8672
try {
8773
t1.onNext(callable.call());
8874
} catch (Throwable t) {
8975
t1.onError(t);
90-
return s;
76+
return;
9177
}
9278
t1.onCompleted();
93-
return s;
9479
}
9580
}
9681
}

rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/OperationStartFuture.java renamed to rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/OperatorStartFuture.java

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,21 +13,6 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
/**
17-
* Copyright 2013 Netflix, Inc.
18-
*
19-
* Licensed under the Apache License, Version 2.0 (the "License");
20-
* you may not use this file except in compliance with the License.
21-
* You may obtain a copy of the License at
22-
*
23-
* http://www.apache.org/licenses/LICENSE-2.0
24-
*
25-
* Unless required by applicable law or agreed to in writing, software
26-
* distributed under the License is distributed on an "AS IS" BASIS,
27-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
28-
* See the License for the specific language governing permissions and
29-
* limitations under the License.
30-
*/
3116
package rx.util.async.operators;
3217

3318
import java.util.concurrent.Future;
@@ -40,9 +25,9 @@
4025
* Start an asynchronous Future immediately and observe its result through
4126
* an observable.
4227
*/
43-
public final class OperationStartFuture {
28+
public final class OperatorStartFuture {
4429
/** Utility class. */
45-
private OperationStartFuture() { throw new IllegalStateException("No instances!"); }
30+
private OperatorStartFuture() { throw new IllegalStateException("No instances!"); }
4631
/**
4732
* Invokes the asynchronous function, surfacing the result through an observable sequence.
4833
* <p>

rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/AsyncTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright 2013 Netflix, Inc.
2+
* Copyright 2014 Netflix, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

0 commit comments

Comments
 (0)