Skip to content

Operator async #1101

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 30, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2013 Netflix, Inc.
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
Expand Down Expand Up @@ -54,10 +54,10 @@
import rx.subjects.Subject;
import rx.subscriptions.SerialSubscription;
import rx.util.async.operators.Functionals;
import rx.util.async.operators.OperationDeferFuture;
import rx.util.async.operators.OperationForEachFuture;
import rx.util.async.operators.OperationFromFunctionals;
import rx.util.async.operators.OperationStartFuture;
import rx.util.async.operators.OperatorDeferFuture;
import rx.util.async.operators.OperatorForEachFuture;
import rx.util.async.operators.OperatorFromFunctionals;
import rx.util.async.operators.OperatorStartFuture;

/**
* Utility methods to convert functions and actions into asynchronous operations
Expand Down Expand Up @@ -1377,7 +1377,7 @@ public static <R> FuncN<Observable<R>> asyncFunc(final FuncN<? extends R> func,
* @see <a href="https://github.com/Netflix/RxJava/wiki/Async-Operators#startfuture">RxJava Wiki: startFuture()</a>
*/
public static <T> Observable<T> startFuture(Func0<? extends Future<? extends T>> functionAsync) {
return OperationStartFuture.startFuture(functionAsync);
return OperatorStartFuture.startFuture(functionAsync);
}

/**
Expand All @@ -1395,7 +1395,7 @@ public static <T> Observable<T> startFuture(Func0<? extends Future<? extends T>>
*/
public static <T> Observable<T> startFuture(Func0<? extends Future<? extends T>> functionAsync,
Scheduler scheduler) {
return OperationStartFuture.startFuture(functionAsync, scheduler);
return OperatorStartFuture.startFuture(functionAsync, scheduler);
}

/**
Expand All @@ -1416,7 +1416,7 @@ public static <T> Observable<T> startFuture(Func0<? extends Future<? extends T>>
* @see <a href="https://github.com/Netflix/RxJava/wiki/Async-Operators#deferfuture">RxJava Wiki: deferFuture()</a>
*/
public static <T> Observable<T> deferFuture(Func0<? extends Future<? extends Observable<? extends T>>> observableFactoryAsync) {
return OperationDeferFuture.deferFuture(observableFactoryAsync);
return OperatorDeferFuture.deferFuture(observableFactoryAsync);
}

/**
Expand All @@ -1437,7 +1437,7 @@ public static <T> Observable<T> deferFuture(Func0<? extends Future<? extends Obs
public static <T> Observable<T> deferFuture(
Func0<? extends Future<? extends Observable<? extends T>>> observableFactoryAsync,
Scheduler scheduler) {
return OperationDeferFuture.deferFuture(observableFactoryAsync, scheduler);
return OperatorDeferFuture.deferFuture(observableFactoryAsync, scheduler);
}

/**
Expand All @@ -1453,13 +1453,13 @@ public static <T> Observable<T> deferFuture(
* @param source the source Observable
* @param onNext the action to call with each emitted element
* @return the Future representing the entire for-each operation
* @see #forEachFuture(rx.functions.Action1, rx.Scheduler)
* @see #forEachFuture(rx.Observable, rx.functions.Action1, rx.Scheduler)
* @see <a href="https://github.com/Netflix/RxJava/wiki/Async-Operators#foreachfuture">RxJava Wiki: forEachFuture()</a>
*/
public static <T> FutureTask<Void> forEachFuture(
Observable<? extends T> source,
Action1<? super T> onNext) {
return OperationForEachFuture.forEachFuture(source, onNext);
return OperatorForEachFuture.forEachFuture(source, onNext);
}


