Skip to content

Commit a2f4782

Browse files
Merge pull request #1243 from benjchristensen/perf
Remove Subscription Wrapper from Observable.subscribe
2 parents 4ae5333 + 0db09c3 commit a2f4782

File tree

4 files changed

+183
-10
lines changed

4 files changed

+183
-10
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6255,16 +6255,7 @@ public final Subscription subscribe(Subscriber<? super T> subscriber) {
62556255
subscriber = new SafeSubscriber<T>(subscriber);
62566256
}
62576257
onSubscribeFunction.call(subscriber);
6258-
final Subscription returnSubscription = hook.onSubscribeReturn(subscriber);
6259-
// we return it inside a Subscription so it can't be cast back to Subscriber
6260-
return Subscriptions.create(new Action0() {
6261-
6262-
@Override
6263-
public void call() {
6264-
returnSubscription.unsubscribe();
6265-
}
6266-
6267-
});
6258+
return hook.onSubscribeReturn(subscriber);
62686259
} catch (Throwable e) {
62696260
// special handling for certain Throwable/Error/Exception types
62706261
Exceptions.throwIfFatal(e);
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.usecases;
17+
18+
import org.openjdk.jmh.annotations.GenerateMicroBenchmark;
19+
20+
import rx.schedulers.Schedulers;
21+
22+
public class PerfObserveOn {
23+
24+
@GenerateMicroBenchmark
25+
public void observeOn(UseCaseInput input) throws InterruptedException {
26+
input.observable.observeOn(Schedulers.computation()).subscribe(input.observer);
27+
input.awaitCompletion();
28+
}
29+
30+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.usecases;
17+
18+
import org.openjdk.jmh.annotations.GenerateMicroBenchmark;
19+
20+
import rx.Observable;
21+
import rx.functions.Func1;
22+
23+
public class PerfTransforms {
24+
25+
@GenerateMicroBenchmark
26+
public void mapTransformation(UseCaseInput input) throws InterruptedException {
27+
input.observable.map(new Func1<Integer, String>() {
28+
29+
@Override
30+
public String call(Integer i) {
31+
return String.valueOf(i);
32+
}
33+
34+
}).map(new Func1<String, Integer>() {
35+
36+
@Override
37+
public Integer call(String i) {
38+
return Integer.parseInt(i);
39+
}
40+
41+
}).subscribe(input.observer);
42+
input.awaitCompletion();
43+
}
44+
45+
@GenerateMicroBenchmark
46+
public void flatMapTransformsUsingFrom(UseCaseInput input) throws InterruptedException {
47+
input.observable.flatMap(new Func1<Integer, Observable<Integer>>() {
48+
49+
@Override
50+
public Observable<Integer> call(Integer i) {
51+
return Observable.from(i);
52+
}
53+
54+
}).subscribe(input.observer);
55+
input.awaitCompletion();
56+
}
57+
58+
@GenerateMicroBenchmark
59+
public void flatMapTransformsUsingJust(UseCaseInput input) throws InterruptedException {
60+
input.observable.flatMap(new Func1<Integer, Observable<Integer>>() {
61+
62+
@Override
63+
public Observable<Integer> call(Integer i) {
64+
return Observable.just(i);
65+
}
66+
67+
}).subscribe(input.observer);
68+
input.awaitCompletion();
69+
}
70+
71+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.usecases;
17+
18+
import java.util.concurrent.CountDownLatch;
19+
20+
import org.openjdk.jmh.annotations.Param;
21+
import org.openjdk.jmh.annotations.Scope;
22+
import org.openjdk.jmh.annotations.Setup;
23+
import org.openjdk.jmh.annotations.State;
24+
import org.openjdk.jmh.logic.BlackHole;
25+
26+
import rx.Observable;
27+
import rx.Observable.OnSubscribe;
28+
import rx.Observer;
29+
import rx.Subscriber;
30+
31+
/**
32+
* Exposes an Observable and Observer that increments n Integers and consumes them in a Blackhole.
33+
*/
34+
@State(Scope.Thread)
35+
public class UseCaseInput {
36+
@Param({ "1", "1024" })
37+
public int size;
38+
39+
public Observable<Integer> observable;
40+
public Observer<Integer> observer;
41+
42+
private CountDownLatch latch;
43+
44+
@Setup
45+
public void setup() {
46+
observable = Observable.create(new OnSubscribe<Integer>() {
47+
@Override
48+
public void call(Subscriber<? super Integer> o) {
49+
for (int value = 0; value < size; value++) {
50+
o.onNext(value);
51+
}
52+
o.onCompleted();
53+
}
54+
});
55+
56+
final BlackHole bh = new BlackHole();
57+
latch = new CountDownLatch(1);
58+
59+
observer = new Observer<Integer>() {
60+
@Override
61+
public void onCompleted() {
62+
latch.countDown();
63+
}
64+
65+
@Override
66+
public void onError(Throwable e) {
67+
throw new RuntimeException(e);
68+
}
69+
70+
@Override
71+
public void onNext(Integer value) {
72+
bh.consume(value);
73+
}
74+
};
75+
76+
}
77+
78+
public void awaitCompletion() throws InterruptedException {
79+
latch.await();
80+
}
81+
}

0 commit comments

Comments
 (0)