Skip to content

Commit 461730f

Browse files
Merge pull request #1177 from akarnokd/GroupByUntilTimeGap
GroupByUntil to use BufferUntilSubscriber
2 parents cbd09d7 + f15a6dd commit 461730f

File tree

3 files changed

+82
-10
lines changed

3 files changed

+82
-10
lines changed

rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@
1919
import java.util.concurrent.atomic.AtomicBoolean;
2020
import java.util.concurrent.atomic.AtomicReference;
2121

22-
import rx.Observable;
2322
import rx.Observer;
2423
import rx.Subscriber;
2524
import rx.functions.Action0;
2625
import rx.observers.Subscribers;
26+
import rx.subjects.Subject;
2727
import rx.subscriptions.Subscriptions;
2828

2929
/**
@@ -44,7 +44,7 @@
4444
*
4545
* @param <T>
4646
*/
47-
public class BufferUntilSubscriber<T> extends Observable<T> implements Observer<T> {
47+
public class BufferUntilSubscriber<T> extends Subject<T, T> {
4848

4949
public static <T> BufferUntilSubscriber<T> create() {
5050
State<T> state = new State<T>();
@@ -122,7 +122,7 @@ public void onNext(T t) {
122122
* It will then immediately swap itself out for the actual (after a single notification), but since this is now
123123
* being done on the same producer thread no further buffering will occur.
124124
*/
125-
private static class PassThruObserver<T> extends Subscriber<T> {
125+
private static final class PassThruObserver<T> extends Subscriber<T> {
126126

127127
private final Observer<? super T> actual;
128128
// this assumes single threaded synchronous notifications (the Rx contract for a single Observer)
@@ -166,7 +166,7 @@ private void drainIfNeededAndSwitchToActual() {
166166

167167
}
168168

169-
private static class BufferedObserver<T> extends Subscriber<T> {
169+
private static final class BufferedObserver<T> extends Subscriber<T> {
170170
private final ConcurrentLinkedQueue<Object> buffer = new ConcurrentLinkedQueue<Object>();
171171
private final NotificationLite<T> nl = NotificationLite.instance();
172172

rxjava-core/src/main/java/rx/operators/OperatorGroupByUntil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ public void onCompleted() {
197197
public static final class GroupSubject<K, R> extends Subscriber<R> {
198198

199199
static <K, R> GroupSubject<K, R> create(K key) {
200-
PublishSubject<R> publish = PublishSubject.create();
200+
Subject<R, R> publish = BufferUntilSubscriber.create();
201201
return new GroupSubject<K, R>(key, publish);
202202
}
203203

rxjava-core/src/test/java/rx/operators/OperatorGroupByUntilTest.java

Lines changed: 77 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,15 @@
3131
import org.junit.Before;
3232
import org.junit.Test;
3333
import org.mockito.InOrder;
34+
import static org.mockito.Matchers.anyInt;
3435
import org.mockito.Mock;
36+
import static org.mockito.Mockito.mock;
3537
import org.mockito.MockitoAnnotations;
3638

3739
import rx.Observable;
3840
import rx.Observer;
3941
import rx.Subscriber;
42+
import rx.exceptions.TestException;
4043
import rx.functions.Action1;
4144
import rx.functions.Func1;
4245
import rx.functions.Functions;
@@ -283,14 +286,44 @@ public void call(GroupedObservable<Integer, Integer> t1) {
283286

284287
inner.get().subscribe(observer);
285288

286-
verify(observer, times(1)).onCompleted();
289+
verify(observer).onNext(0);
290+
verify(observer).onCompleted();
287291
verify(observer, never()).onError(any(Throwable.class));
288-
verify(observer, never()).onNext(any());
292+
}
293+
294+
@Test
295+
public void innerEscapeCompletedTwice() {
296+
Observable<Integer> source = Observable.from(0);
297+
298+
final AtomicReference<GroupedObservable<Integer, Integer>> inner = new AtomicReference<GroupedObservable<Integer, Integer>>();
299+
300+
Func1<GroupedObservable<Integer, Integer>, Observable<Object>> duration = just(Observable.never());
301+
302+
Observable<GroupedObservable<Integer, Integer>> m = source.groupByUntil(identity, dbl, duration);
303+
304+
m.subscribe(new Action1<GroupedObservable<Integer, Integer>>() {
305+
@Override
306+
public void call(GroupedObservable<Integer, Integer> t1) {
307+
inner.set(t1);
308+
}
309+
});
310+
311+
inner.get().subscribe(observer);
312+
313+
@SuppressWarnings("unchecked")
314+
Observer<Integer> o2 = mock(Observer.class);
315+
316+
inner.get().subscribe(o2);
317+
318+
verify(o2, never()).onCompleted();
319+
verify(o2, never()).onNext(anyInt());
320+
verify(o2).onError(any(IllegalStateException.class));
289321
}
290322

291323
@Test
292324
public void innerEscapeError() {
293-
Observable<Integer> source = Observable.concat(Observable.from(0), Observable.<Integer> error(new RuntimeException("Forced failure")));
325+
Observable<Integer> source = Observable.concat(Observable.from(0), Observable.<Integer> error(
326+
new TestException("Forced failure")));
294327

295328
final AtomicReference<GroupedObservable<Integer, Integer>> inner = new AtomicReference<GroupedObservable<Integer, Integer>>();
296329

@@ -316,8 +349,47 @@ public void onCompleted() {
316349

317350
inner.get().subscribe(observer);
318351

319-
verify(observer, times(1)).onError(any(Throwable.class));
352+
verify(observer).onNext(0);
353+
verify(observer).onError(any(TestException.class));
320354
verify(observer, never()).onCompleted();
321-
verify(observer, never()).onNext(any());
355+
}
356+
357+
@Test
358+
public void innerEscapeErrorTwice() {
359+
Observable<Integer> source = Observable.concat(Observable.from(0), Observable.<Integer> error(
360+
new TestException("Forced failure")));
361+
362+
final AtomicReference<GroupedObservable<Integer, Integer>> inner = new AtomicReference<GroupedObservable<Integer, Integer>>();
363+
364+
Func1<GroupedObservable<Integer, Integer>, Observable<Object>> duration = just(Observable.never());
365+
366+
Observable<GroupedObservable<Integer, Integer>> m = source.groupByUntil(identity, dbl, duration);
367+
368+
m.subscribe(new Subscriber<GroupedObservable<Integer, Integer>>() {
369+
@Override
370+
public void onNext(GroupedObservable<Integer, Integer> t1) {
371+
inner.set(t1);
372+
}
373+
374+
@Override
375+
public void onError(Throwable e) {
376+
}
377+
378+
@Override
379+
public void onCompleted() {
380+
}
381+
382+
});
383+
384+
inner.get().subscribe(observer);
385+
386+
@SuppressWarnings("unchecked")
387+
Observer<Integer> o2 = mock(Observer.class);
388+
389+
inner.get().subscribe(o2);
390+
391+
verify(o2, never()).onCompleted();
392+
verify(o2, never()).onNext(anyInt());
393+
verify(o2).onError(any(IllegalStateException.class));
322394
}
323395
}

0 commit comments

Comments
 (0)