From 7484fc9af70bea89181782d2d4a8428e72f731fa Mon Sep 17 00:00:00 2001 From: akarnokd Date: Sat, 26 Apr 2014 00:42:51 +0200 Subject: [PATCH] OperatorOnErrorReturn --- rxjava-core/src/main/java/rx/Observable.java | 4 +- .../rx/operators/OperationOnErrorReturn.java | 124 ------------------ .../rx/operators/OperatorOnErrorReturn.java | 79 +++++++++++ ...st.java => OperatorOnErrorReturnTest.java} | 21 ++- 4 files changed, 91 insertions(+), 137 deletions(-) delete mode 100644 rxjava-core/src/main/java/rx/operators/OperationOnErrorReturn.java create mode 100644 rxjava-core/src/main/java/rx/operators/OperatorOnErrorReturn.java rename rxjava-core/src/test/java/rx/operators/{OperationOnErrorReturnTest.java => OperatorOnErrorReturnTest.java} (88%) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 746362ca78..dcda7c0e80 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -58,7 +58,6 @@ import rx.operators.OperationMergeMaxConcurrent; import rx.operators.OperationMulticast; import rx.operators.OperationOnErrorResumeNextViaObservable; -import rx.operators.OperationOnErrorReturn; import rx.operators.OperationOnExceptionResumeNextViaObservable; import rx.operators.OperationParallelMerge; import rx.operators.OperationReplay; @@ -108,6 +107,7 @@ import rx.operators.OperatorObserveOn; import rx.operators.OperatorOnErrorFlatMap; import rx.operators.OperatorOnErrorResumeNextViaFunction; +import rx.operators.OperatorOnErrorReturn; import rx.operators.OperatorParallel; import rx.operators.OperatorPivot; import rx.operators.OperatorRepeat; @@ -4547,7 +4547,7 @@ public final Observable onErrorResumeNext(final Observable resum * @see RxJava Wiki: onErrorReturn() */ public final Observable onErrorReturn(Func1 resumeFunction) { - return create(OperationOnErrorReturn.onErrorReturn(this, resumeFunction)); + return lift(new OperatorOnErrorReturn(resumeFunction)); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationOnErrorReturn.java b/rxjava-core/src/main/java/rx/operators/OperationOnErrorReturn.java deleted file mode 100644 index 785775f064..0000000000 --- a/rxjava-core/src/main/java/rx/operators/OperationOnErrorReturn.java +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import java.util.Arrays; -import java.util.concurrent.atomic.AtomicReference; - -import rx.Observable; -import rx.Observable.OnSubscribeFunc; -import rx.Observer; -import rx.Subscriber; -import rx.Subscription; -import rx.exceptions.CompositeException; -import rx.functions.Action0; -import rx.functions.Func1; -import rx.subscriptions.Subscriptions; - -/** - * Instruct an Observable to emit a particular item to its Observer's onNext method - * rather than invoking onError if it encounters an error. - *

- * - *

- * By default, when an Observable encounters an error that prevents it from emitting the expected - * item to its Observer, the Observable invokes its Observer's onError method, and then - * quits without invoking any more of its Observer's methods. The onErrorReturn operation changes - * this behavior. If you pass a function (resumeFunction) to onErrorReturn, if the original - * Observable encounters an error, instead of invoking its Observer's onError method, - * it will instead pass the return value of resumeFunction to the Observer's onNext - * method. - *

- * You can use this to prevent errors from propagating or to supply fallback data should errors be - * encountered. - */ -public final class OperationOnErrorReturn { - - public static OnSubscribeFunc onErrorReturn(Observable originalSequence, Func1 resumeFunction) { - return new OnErrorReturn(originalSequence, resumeFunction); - } - - private static class OnErrorReturn implements OnSubscribeFunc { - private final Func1 resumeFunction; - private final Observable originalSequence; - - public OnErrorReturn(Observable originalSequence, Func1 resumeFunction) { - this.resumeFunction = resumeFunction; - this.originalSequence = originalSequence; - } - - public Subscription onSubscribe(final Observer observer) { - final SafeObservableSubscription subscription = new SafeObservableSubscription(); - - // AtomicReference since we'll be accessing/modifying this across threads so we can switch it if needed - final AtomicReference subscriptionRef = new AtomicReference(subscription); - - // subscribe to the original Observable and remember the subscription - subscription.wrap(originalSequence.unsafeSubscribe(new Subscriber() { - public void onNext(T value) { - // forward the successful calls - observer.onNext(value); - } - - /** - * Instead of passing the onError forward, we intercept and "resume" with the resumeSequence. - */ - public void onError(Throwable ex) { - /* remember what the current subscription is so we can determine if someone unsubscribes concurrently */ - SafeObservableSubscription currentSubscription = subscriptionRef.get(); - // check that we have not been unsubscribed before we can process the error - if (currentSubscription != null) { - try { - /* error occurred, so execute the function, give it the exception and call onNext with the response */ - onNext(resumeFunction.call(ex)); - /* - * we are not handling an exception thrown from this function ... should we do something? - * error handling within an error handler is a weird one to determine what we should do - * right now I'm going to just let it throw whatever exceptions occur (such as NPE) - * but I'm considering calling the original Observer.onError to act as if this OnErrorReturn operator didn't happen - */ - - /* we are now completed */ - onCompleted(); - - /* unsubscribe since it blew up */ - currentSubscription.unsubscribe(); - } catch (Throwable e) { - // the return function failed so we need to call onError - // I am using CompositeException so that both exceptions can be seen - observer.onError(new CompositeException("OnErrorReturn function failed", Arrays.asList(ex, e))); - } - } - } - - public void onCompleted() { - // forward the successful calls - observer.onCompleted(); - } - })); - - return Subscriptions.create(new Action0() { - public void call() { - // this will get either the original, or the resumeSequence one and unsubscribe on it - Subscription s = subscriptionRef.getAndSet(null); - if (s != null) { - s.unsubscribe(); - } - } - }); - } - } -} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorOnErrorReturn.java b/rxjava-core/src/main/java/rx/operators/OperatorOnErrorReturn.java new file mode 100644 index 0000000000..c86c36032c --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorOnErrorReturn.java @@ -0,0 +1,79 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.operators; + +import java.util.Arrays; +import rx.Observable.Operator; +import rx.Subscriber; +import rx.exceptions.CompositeException; +import rx.functions.Func1; + +/** + * Instruct an Observable to emit a particular item to its Observer's onNext method + * rather than invoking onError if it encounters an error. + *

+ * + *

+ * By default, when an Observable encounters an error that prevents it from emitting the expected + * item to its Observer, the Observable invokes its Observer's onError method, and then + * quits without invoking any more of its Observer's methods. The onErrorReturn operation changes + * this behavior. If you pass a function (resumeFunction) to onErrorReturn, if the original + * Observable encounters an error, instead of invoking its Observer's onError method, + * it will instead pass the return value of resumeFunction to the Observer's onNext + * method. + *

+ * You can use this to prevent errors from propagating or to supply fallback data should errors be + * encountered. + * + * @param the value type + */ +public final class OperatorOnErrorReturn implements Operator { + final Func1 resultFunction; + + public OperatorOnErrorReturn(Func1 resultFunction) { + this.resultFunction = resultFunction; + } + + @Override + public Subscriber call(final Subscriber child) { + return new Subscriber(child) { + + @Override + public void onNext(T t) { + child.onNext(t); + } + + @Override + public void onError(Throwable e) { + try { + T result = resultFunction.call(e); + + child.onNext(result); + } catch (Throwable x) { + child.onError(new CompositeException(Arrays.asList(e, x))); + return; + } + child.onCompleted(); + } + + @Override + public void onCompleted() { + child.onCompleted(); + } + + }; + } +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationOnErrorReturnTest.java b/rxjava-core/src/test/java/rx/operators/OperatorOnErrorReturnTest.java similarity index 88% rename from rxjava-core/src/test/java/rx/operators/OperationOnErrorReturnTest.java rename to rxjava-core/src/test/java/rx/operators/OperatorOnErrorReturnTest.java index 32e75a3a48..a73d972025 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationOnErrorReturnTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorOnErrorReturnTest.java @@ -21,7 +21,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static rx.operators.OperationOnErrorReturn.onErrorReturn; import java.util.concurrent.atomic.AtomicReference; @@ -30,10 +29,11 @@ import rx.Observable; import rx.Observer; +import rx.Subscriber; import rx.Subscription; import rx.functions.Func1; -public class OperationOnErrorReturnTest { +public class OperatorOnErrorReturnTest { @Test public void testResumeNext() { @@ -42,7 +42,7 @@ public void testResumeNext() { Observable w = Observable.create(f); final AtomicReference capturedException = new AtomicReference(); - Observable observable = Observable.create(onErrorReturn(w, new Func1() { + Observable observable = w.onErrorReturn(new Func1() { @Override public String call(Throwable e) { @@ -50,7 +50,7 @@ public String call(Throwable e) { return "failure"; } - })); + }); @SuppressWarnings("unchecked") Observer observer = mock(Observer.class); @@ -79,7 +79,7 @@ public void testFunctionThrowsError() { Observable w = Observable.create(f); final AtomicReference capturedException = new AtomicReference(); - Observable observable = Observable.create(onErrorReturn(w, new Func1() { + Observable observable = w.onErrorReturn(new Func1() { @Override public String call(Throwable e) { @@ -87,7 +87,7 @@ public String call(Throwable e) { throw new RuntimeException("exception from function"); } - })); + }); @SuppressWarnings("unchecked") Observer observer = mock(Observer.class); @@ -108,7 +108,7 @@ public String call(Throwable e) { assertNotNull(capturedException.get()); } - private static class TestObservable implements Observable.OnSubscribeFunc { + private static class TestObservable implements Observable.OnSubscribe { final Subscription s; final String[] values; @@ -120,7 +120,7 @@ public TestObservable(Subscription s, String... values) { } @Override - public Subscription onSubscribe(final Observer observer) { + public void call(final Subscriber subscriber) { System.out.println("TestObservable subscribed to ..."); t = new Thread(new Runnable() { @@ -130,11 +130,11 @@ public void run() { System.out.println("running TestObservable thread"); for (String s : values) { System.out.println("TestObservable onNext: " + s); - observer.onNext(s); + subscriber.onNext(s); } throw new RuntimeException("Forced Failure"); } catch (Throwable e) { - observer.onError(e); + subscriber.onError(e); } } @@ -142,7 +142,6 @@ public void run() { System.out.println("starting TestObservable thread"); t.start(); System.out.println("done starting TestObservable thread"); - return s; } } }