Skip to content

Commit a34cba2

Browse files
Merge pull request #1260 from zsxwing/bo-singleOrDefault-bug
Merge pull request #1260
2 parents ecedbab + 1560ea8 commit a34cba2

File tree

2 files changed

+35
-11
lines changed

2 files changed

+35
-11
lines changed

rxjava-core/src/main/java/rx/observables/BlockingObservable.java

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import rx.Subscriber;
2626
import rx.functions.Action1;
2727
import rx.functions.Func1;
28+
import rx.functions.Functions;
2829
import rx.operators.BlockingOperatorLatest;
2930
import rx.operators.BlockingOperatorMostRecent;
3031
import rx.operators.BlockingOperatorNext;
@@ -381,17 +382,7 @@ public T single(Func1<? super T, Boolean> predicate) {
381382
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.singleordefault.aspx">MSDN: Observable.SingleOrDefault</a>
382383
*/
383384
public T singleOrDefault(T defaultValue) {
384-
Iterator<? extends T> it = this.toIterable().iterator();
385-
386-
if (!it.hasNext()) {
387-
return defaultValue;
388-
}
389-
390-
T result = it.next();
391-
if (it.hasNext()) {
392-
throw new IllegalArgumentException("Sequence contains too many elements");
393-
}
394-
return result;
385+
return from(o.map(Functions.<T>identity()).singleOrDefault(defaultValue)).single();
395386
}
396387

397388
/**

rxjava-core/src/test/java/rx/observables/BlockingObservableTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,13 @@
1616
package rx.observables;
1717

1818
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.assertTrue;
1920
import static org.junit.Assert.fail;
2021

2122
import java.util.Iterator;
2223
import java.util.NoSuchElementException;
24+
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.TimeUnit;
2326

2427
import org.junit.Assert;
2528
import org.junit.Before;
@@ -28,10 +31,14 @@
2831
import org.mockito.MockitoAnnotations;
2932

3033
import rx.Observable;
34+
import rx.Observable.OnSubscribe;
3135
import rx.Subscriber;
3236
import rx.exceptions.TestException;
37+
import rx.functions.Action0;
3338
import rx.functions.Action1;
3439
import rx.functions.Func1;
40+
import rx.schedulers.Schedulers;
41+
import rx.subscriptions.Subscriptions;
3542

3643
public class BlockingObservableTest {
3744

@@ -377,4 +384,30 @@ public Boolean call(String args) {
377384
});
378385
assertEquals("default", first);
379386
}
387+
388+
@Test
389+
public void testSingleOrDefaultUnsubscribe() throws InterruptedException {
390+
final CountDownLatch unsubscribe = new CountDownLatch(1);
391+
Observable<Integer> o = Observable.create(new OnSubscribe<Integer>() {
392+
@Override
393+
public void call(Subscriber<? super Integer> subscriber) {
394+
subscriber.add(Subscriptions.create(new Action0() {
395+
@Override
396+
public void call() {
397+
unsubscribe.countDown();
398+
}
399+
}));
400+
subscriber.onNext(1);
401+
subscriber.onNext(2);
402+
// Don't call `onCompleted` to emulate an infinite stream
403+
}
404+
}).subscribeOn(Schedulers.newThread());
405+
try {
406+
o.toBlocking().singleOrDefault(-1);
407+
fail("Expected IllegalArgumentException because there are 2 elements");
408+
} catch (IllegalArgumentException e) {
409+
// Expected
410+
}
411+
assertTrue("Timeout means `unsubscribe` is not called", unsubscribe.await(30, TimeUnit.SECONDS));
412+
}
380413
}

0 commit comments

Comments
 (0)