Expand All @@ -1477,14 +1477,14 @@ public static <T> FutureTask<Void> forEachFuture(
* @param onNext the action to call with each emitted element
* @param onError the action to call when an exception is emitted
* @return the Future representing the entire for-each operation
* @see #forEachFuture(rx.functions.Action1, rx.functions.Action1, rx.Scheduler)
* @see #forEachFuture(rx.Observable, rx.functions.Action1, rx.functions.Action1, rx.Scheduler)
* @see <a href="https://github.com/Netflix/RxJava/wiki/Async-Operators#foreachfuture">RxJava Wiki: forEachFuture()</a>
*/
public static <T> FutureTask<Void> forEachFuture(
Observable<? extends T> source,
Action1<? super T> onNext,
Action1<? super Throwable> onError) {
return OperationForEachFuture.forEachFuture(source, onNext, onError);
return OperatorForEachFuture.forEachFuture(source, onNext, onError);
}


Expand All @@ -1503,15 +1503,15 @@ public static <T> FutureTask<Void> forEachFuture(
* @param onError the action to call when an exception is emitted
* @param onCompleted the action to call when the source completes
* @return the Future representing the entire for-each operation
* @see #forEachFuture(rx.functions.Action1, rx.functions.Action1, rx.functions.Action0, rx.Scheduler)
* @see #forEachFuture(rx.Observable, rx.functions.Action1, rx.functions.Action1, rx.functions.Action0, rx.Scheduler)
* @see <a href="https://github.com/Netflix/RxJava/wiki/Async-Operators#foreachfuture">RxJava Wiki: forEachFuture()</a>
*/
public static <T> FutureTask<Void> forEachFuture(
Observable<? extends T> source,
Action1<? super T> onNext,
Action1<? super Throwable> onError,
Action0 onCompleted) {
return OperationForEachFuture.forEachFuture(source, onNext, onError, onCompleted);
return OperatorForEachFuture.forEachFuture(source, onNext, onError, onCompleted);
}


Expand All @@ -1534,7 +1534,7 @@ public static <T> FutureTask<Void> forEachFuture(
Observable<? extends T> source,
Action1<? super T> onNext,
Scheduler scheduler) {
FutureTask<Void> task = OperationForEachFuture.forEachFuture(source, onNext);
FutureTask<Void> task = OperatorForEachFuture.forEachFuture(source, onNext);
final Worker inner = scheduler.createWorker();
inner.schedule(Functionals.fromRunnable(task, inner));
return task;
Expand Down Expand Up @@ -1562,7 +1562,7 @@ public static <T> FutureTask<Void> forEachFuture(
Action1<? super T> onNext,
Action1<? super Throwable> onError,
Scheduler scheduler) {
FutureTask<Void> task = OperationForEachFuture.forEachFuture(source, onNext, onError);
FutureTask<Void> task = OperatorForEachFuture.forEachFuture(source, onNext, onError);
final Worker inner = scheduler.createWorker();
inner.schedule(Functionals.fromRunnable(task, inner));
return task;
Expand Down Expand Up @@ -1592,7 +1592,7 @@ public static <T> FutureTask<Void> forEachFuture(
Action1<? super Throwable> onError,
Action0 onCompleted,
Scheduler scheduler) {
FutureTask<Void> task = OperationForEachFuture.forEachFuture(source, onNext, onError, onCompleted);
FutureTask<Void> task = OperatorForEachFuture.forEachFuture(source, onNext, onError, onCompleted);
final Worker inner = scheduler.createWorker();
inner.schedule(Functionals.fromRunnable(task, inner));
return task;
Expand All @@ -1617,30 +1617,6 @@ public static <R> Observable<R> fromAction(Action0 action, R result) {
return fromAction(action, result, Schedulers.computation());
}

/**
* Return an Observable that calls the given function and emits its
* result when an Observer subscribes.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/fromFunc0.png">
* <p>
* The function is called on the default thread pool for computation.
*
* @param <R> the return type
* @param function the function to call on each subscription
* @return an Observable that calls the given function and emits its
* result when an Observer subscribes
* @see #start(rx.functions.Func0)
* @see #fromCallable(java.util.concurrent.Callable)
* @see <a href="https://github.com/Netflix/RxJava/wiki/Async-Operators#fromfunc0">RxJava Wiki: fromFunc0()</a>
*
* @deprecated Unnecessary now that Func0 extends Callable. Just call
* {@link #fromCallable(Callable)} instead.
*/
@Deprecated
public static <R> Observable<R> fromFunc0(Func0<? extends R> function) {
return fromCallable(function);
}

/**
* Return an Observable that calls the given Callable and emits its
* result or Exception when an Observer subscribes.
Expand All @@ -1654,7 +1630,6 @@ public static <R> Observable<R> fromFunc0(Func0<? extends R> function) {
* @return an Observable that calls the given Callable and emits its
* result or Exception when an Observer subscribes
* @see #start(rx.functions.Func0)
* @see #fromFunc0(rx.functions.Func0)
* @see <a href="https://github.com/Netflix/RxJava/wiki/Async-Operators#fromcallable">RxJava Wiki: fromCallable()</a>
*/
public static <R> Observable<R> fromCallable(Callable<? extends R> callable) {
Expand Down Expand Up @@ -1696,33 +1671,9 @@ public static <R> Observable<R> fromRunnable(final Runnable run, final R result)
* @see <a href="https://github.com/Netflix/RxJava/wiki/Async-Operators#fromaction">RxJava Wiki: fromAction()</a>
*/
public static <R> Observable<R> fromAction(Action0 action, R result, Scheduler scheduler) {
return Observable.create(OperationFromFunctionals.fromAction(action, result)).subscribeOn(scheduler);
return Observable.create(OperatorFromFunctionals.fromAction(action, result)).subscribeOn(scheduler);
}

/**
* Return an Observable that calls the given function and emits its
* result when an Observer subscribes.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/fromFunc0.s.png">
*
* @param <R> the return type
* @param function the function to call on each subscription
* @param scheduler the scheduler where the function is called and the
* result is emitted
* @return an Observable that calls the given function and emits its
* result when an Observer subscribes
* @see #start(rx.functions.Func0)
* @see #fromCallable(java.util.concurrent.Callable)
* @see <a href="https://github.com/Netflix/RxJava/wiki/Async-Operators#fromfunc0">RxJava Wiki: fromFunc0()</a>
*
* @deprecated Unnecessary now that Func0 extends Callable. Just call
* {@link #fromCallable(Callable, Scheduler)} instead.
*/
@Deprecated
public static <R> Observable<R> fromFunc0(Func0<? extends R> function, Scheduler scheduler) {
return fromCallable(function, scheduler);
}

/**
* Return an Observable that calls the given Callable and emits its
* result or Exception when an Observer subscribes.
Expand All @@ -1736,11 +1687,10 @@ public static <R> Observable<R> fromFunc0(Func0<? extends R> function, Scheduler
* @return an Observable that calls the given Callable and emits its
* result or Exception when an Observer subscribes
* @see #start(rx.functions.Func0)
* @see #fromFunc0(rx.functions.Func0)
* @see <a href="https://github.com/Netflix/RxJava/wiki/Async-Operators#fromcallable">RxJava Wiki: fromCallable()</a>
*/
public static <R> Observable<R> fromCallable(Callable<? extends R> callable, Scheduler scheduler) {
return Observable.create(OperationFromFunctionals.fromCallable(callable)).subscribeOn(scheduler);
return Observable.create(OperatorFromFunctionals.fromCallable(callable)).subscribeOn(scheduler);
}

/**
Expand All @@ -1759,7 +1709,7 @@ public static <R> Observable<R> fromCallable(Callable<? extends R> callable, Sch
* @see <a href="https://github.com/Netflix/RxJava/wiki/Async-Operators#fromrunnable">RxJava Wiki: fromRunnable()</a>
*/
public static <R> Observable<R> fromRunnable(final Runnable run, final R result, Scheduler scheduler) {
return Observable.create(OperationFromFunctionals.fromRunnable(run, result)).subscribeOn(scheduler);
return Observable.create(OperatorFromFunctionals.fromRunnable(run, result)).subscribeOn(scheduler);
}
/**
* Runs the provided action on the given scheduler and allows propagation
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2013 Netflix, Inc.
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2013 Netflix, Inc.
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2013 Netflix, Inc.
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,9 +24,9 @@
/**
* Defer the execution of a factory method which produces an observable sequence.
*/
public final class OperationDeferFuture {
public final class OperatorDeferFuture {
/** Utility class. */
private OperationDeferFuture() { throw new IllegalStateException("No instances!"); }
private OperatorDeferFuture() { throw new IllegalStateException("No instances!"); }

/**
* Returns an observable sequence that starts the specified asynchronous
Expand All @@ -49,7 +49,7 @@ public DeferFutureFunc0(Func0<? extends Future<? extends Observable<? extends T>

@Override
public Observable<T> call() {
return Observable.merge(OperationStartFuture.startFuture(observableFactoryAsync));
return Observable.merge(OperatorStartFuture.startFuture(observableFactoryAsync));
}

}
Expand Down Expand Up @@ -81,7 +81,7 @@ public DeferFutureFunc0Scheduled(Func0<? extends Future<? extends Observable<? e

@Override
public Observable<T> call() {
return Observable.merge(OperationStartFuture.startFuture(observableFactoryAsync, scheduler));
return Observable.merge(OperatorStartFuture.startFuture(observableFactoryAsync, scheduler));
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2013 Netflix, Inc.
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,9 +29,9 @@
* <p>
* Remark: the cancellation token version's behavior is in doubt, so left out.
*/
public final class OperationForEachFuture {
public final class OperatorForEachFuture {
/** Utility class. */
private OperationForEachFuture() { throw new IllegalStateException("No instances!"); }
private OperatorForEachFuture() { throw new IllegalStateException("No instances!"); }

/**
* Subscribes to the given source and calls the callback for each emitted item,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2013 Netflix, Inc.
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,48 +17,35 @@

import java.util.concurrent.Callable;

import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Actions;
import rx.functions.Func0;
import rx.subscriptions.Subscriptions;

/**
* Operators that invoke a function or action if
* an observer subscribes.
* Asynchrony can be achieved by using subscribeOn or observeOn.
*/
public final class OperationFromFunctionals {
public final class OperatorFromFunctionals {
/** Utility class. */
private OperationFromFunctionals() { throw new IllegalStateException("No instances!"); }
private OperatorFromFunctionals() { throw new IllegalStateException("No instances!"); }

/** Subscriber function that invokes an action and returns the given result. */
public static <R> OnSubscribeFunc<R> fromAction(Action0 action, R result) {
public static <R> OnSubscribe<R> fromAction(Action0 action, R result) {
return new InvokeAsync<R>(Actions.toFunc(action, result));
}

/**
* Subscriber function that invokes a function and returns its value.
*
* @deprecated Unnecessary now that Func0 extends Callable. Just call
* {@link #fromCallable(Callable)} instead.
*/
@Deprecated
public static <R> OnSubscribeFunc<R> fromFunc0(Func0<? extends R> function) {
return fromCallable(function);
}

/**
* Subscriber function that invokes the callable and returns its value or
* propagates its checked exception.
*/
public static <R> OnSubscribeFunc<R> fromCallable(Callable<? extends R> callable) {
public static <R> OnSubscribe<R> fromCallable(Callable<? extends R> callable) {
return new InvokeAsync<R>(callable);
}
/** Subscriber function that invokes a runnable and returns the given result. */
public static <R> OnSubscribeFunc<R> fromRunnable(final Runnable run, final R result) {
public static <R> OnSubscribe<R> fromRunnable(final Runnable run, final R result) {
return new InvokeAsync<R>(new Func0<R>() {
@Override
public R call() {
Expand All @@ -72,7 +59,7 @@ public R call() {
* Invokes a java.util.concurrent.Callable when an observer subscribes.
* @param <R> the return type
*/
static final class InvokeAsync<R> implements OnSubscribeFunc<R> {
static final class InvokeAsync<R> implements OnSubscribe<R> {
final Callable<? extends R> callable;
public InvokeAsync(Callable<? extends R> callable) {
if (callable == null) {
Expand All @@ -81,16 +68,14 @@ public InvokeAsync(Callable<? extends R> callable) {
this.callable = callable;
}
@Override
public Subscription onSubscribe(Observer<? super R> t1) {
Subscription s = Subscriptions.empty();
public void call(Subscriber<? super R> t1) {
try {
t1.onNext(callable.call());
} catch (Throwable t) {
t1.onError(t);
return s;
return;
}
t1.onCompleted();
return s;
}
}
}
Loading