diff --git a/rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java b/rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java index dfade9ea49..f4db4baa5f 100644 --- a/rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java +++ b/rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java @@ -19,11 +19,11 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import rx.Observable; import rx.Observer; import rx.Subscriber; import rx.functions.Action0; import rx.observers.Subscribers; +import rx.subjects.Subject; import rx.subscriptions.Subscriptions; /** @@ -44,7 +44,7 @@ * * @param */ -public class BufferUntilSubscriber extends Observable implements Observer { +public class BufferUntilSubscriber extends Subject { public static BufferUntilSubscriber create() { State state = new State(); @@ -122,7 +122,7 @@ public void onNext(T t) { * It will then immediately swap itself out for the actual (after a single notification), but since this is now * being done on the same producer thread no further buffering will occur. */ - private static class PassThruObserver extends Subscriber { + private static final class PassThruObserver extends Subscriber { private final Observer actual; // this assumes single threaded synchronous notifications (the Rx contract for a single Observer) @@ -166,7 +166,7 @@ private void drainIfNeededAndSwitchToActual() { } - private static class BufferedObserver extends Subscriber { + private static final class BufferedObserver extends Subscriber { private final ConcurrentLinkedQueue buffer = new ConcurrentLinkedQueue(); private final NotificationLite nl = NotificationLite.instance(); diff --git a/rxjava-core/src/main/java/rx/operators/OperatorGroupByUntil.java b/rxjava-core/src/main/java/rx/operators/OperatorGroupByUntil.java index caecd15440..d40ca81d93 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorGroupByUntil.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorGroupByUntil.java @@ -197,7 +197,7 @@ public void onCompleted() { public static final class GroupSubject extends Subscriber { static GroupSubject create(K key) { - PublishSubject publish = PublishSubject.create(); + Subject publish = BufferUntilSubscriber.create(); return new GroupSubject(key, publish); } diff --git a/rxjava-core/src/test/java/rx/operators/OperatorGroupByUntilTest.java b/rxjava-core/src/test/java/rx/operators/OperatorGroupByUntilTest.java index b21121ecb6..e252c79f68 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorGroupByUntilTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorGroupByUntilTest.java @@ -31,12 +31,15 @@ import org.junit.Before; import org.junit.Test; import org.mockito.InOrder; +import static org.mockito.Matchers.anyInt; import org.mockito.Mock; +import static org.mockito.Mockito.mock; import org.mockito.MockitoAnnotations; import rx.Observable; import rx.Observer; import rx.Subscriber; +import rx.exceptions.TestException; import rx.functions.Action1; import rx.functions.Func1; import rx.functions.Functions; @@ -283,14 +286,44 @@ public void call(GroupedObservable t1) { inner.get().subscribe(observer); - verify(observer, times(1)).onCompleted(); + verify(observer).onNext(0); + verify(observer).onCompleted(); verify(observer, never()).onError(any(Throwable.class)); - verify(observer, never()).onNext(any()); + } + + @Test + public void innerEscapeCompletedTwice() { + Observable source = Observable.from(0); + + final AtomicReference> inner = new AtomicReference>(); + + Func1, Observable> duration = just(Observable.never()); + + Observable> m = source.groupByUntil(identity, dbl, duration); + + m.subscribe(new Action1>() { + @Override + public void call(GroupedObservable t1) { + inner.set(t1); + } + }); + + inner.get().subscribe(observer); + + @SuppressWarnings("unchecked") + Observer o2 = mock(Observer.class); + + inner.get().subscribe(o2); + + verify(o2, never()).onCompleted(); + verify(o2, never()).onNext(anyInt()); + verify(o2).onError(any(IllegalStateException.class)); } @Test public void innerEscapeError() { - Observable source = Observable.concat(Observable.from(0), Observable. error(new RuntimeException("Forced failure"))); + Observable source = Observable.concat(Observable.from(0), Observable. error( + new TestException("Forced failure"))); final AtomicReference> inner = new AtomicReference>(); @@ -316,8 +349,47 @@ public void onCompleted() { inner.get().subscribe(observer); - verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer).onNext(0); + verify(observer).onError(any(TestException.class)); verify(observer, never()).onCompleted(); - verify(observer, never()).onNext(any()); + } + + @Test + public void innerEscapeErrorTwice() { + Observable source = Observable.concat(Observable.from(0), Observable. error( + new TestException("Forced failure"))); + + final AtomicReference> inner = new AtomicReference>(); + + Func1, Observable> duration = just(Observable.never()); + + Observable> m = source.groupByUntil(identity, dbl, duration); + + m.subscribe(new Subscriber>() { + @Override + public void onNext(GroupedObservable t1) { + inner.set(t1); + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onCompleted() { + } + + }); + + inner.get().subscribe(observer); + + @SuppressWarnings("unchecked") + Observer o2 = mock(Observer.class); + + inner.get().subscribe(o2); + + verify(o2, never()).onCompleted(); + verify(o2, never()).onNext(anyInt()); + verify(o2).onError(any(IllegalStateException.class)); } } \ No newline at end of file