diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableSwitchMap.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableSwitchMap.java index e213352a05..3e0558aacf 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableSwitchMap.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableSwitchMap.java @@ -349,9 +349,10 @@ public void onSubscribe(Disposable d) { @Override public void onNext(R t) { - if (index == parent.unique) { + SimpleQueue q = queue; + if (index == parent.unique && q != null) { if (t != null) { - queue.offer(t); + q.offer(t); } parent.drain(); } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSwitchTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSwitchTest.java index 2bb5a05808..9f200e3c56 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSwitchTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSwitchTest.java @@ -1377,4 +1377,19 @@ Flowable createFlowable(AtomicInteger inner) { inner.incrementAndGet(); }); } + + @Test + public void innerOnSubscribeOuterCancelRace() { + TestSubscriber ts = new TestSubscriber(); + + Flowable.just(1) + .hide() + .switchMap(v -> Flowable.just(1) + .doOnSubscribe(d -> ts.cancel()) + .scan(1, (a, b) -> a) + ) + .subscribe(ts); + + ts.assertEmpty(); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableSwitchTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableSwitchTest.java index ffa4049a44..72e6c93bb8 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableSwitchTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableSwitchTest.java @@ -1438,4 +1438,19 @@ Observable createObservable(AtomicInteger inner) { inner.incrementAndGet(); }); } + + @Test + public void innerOnSubscribeOuterCancelRace() { + TestObserver to = new TestObserver(); + + Observable.just(1) + .hide() + .switchMap(v -> Observable.just(1) + .doOnSubscribe(d -> to.dispose()) + .scan(1, (a, b) -> a) + ) + .subscribe(to); + + to.assertEmpty(); + } }