diff --git a/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java b/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java index 7a66ad3cdf..7efbafb64b 100644 --- a/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java +++ b/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java @@ -235,19 +235,25 @@ protected static class SubjectObserver implements Observer { private final Observer actual; protected volatile boolean caughtUp = false; - + boolean once = true; SubjectObserver(Observer actual) { this.actual = actual; } @Override public void onCompleted() { - this.actual.onCompleted(); + if (once) { + once = false; + this.actual.onCompleted(); + } } @Override public void onError(Throwable e) { - this.actual.onError(e); + if (once) { + once = false; + this.actual.onError(e); + } } @Override diff --git a/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java b/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java index e0726b33e6..dd018abd72 100644 --- a/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java +++ b/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java @@ -403,4 +403,38 @@ public void onCompleted() { inOrder.verify(o).onCompleted(); verify(o, never()).onError(any(Throwable.class)); } - }} + } + @Test + public void testTerminateOnce() { + ReplaySubject source = ReplaySubject.create(); + source.onNext(1); + source.onNext(2); + source.onCompleted(); + + @SuppressWarnings("unchecked") + final Observer o = mock(Observer.class); + + source.unsafeSubscribe(new Subscriber() { + + @Override + public void onNext(Integer t) { + o.onNext(t); + } + + @Override + public void onError(Throwable e) { + o.onError(e); + } + + @Override + public void onCompleted() { + o.onCompleted(); + } + }); + + verify(o).onNext(1); + verify(o).onNext(2); + verify(o).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + } +}