Skip to content

Commit 2f842eb

Browse files
Merge pull request ReactiveX#441 from zsxwing/issue-417
Fixed the issue that 'take' does not call 'onError'
2 parents 43a5f76 + e0b50fc commit 2f842eb

File tree

1 file changed

+72
-4
lines changed

1 file changed

+72
-4
lines changed

rxjava-core/src/main/java/rx/operators/OperationTake.java

Lines changed: 72 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,30 @@
1515
*/
1616
package rx.operators;
1717

18-
import static org.junit.Assert.*;
19-
import static org.mockito.Matchers.*;
20-
import static org.mockito.Mockito.*;
18+
import static org.junit.Assert.assertTrue;
19+
import static org.junit.Assert.fail;
20+
import static org.mockito.Matchers.any;
21+
import static org.mockito.Matchers.anyInt;
22+
import static org.mockito.Matchers.anyString;
23+
import static org.mockito.Mockito.inOrder;
24+
import static org.mockito.Mockito.mock;
25+
import static org.mockito.Mockito.never;
26+
import static org.mockito.Mockito.times;
27+
import static org.mockito.Mockito.verify;
28+
import static org.mockito.Mockito.verifyNoMoreInteractions;
2129

2230
import java.util.concurrent.atomic.AtomicBoolean;
2331
import java.util.concurrent.atomic.AtomicInteger;
2432

2533
import org.junit.Test;
34+
import org.mockito.InOrder;
2635

2736
import rx.Observable;
2837
import rx.Observable.OnSubscribeFunc;
2938
import rx.Observer;
3039
import rx.Subscription;
3140
import rx.subscriptions.Subscriptions;
41+
import rx.util.functions.Func1;
3242

3343
/**
3444
* Returns an Observable that emits the first <code>num</code> items emitted by the source
@@ -114,30 +124,47 @@ private class ItemObserver implements Observer<T> {
114124
private final Observer<? super T> observer;
115125

116126
private final AtomicInteger counter = new AtomicInteger();
127+
private volatile boolean hasEmitedError = false;
117128

118129
public ItemObserver(Observer<? super T> observer) {
119130
this.observer = observer;
120131
}
121132

122133
@Override
123134
public void onCompleted() {
135+
if (hasEmitedError) {
136+
return;
137+
}
124138
if (counter.getAndSet(num) < num) {
125139
observer.onCompleted();
126140
}
127141
}
128142

129143
@Override
130144
public void onError(Throwable e) {
145+
if (hasEmitedError) {
146+
return;
147+
}
131148
if (counter.getAndSet(num) < num) {
132149
observer.onError(e);
133150
}
134151
}
135152

136153
@Override
137154
public void onNext(T args) {
155+
if (hasEmitedError) {
156+
return;
157+
}
138158
final int count = counter.incrementAndGet();
139159
if (count <= num) {
140-
observer.onNext(args);
160+
try {
161+
observer.onNext(args);
162+
} catch (Throwable ex) {
163+
hasEmitedError = true;
164+
observer.onError(ex);
165+
subscription.unsubscribe();
166+
return;
167+
}
141168
if (count == num) {
142169
observer.onCompleted();
143170
}
@@ -184,6 +211,47 @@ public void testTake2() {
184211
verify(aObserver, times(1)).onCompleted();
185212
}
186213

214+
@Test(expected = IllegalArgumentException.class)
215+
public void testTakeWithError() {
216+
Observable.from(1, 2, 3).take(1).map(new Func1<Integer, Integer>() {
217+
public Integer call(Integer t1) {
218+
throw new IllegalArgumentException("some error");
219+
}
220+
}).toBlockingObservable().single();
221+
}
222+
223+
@Test
224+
public void testTakeWithErrorHappeningInOnNext() {
225+
Observable<Integer> w = Observable.from(1, 2, 3).take(2).map(new Func1<Integer, Integer>() {
226+
public Integer call(Integer t1) {
227+
throw new IllegalArgumentException("some error");
228+
}
229+
});
230+
231+
@SuppressWarnings("unchecked")
232+
Observer<Integer> observer = mock(Observer.class);
233+
w.subscribe(observer);
234+
InOrder inOrder = inOrder(observer);
235+
inOrder.verify(observer, times(1)).onError(any(IllegalArgumentException.class));
236+
inOrder.verifyNoMoreInteractions();
237+
}
238+
239+
@Test
240+
public void testTakeWithErrorHappeningInTheLastOnNext() {
241+
Observable<Integer> w = Observable.from(1, 2, 3).take(1).map(new Func1<Integer, Integer>() {
242+
public Integer call(Integer t1) {
243+
throw new IllegalArgumentException("some error");
244+
}
245+
});
246+
247+
@SuppressWarnings("unchecked")
248+
Observer<Integer> observer = mock(Observer.class);
249+
w.subscribe(observer);
250+
InOrder inOrder = inOrder(observer);
251+
inOrder.verify(observer, times(1)).onError(any(IllegalArgumentException.class));
252+
inOrder.verifyNoMoreInteractions();
253+
}
254+
187255
@Test
188256
public void testTakeDoesntLeakErrors() {
189257
Observable<String> source = Observable.create(new OnSubscribeFunc<String>()

0 commit comments

Comments
 (0)