diff --git a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java index d37f7820b1..bd5f16b571 100644 --- a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java +++ b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java @@ -25,6 +25,7 @@ import rx.Observer; import rx.functions.Action0; import rx.functions.Action1; +import rx.operators.NotificationLite; import rx.subjects.SubjectSubscriptionManager.SubjectObserver; /** @@ -127,7 +128,7 @@ public void onCompleted() { @Override public void call() { - state.history.complete(Notification. createOnCompleted()); + state.history.complete(); } }); if (observers != null) { @@ -145,7 +146,7 @@ public void onError(final Throwable e) { @Override public void call() { - state.history.complete(Notification. createOnError(e)); + state.history.complete(e); } }); if (observers != null) { @@ -159,7 +160,7 @@ public void call() { @Override public void onNext(T v) { - if (state.history.terminalValue.get() != null) { + if (state.history.terminated) { return; } state.history.next(v); @@ -200,12 +201,9 @@ private void replayObserver(SubjectObserver observer) { private static int replayObserverFromIndex(History history, Integer l, SubjectObserver observer) { while (l < history.index.get()) { - observer.onNext(history.list.get(l)); + history.accept(observer, l); l++; } - if (history.terminalValue.get() != null) { - history.terminalValue.get().accept(observer); - } return l; } @@ -217,19 +215,19 @@ private static int replayObserverFromIndex(History history, Integer l, Su * @param */ private static class History { + private final NotificationLite nl = NotificationLite.instance(); private final AtomicInteger index; - private final ArrayList list; - private final AtomicReference> terminalValue; + private final ArrayList list; + private boolean terminated; public History(int initialCapacity) { index = new AtomicInteger(0); - list = new ArrayList(initialCapacity); - terminalValue = new AtomicReference>(); + list = new ArrayList(initialCapacity); } public boolean next(T n) { - if (terminalValue.get() == null) { - list.add(n); + if (!terminated) { + list.add(nl.next(n)); index.getAndIncrement(); return true; } else { @@ -237,8 +235,23 @@ public boolean next(T n) { } } - public void complete(Notification n) { - terminalValue.set(n); + public void accept(Observer o, int idx) { + nl.accept(o, list.get(idx)); + } + + public void complete() { + if (!terminated) { + terminated = true; + list.add(nl.completed()); + index.getAndIncrement(); + } + } + public void complete(Throwable e) { + if (!terminated) { + terminated = true; + list.add(nl.error(e)); + index.getAndIncrement(); + } } } diff --git a/rxjava-core/src/test/java/rx/subjects/ReplaySubjectConcurrencyTest.java b/rxjava-core/src/test/java/rx/subjects/ReplaySubjectConcurrencyTest.java index ced51e9608..354701b041 100644 --- a/rxjava-core/src/test/java/rx/subjects/ReplaySubjectConcurrencyTest.java +++ b/rxjava-core/src/test/java/rx/subjects/ReplaySubjectConcurrencyTest.java @@ -17,9 +17,7 @@ import static org.junit.Assert.assertEquals; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; +import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -32,6 +30,8 @@ import rx.Subscriber; import rx.Subscription; import rx.functions.Action1; +import rx.observers.TestSubscriber; +import rx.schedulers.Schedulers; import rx.subscriptions.Subscriptions; public class ReplaySubjectConcurrencyTest { @@ -303,6 +303,21 @@ public void run() { } } + + /** + * https://github.com/Netflix/RxJava/issues/1147 + */ + @Test + public void testRaceForTerminalState() { + final List expected = Arrays.asList(1); + for (int i = 0; i < 100000; i++) { + TestSubscriber ts = new TestSubscriber(); + Observable.just(1).subscribeOn(Schedulers.computation()).cache().subscribe(ts); + ts.awaitTerminalEvent(); + ts.assertReceivedOnNext(expected); + ts.assertTerminalEvent(); + } + } private static class SubjectObserverThread extends Thread {