Skip to content

Commit e598966

Browse files
Merge pull request #1378 from benjchristensen/pivot
BugFix: Pivot Concurrency
2 parents 69227ff + e2c5bfc commit e598966

File tree

2 files changed

+26
-13
lines changed

2 files changed

+26
-13
lines changed

rxjava-core/src/main/java/rx/internal/operators/OperatorPivot.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,12 @@
2525

2626
import rx.Observable.OnSubscribe;
2727
import rx.Observable.Operator;
28+
import rx.Observer;
2829
import rx.Subscriber;
2930
import rx.Subscription;
3031
import rx.functions.Action0;
3132
import rx.observables.GroupedObservable;
33+
import rx.observers.SerializedObserver;
3234
import rx.subscriptions.Subscriptions;
3335

3436
public final class OperatorPivot<K1, K2, T> implements Operator<GroupedObservable<K2, GroupedObservable<K1, T>>, GroupedObservable<K1, GroupedObservable<K2, T>>> {
@@ -268,7 +270,7 @@ private Outer<K1, K2, T> getOrCreateOuter(final AtomicReference<State> state, fi
268270
return null;
269271
}
270272

271-
Outer<K1, K2, T> newOuter = Outer.<K1, K2, T> create(this, state, key2);
273+
Outer<K1, K2, T> newOuter = Outer.<K1, K2, T> create(key2);
272274
Outer<K1, K2, T> existing = outerSubjects.putIfAbsent(key2, newOuter);
273275
if (existing != null) {
274276
// we lost the race to create so return the one that did
@@ -283,11 +285,12 @@ private Outer<K1, K2, T> getOrCreateOuter(final AtomicReference<State> state, fi
283285

284286
private static final class Inner<K1, K2, T> {
285287

286-
private final BufferUntilSubscriber<T> subscriber;
288+
private final Observer<T> subscriber;
287289
private final GroupedObservable<K1, T> group;
288290

289291
private Inner(BufferUntilSubscriber<T> subscriber, GroupedObservable<K1, T> group) {
290-
this.subscriber = subscriber;
292+
// since multiple threads are being pivoted we need to make sure this is serialized
293+
this.subscriber = new SerializedObserver<T>(subscriber);
291294
this.group = group;
292295
}
293296

@@ -335,15 +338,16 @@ public void onNext(T t) {
335338

336339
private static final class Outer<K1, K2, T> {
337340

338-
private final BufferUntilSubscriber<GroupedObservable<K1, T>> subscriber;
341+
private final Observer<GroupedObservable<K1, T>> subscriber;
339342
private final GroupedObservable<K2, GroupedObservable<K1, T>> group;
340343

341344
private Outer(BufferUntilSubscriber<GroupedObservable<K1, T>> subscriber, GroupedObservable<K2, GroupedObservable<K1, T>> group) {
342-
this.subscriber = subscriber;
345+
// since multiple threads are being pivoted we need to make sure this is serialized
346+
this.subscriber = new SerializedObserver<GroupedObservable<K1, T>>(subscriber);
343347
this.group = group;
344348
}
345349

346-
public static <K1, K2, T> Outer<K1, K2, T> create(final GroupState<K1, K2, T> groups, final AtomicReference<State> state, final K2 key2) {
350+
public static <K1, K2, T> Outer<K1, K2, T> create(final K2 key2) {
347351
final BufferUntilSubscriber<GroupedObservable<K1, T>> subject = BufferUntilSubscriber.create();
348352
GroupedObservable<K2, GroupedObservable<K1, T>> group = new GroupedObservable<K2, GroupedObservable<K1, T>>(key2, new OnSubscribe<GroupedObservable<K1, T>>() {
349353

rxjava-core/src/test/java/rx/internal/operators/OperatorPivotTest.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,23 +13,26 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
1716
package rx.internal.operators;
1817

1918
import java.util.Random;
19+
2020
import static org.junit.Assert.assertEquals;
2121
import static org.junit.Assert.assertTrue;
2222
import static org.junit.Assert.fail;
2323

2424
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.Executors;
2526
import java.util.concurrent.TimeUnit;
2627
import java.util.concurrent.atomic.AtomicInteger;
2728

29+
import org.junit.Ignore;
2830
import org.junit.Test;
2931

3032
import rx.Observable;
3133
import rx.Observable.OnSubscribe;
3234
import rx.Observer;
35+
import rx.Scheduler;
3336
import rx.Subscriber;
3437
import rx.functions.Func1;
3538
import rx.observables.GroupedObservable;
@@ -38,10 +41,12 @@
3841

3942
public class OperatorPivotTest {
4043

41-
@Test
44+
@Test(timeout=10000)
4245
public void testPivotEvenAndOdd() throws InterruptedException {
43-
Observable<GroupedObservable<Boolean, Integer>> o1 = Observable.range(1, 10).groupBy(modKeySelector).subscribeOn(Schedulers.newThread());
44-
Observable<GroupedObservable<Boolean, Integer>> o2 = Observable.range(11, 10).groupBy(modKeySelector).subscribeOn(Schedulers.newThread());
46+
for(int i=0; i<1000; i++) {
47+
System.out.println("------------------------------------------ testPivotEvenAndOdd -------------------------------------------");
48+
Observable<GroupedObservable<Boolean, Integer>> o1 = Observable.range(1, 10).groupBy(modKeySelector).subscribeOn(Schedulers.computation());
49+
Observable<GroupedObservable<Boolean, Integer>> o2 = Observable.range(11, 10).groupBy(modKeySelector).subscribeOn(Schedulers.computation());
4550
Observable<GroupedObservable<String, GroupedObservable<Boolean, Integer>>> groups = Observable.from(GroupedObservable.from("o1", o1), GroupedObservable.from("o2", o2));
4651
Observable<GroupedObservable<Boolean, GroupedObservable<String, Integer>>> pivoted = Observable.pivot(groups);
4752

@@ -53,10 +58,12 @@ public void testPivotEvenAndOdd() throws InterruptedException {
5358

5459
@Override
5560
public Observable<String> call(final GroupedObservable<Boolean, GroupedObservable<String, Integer>> outerGroup) {
61+
System.out.println("Outer Group: " + outerGroup.getKey());
5662
return outerGroup.flatMap(new Func1<GroupedObservable<String, Integer>, Observable<String>>() {
5763

5864
@Override
5965
public Observable<String> call(final GroupedObservable<String, Integer> innerGroup) {
66+
System.out.println("Inner Group: " + innerGroup.getKey());
6067
return innerGroup.map(new Func1<Integer, String>() {
6168

6269
@Override
@@ -94,14 +101,16 @@ public void onNext(String t) {
94101

95102
});
96103

97-
if (!latch.await(800, TimeUnit.MILLISECONDS)) {
104+
if (!latch.await(20000000, TimeUnit.MILLISECONDS)) {
98105
System.out.println("xxxxxxxxxxxxxxxxxx> TIMED OUT <xxxxxxxxxxxxxxxxxxxx");
99106
System.out.println("Received count: " + count.get());
100107
fail("Timed Out");
101108
}
102109

103110
System.out.println("Received count: " + count.get());
111+
// TODO sometimes this test fails and gets 15 instead of 20 so there is a bug somewhere
104112
assertEquals(20, count.get());
113+
}
105114
}
106115

107116
/**
@@ -112,7 +121,7 @@ public void onNext(String t) {
112121
* It's NOT easy to understand though, and easy to end up with far more data consumed than expected, because pivot by definition
113122
* is inverting the data so we can not unsubscribe from the parent until all children are done since the top key becomes the leaf once pivoted.
114123
*/
115-
@Test
124+
@Test(timeout=10000)
116125
public void testUnsubscribeFromGroups() throws InterruptedException {
117126
AtomicInteger counter1 = new AtomicInteger();
118127
AtomicInteger counter2 = new AtomicInteger();
@@ -221,7 +230,7 @@ public String call(Integer i) {
221230
*
222231
* Then a subsequent step can merge them if desired and add serialization, such as merge(even.o1, even.o2) to become a serialized "even"
223232
*/
224-
@Test
233+
@Test(timeout=10000)
225234
public void testConcurrencyAndSerialization() throws InterruptedException {
226235
final AtomicInteger maxOuterConcurrency = new AtomicInteger();
227236
final AtomicInteger maxGroupConcurrency = new AtomicInteger();

0 commit comments

Comments
 (0)