Skip to content

Commit bf976a2

Browse files
committed
OperatorToObservableFuture
1 parent 4e77f8a commit bf976a2

File tree

3 files changed

+38
-34
lines changed

3 files changed

+38
-34
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@
8686
import rx.operators.OperationTimer;
8787
import rx.operators.OperationToMap;
8888
import rx.operators.OperationToMultimap;
89-
import rx.operators.OperationToObservableFuture;
9089
import rx.operators.OperationUsing;
9190
import rx.operators.OperationWindow;
9291
import rx.operators.OperatorAll;
@@ -121,6 +120,7 @@
121120
import rx.operators.OperatorTimeout;
122121
import rx.operators.OperatorTimeoutWithSelector;
123122
import rx.operators.OperatorTimestamp;
123+
import rx.operators.OperatorToObservableFuture;
124124
import rx.operators.OperatorToObservableList;
125125
import rx.operators.OperatorToObservableSortedList;
126126
import rx.operators.OperatorUnsubscribeOn;
@@ -1083,7 +1083,7 @@ public final static <T> Observable<T> error(Throwable exception, Scheduler sched
10831083
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#wiki-from">RxJava Wiki: from()</a>
10841084
*/
10851085
public final static <T> Observable<T> from(Future<? extends T> future) {
1086-
return create(OperationToObservableFuture.toObservableFuture(future));
1086+
return create(OperatorToObservableFuture.toObservableFuture(future));
10871087
}
10881088

10891089
/**
@@ -1109,7 +1109,7 @@ public final static <T> Observable<T> from(Future<? extends T> future) {
11091109
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#wiki-from">RxJava Wiki: from()</a>
11101110
*/
11111111
public final static <T> Observable<T> from(Future<? extends T> future, long timeout, TimeUnit unit) {
1112-
return create(OperationToObservableFuture.toObservableFuture(future, timeout, unit));
1112+
return create(OperatorToObservableFuture.toObservableFuture(future, timeout, unit));
11131113
}
11141114

11151115
/**
@@ -1132,7 +1132,7 @@ public final static <T> Observable<T> from(Future<? extends T> future, long time
11321132
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#wiki-from">RxJava Wiki: from()</a>
11331133
*/
11341134
public final static <T> Observable<T> from(Future<? extends T> future, Scheduler scheduler) {
1135-
return create(OperationToObservableFuture.toObservableFuture(future)).subscribeOn(scheduler);
1135+
return create(OperatorToObservableFuture.toObservableFuture(future)).subscribeOn(scheduler);
11361136
}
11371137

11381138
/**

rxjava-core/src/main/java/rx/operators/OperationToObservableFuture.java renamed to rxjava-core/src/main/java/rx/operators/OperatorToObservableFuture.java

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818
import java.util.concurrent.Future;
1919
import java.util.concurrent.TimeUnit;
2020

21-
import rx.Observable.OnSubscribeFunc;
22-
import rx.Observer;
23-
import rx.Subscription;
21+
import rx.Observable.OnSubscribe;
22+
import rx.Subscriber;
23+
import rx.functions.Action0;
2424
import rx.subscriptions.Subscriptions;
2525

2626
/**
@@ -34,15 +34,15 @@
3434
* This is blocking so the Subscription returned when calling
3535
* <code>Observable.unsafeSubscribe(Observer)</code> does nothing.
3636
*/
37-
public class OperationToObservableFuture {
38-
/* package accessible for unit tests */static class ToObservableFuture<T> implements OnSubscribeFunc<T> {
37+
public class OperatorToObservableFuture {
38+
/* package accessible for unit tests */static class ToObservableFuture<T> implements OnSubscribe<T> {
3939
private final Future<? extends T> that;
40-
private final Long time;
40+
private final long time;
4141
private final TimeUnit unit;
4242

4343
public ToObservableFuture(Future<? extends T> that) {
4444
this.that = that;
45-
this.time = null;
45+
this.time = 0;
4646
this.unit = null;
4747
}
4848

@@ -53,29 +53,34 @@ public ToObservableFuture(Future<? extends T> that, long time, TimeUnit unit) {
5353
}
5454

5555
@Override
56-
public Subscription onSubscribe(Observer<? super T> observer) {
57-
try {
58-
T value = (time == null) ? (T) that.get() : (T) that.get(time, unit);
59-
60-
if (!that.isCancelled()) {
61-
observer.onNext(value);
56+
public void call(Subscriber<? super T> subscriber) {
57+
subscriber.add(Subscriptions.create(new Action0() {
58+
@Override
59+
public void call() {
60+
// If the Future is already completed, "cancel" does nothing.
61+
that.cancel(true);
6262
}
63-
observer.onCompleted();
63+
}));
64+
try {
65+
T value = (unit == null) ? (T) that.get() : (T) that.get(time, unit);
66+
subscriber.onNext(value);
67+
subscriber.onCompleted();
6468
} catch (Throwable e) {
65-
observer.onError(e);
69+
// If this Observable is unsubscribed, we will receive an CancellationException.
70+
// However, CancellationException will not be passed to the final Subscriber
71+
// since it's already subscribed.
72+
// If the Future is canceled in other place, CancellationException will be still
73+
// passed to the final Subscriber.
74+
subscriber.onError(e);
6675
}
67-
68-
// the get() has already completed so there is no point in
69-
// giving the user a way to cancel.
70-
return Subscriptions.empty();
7176
}
7277
}
7378

74-
public static <T> OnSubscribeFunc<T> toObservableFuture(final Future<? extends T> that) {
79+
public static <T> OnSubscribe<T> toObservableFuture(final Future<? extends T> that) {
7580
return new ToObservableFuture<T>(that);
7681
}
7782

78-
public static <T> OnSubscribeFunc<T> toObservableFuture(final Future<? extends T> that, long time, TimeUnit unit) {
83+
public static <T> OnSubscribe<T> toObservableFuture(final Future<? extends T> that, long time, TimeUnit unit) {
7984
return new ToObservableFuture<T>(that, time, unit);
8085
}
8186
}

rxjava-core/src/test/java/rx/operators/OperationToObservableFutureTest.java renamed to rxjava-core/src/test/java/rx/operators/OperatorToObservableFutureTest.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package rx.operators;
1717

18+
import static org.mockito.Mockito.any;
1819
import static org.mockito.Mockito.mock;
1920
import static org.mockito.Mockito.never;
2021
import static org.mockito.Mockito.times;
@@ -25,44 +26,42 @@
2526

2627
import org.junit.Test;
2728

29+
import rx.Observable;
2830
import rx.Observer;
2931
import rx.Subscription;
3032
import rx.observers.TestObserver;
31-
import rx.operators.OperationToObservableFuture.ToObservableFuture;
3233

33-
public class OperationToObservableFutureTest {
34+
public class OperatorToObservableFutureTest {
3435

3536
@Test
3637
public void testSuccess() throws Exception {
3738
Future<Object> future = mock(Future.class);
3839
Object value = new Object();
3940
when(future.get()).thenReturn(value);
40-
ToObservableFuture<Object> ob = new ToObservableFuture<Object>(future);
4141
Observer<Object> o = mock(Observer.class);
4242

43-
Subscription sub = ob.onSubscribe(new TestObserver<Object>(o));
43+
Subscription sub = Observable.from(future).subscribe(new TestObserver<Object>(o));
4444
sub.unsubscribe();
4545

4646
verify(o, times(1)).onNext(value);
4747
verify(o, times(1)).onCompleted();
48-
verify(o, never()).onError(null);
49-
verify(future, never()).cancel(true);
48+
verify(o, never()).onError(any(Throwable.class));
49+
verify(future, times(1)).cancel(true);
5050
}
5151

5252
@Test
5353
public void testFailure() throws Exception {
5454
Future<Object> future = mock(Future.class);
5555
RuntimeException e = new RuntimeException();
5656
when(future.get()).thenThrow(e);
57-
ToObservableFuture<Object> ob = new ToObservableFuture<Object>(future);
5857
Observer<Object> o = mock(Observer.class);
5958

60-
Subscription sub = ob.onSubscribe(new TestObserver<Object>(o));
59+
Subscription sub = Observable.from(future).subscribe(new TestObserver<Object>(o));
6160
sub.unsubscribe();
6261

6362
verify(o, never()).onNext(null);
6463
verify(o, never()).onCompleted();
6564
verify(o, times(1)).onError(e);
66-
verify(future, never()).cancel(true);
65+
verify(future, times(1)).cancel(true);
6766
}
6867
}

0 commit comments

Comments
 (0)