Skip to content

Commit 43437fe

Browse files
SubscribeOn Scheduler/Unsubscribe Behavior
1 parent 7084cd0 commit 43437fe

File tree

2 files changed

+17
-29
lines changed

2 files changed

+17
-29
lines changed

rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public OperatorSubscribeOn(Scheduler scheduler, int bufferSize) {
6767

6868
@Override
6969
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {
70-
return new Subscriber<Observable<T>>() {
70+
return new Subscriber<Observable<T>>(subscriber) {
7171

7272
@Override
7373
public void onCompleted() {
@@ -98,29 +98,13 @@ public void call(final Inner inner) {
9898
return;
9999
} else {
100100
// no buffering (async subscribe)
101-
scheduler.schedule(new Action1<Inner>() {
101+
subscriber.add(scheduler.schedule(new Action1<Inner>() {
102102

103103
@Override
104104
public void call(final Inner inner) {
105-
o.subscribe(new Subscriber<T>(subscriber) {
106-
107-
@Override
108-
public void onCompleted() {
109-
subscriber.onCompleted();
110-
}
111-
112-
@Override
113-
public void onError(Throwable e) {
114-
subscriber.onError(e);
115-
}
116-
117-
@Override
118-
public void onNext(T t) {
119-
subscriber.onNext(t);
120-
}
121-
});
105+
o.subscribe(subscriber);
122106
}
123-
});
107+
}));
124108
}
125109
}
126110

rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646

4747
public class OperatorSubscribeOnTest {
4848

49-
private class ThreadSubscription implements Subscription {
49+
private static class ThreadSubscription implements Subscription {
5050
private volatile Thread thread;
5151

5252
private final CountDownLatch latch = new CountDownLatch(1);
@@ -111,9 +111,10 @@ public void call(Subscriber<? super Integer> t1) {
111111
observer.assertTerminalEvent();
112112
}
113113

114-
@Test
114+
@Test(timeout = 2000)
115115
public void testIssue813() throws InterruptedException {
116116
// https://github.com/Netflix/RxJava/issues/813
117+
final CountDownLatch scheduled = new CountDownLatch(1);
117118
final CountDownLatch latch = new CountDownLatch(1);
118119
final CountDownLatch doneLatch = new CountDownLatch(1);
119120

@@ -126,16 +127,14 @@ public void testIssue813() throws InterruptedException {
126127
public void call(
127128
final Subscriber<? super Integer> subscriber) {
128129
subscriber.add(s);
130+
scheduled.countDown();
129131
try {
130132
latch.await();
131-
// Already called "unsubscribe", "isUnsubscribed"
132-
// shouble be true
133-
if (!subscriber.isUnsubscribed()) {
134-
throw new IllegalStateException(
135-
"subscriber.isUnsubscribed should be true");
136-
}
133+
134+
// this should not run because the await above will be interrupted by the unsubscribe
137135
subscriber.onCompleted();
138136
} catch (InterruptedException e) {
137+
System.out.println("Interrupted because it is unsubscribed");
139138
Thread.currentThread().interrupt();
140139
} catch (Throwable e) {
141140
subscriber.onError(e);
@@ -145,13 +144,17 @@ public void call(
145144
}
146145
}).subscribeOn(Schedulers.computation()).subscribe(observer);
147146

147+
// wait for scheduling
148+
scheduled.await();
149+
// trigger unsubscribe
148150
subscription.unsubscribe();
149151
// As unsubscribe is called in other thread, we need to wait for it.
150152
s.getThread();
151153
latch.countDown();
152154
doneLatch.await();
153155
assertEquals(0, observer.getOnErrorEvents().size());
154-
assertEquals(1, observer.getOnCompletedEvents().size());
156+
// 0 because the unsubscribe interrupts and prevents onCompleted from being executed
157+
assertEquals(0, observer.getOnCompletedEvents().size());
155158
}
156159

157160
public static class SlowScheduler extends Scheduler {
@@ -395,4 +398,5 @@ public void call(Subscriber<? super Integer> sub) {
395398
ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
396399
assertEquals(10, count.get());
397400
}
401+
398402
}

0 commit comments

Comments
 (0)