From a0797c55a0839290491087e208472b5daa5c5c27 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 14 Nov 2019 17:57:20 +0100 Subject: [PATCH 1/2] 3.x: Fix MulticastProcessor not requesting more after limit is reached --- .../rxjava3/processors/MulticastProcessor.java | 1 + .../processors/MulticastProcessorTest.java | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/src/main/java/io/reactivex/rxjava3/processors/MulticastProcessor.java b/src/main/java/io/reactivex/rxjava3/processors/MulticastProcessor.java index fbf3f17521..50497473ac 100644 --- a/src/main/java/io/reactivex/rxjava3/processors/MulticastProcessor.java +++ b/src/main/java/io/reactivex/rxjava3/processors/MulticastProcessor.java @@ -570,6 +570,7 @@ void drain() { } } + consumed = c; missed = wip.addAndGet(-missed); if (missed == 0) { break; diff --git a/src/test/java/io/reactivex/rxjava3/processors/MulticastProcessorTest.java b/src/test/java/io/reactivex/rxjava3/processors/MulticastProcessorTest.java index 0274a1f9a1..a3aabdcd64 100644 --- a/src/test/java/io/reactivex/rxjava3/processors/MulticastProcessorTest.java +++ b/src/test/java/io/reactivex/rxjava3/processors/MulticastProcessorTest.java @@ -784,4 +784,22 @@ public void noUpstream() { assertTrue(mp.hasSubscribers()); } + @Test + public void requestUpstreamPrefetchNonFused() { + for (int j = 1; j < 12; j++) { + MulticastProcessor mp = MulticastProcessor.create(2, true); + + TestSubscriber ts = mp.test(0).withTag("Prefetch: " + j); + + Flowable.range(1, 10).hide().subscribe(mp); + + ts.assertEmpty() + .requestMore(3) + .assertValuesOnly(1, 2, 3) + .requestMore(3) + .assertValuesOnly(1, 2, 3, 4, 5, 6) + .requestMore(4) + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + } } From 3869fc1f6fb1fdd01071aedcf71700b99675c427 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 14 Nov 2019 18:03:20 +0100 Subject: [PATCH 2/2] Test for more prefetch values and patterns. --- .../processors/MulticastProcessorTest.java | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/src/test/java/io/reactivex/rxjava3/processors/MulticastProcessorTest.java b/src/test/java/io/reactivex/rxjava3/processors/MulticastProcessorTest.java index a3aabdcd64..d34fbabcb4 100644 --- a/src/test/java/io/reactivex/rxjava3/processors/MulticastProcessorTest.java +++ b/src/test/java/io/reactivex/rxjava3/processors/MulticastProcessorTest.java @@ -787,7 +787,7 @@ public void noUpstream() { @Test public void requestUpstreamPrefetchNonFused() { for (int j = 1; j < 12; j++) { - MulticastProcessor mp = MulticastProcessor.create(2, true); + MulticastProcessor mp = MulticastProcessor.create(j, true); TestSubscriber ts = mp.test(0).withTag("Prefetch: " + j); @@ -802,4 +802,23 @@ public void requestUpstreamPrefetchNonFused() { .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); } } + + @Test + public void requestUpstreamPrefetchNonFused2() { + for (int j = 1; j < 12; j++) { + MulticastProcessor mp = MulticastProcessor.create(j, true); + + TestSubscriber ts = mp.test(0).withTag("Prefetch: " + j); + + Flowable.range(1, 10).hide().subscribe(mp); + + ts.assertEmpty() + .requestMore(2) + .assertValuesOnly(1, 2) + .requestMore(2) + .assertValuesOnly(1, 2, 3, 4) + .requestMore(6) + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + } }