Skip to content

Commit af07b19

Browse files
committed
OperatorUsing
1 parent 95e0636 commit af07b19

File tree

4 files changed

+62
-75
lines changed

4 files changed

+62
-75
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@
7676
import rx.operators.OperationTimer;
7777
import rx.operators.OperationToMap;
7878
import rx.operators.OperationToMultimap;
79-
import rx.operators.OperationUsing;
79+
import rx.operators.OperatorUsing;
8080
import rx.operators.OperationWindow;
8181
import rx.operators.OperatorAll;
8282
import rx.operators.OperatorAmb;
@@ -2589,8 +2589,8 @@ public final static Observable<Long> timer(long delay, TimeUnit unit, Scheduler
25892589
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-using">RxJava Wiki: using()</a>
25902590
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229585.aspx">MSDN: Observable.Using</a>
25912591
*/
2592-
public final static <T, RESOURCE extends Subscription> Observable<T> using(Func0<RESOURCE> resourceFactory, Func1<RESOURCE, Observable<T>> observableFactory) {
2593-
return create(OperationUsing.using(resourceFactory, observableFactory));
2592+
public final static <T, Resource extends Subscription> Observable<T> using(Func0<Resource> resourceFactory, Func1<Resource, ? extends Observable<? extends T>> observableFactory) {
2593+
return create(new OperatorUsing<T, Resource>(resourceFactory, observableFactory));
25942594
}
25952595

25962596
/**

rxjava-core/src/main/java/rx/operators/OperationUsing.java

Lines changed: 0 additions & 60 deletions
This file was deleted.
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import rx.Observable;
19+
import rx.Observable.OnSubscribe;
20+
import rx.Subscriber;
21+
import rx.Subscription;
22+
import rx.functions.Func0;
23+
import rx.functions.Func1;
24+
25+
/**
26+
* Constructs an observable sequence that depends on a resource object.
27+
*/
28+
public final class OperatorUsing<T, Resource extends Subscription> implements OnSubscribe<T> {
29+
30+
private final Func0<Resource> resourceFactory;
31+
private final Func1<Resource, ? extends Observable<? extends T>> observableFactory;
32+
33+
public OperatorUsing(Func0<Resource> resourceFactory, Func1<Resource, ? extends Observable<? extends T>> observableFactory) {
34+
this.resourceFactory = resourceFactory;
35+
this.observableFactory = observableFactory;
36+
}
37+
38+
public void call(Subscriber<? super T> subscriber) {
39+
Resource resource = null;
40+
try {
41+
resource = resourceFactory.call();
42+
subscriber.add(resource);
43+
Observable<? extends T> observable = observableFactory.call(resource);
44+
observable.subscribe(subscriber);
45+
} catch (Throwable e) {
46+
if (resource != null) {
47+
resource.unsubscribe();
48+
}
49+
subscriber.onError(e);
50+
}
51+
}
52+
53+
}

rxjava-core/src/test/java/rx/operators/OperationUsingTest.java renamed to rxjava-core/src/test/java/rx/operators/OperatorUsingTest.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import static org.mockito.Mockito.times;
2222
import static org.mockito.Mockito.verify;
2323
import static org.mockito.Mockito.when;
24-
import static rx.operators.OperationUsing.using;
2524

2625
import org.junit.Test;
2726
import org.mockito.InOrder;
@@ -35,7 +34,7 @@
3534
import rx.functions.Func1;
3635
import rx.subscriptions.Subscriptions;
3736

38-
public class OperationUsingTest {
37+
public class OperatorUsingTest {
3938

4039
@SuppressWarnings("serial")
4140
private static class TestException extends RuntimeException {
@@ -69,8 +68,7 @@ public Observable<String> call(Resource resource) {
6968

7069
@SuppressWarnings("unchecked")
7170
Observer<String> observer = (Observer<String>) mock(Observer.class);
72-
Observable<String> observable = Observable.create(using(
73-
resourceFactory, observableFactory));
71+
Observable<String> observable = Observable.using(resourceFactory, observableFactory);
7472
observable.subscribe(observer);
7573

7674
InOrder inOrder = inOrder(observer);
@@ -124,8 +122,7 @@ public Observable<String> call(Resource resource) {
124122

125123
@SuppressWarnings("unchecked")
126124
Observer<String> observer = (Observer<String>) mock(Observer.class);
127-
Observable<String> observable = Observable.create(using(
128-
resourceFactory, observableFactory));
125+
Observable<String> observable = Observable.using(resourceFactory, observableFactory);
129126
observable.subscribe(observer);
130127
observable.subscribe(observer);
131128

@@ -157,8 +154,7 @@ public Observable<Integer> call(Subscription subscription) {
157154
}
158155
};
159156

160-
Observable.create(using(resourceFactory, observableFactory))
161-
.toBlockingObservable().last();
157+
Observable.using(resourceFactory, observableFactory).toBlockingObservable().last();
162158
}
163159

164160
@Test
@@ -179,8 +175,7 @@ public Observable<Integer> call(Subscription subscription) {
179175
};
180176

181177
try {
182-
Observable.create(using(resourceFactory, observableFactory))
183-
.toBlockingObservable().last();
178+
Observable.using(resourceFactory, observableFactory).toBlockingObservable().last();
184179
fail("Should throw a TestException when the observableFactory throws it");
185180
} catch (TestException e) {
186181
// Make sure that unsubscribe is called so that users can close
@@ -212,8 +207,7 @@ public Subscription onSubscribe(Observer<? super Integer> t1) {
212207
};
213208

214209
try {
215-
Observable.create(using(resourceFactory, observableFactory))
216-
.toBlockingObservable().last();
210+
Observable.using(resourceFactory, observableFactory).toBlockingObservable().last();
217211
fail("Should throw a TestException when the observableFactory throws it");
218212
} catch (TestException e) {
219213
// Make sure that unsubscribe is called so that users can close

0 commit comments

Comments
 (0)