From 8d75cd456e935da7dc252091898c57c65ff0dde1 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Wed, 28 May 2014 21:06:16 +0800 Subject: [PATCH 1/6] RxScala: Add *Option and *OrElse to BlockingObservable --- .../main/scala/rx/lang/scala/Observable.scala | 11 +- .../observables/BlockingObservable.scala | 120 ++++++++++++++---- .../observables/BlockingObservableTest.scala | 116 +++++++++++++++++ 3 files changed, 218 insertions(+), 29 deletions(-) create mode 100644 language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/observables/BlockingObservableTest.scala diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index d3c0621341..9dac2a45ee 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -2862,13 +2862,14 @@ trait Observable[+T] /** * If the source Observable completes after emitting a single item, return an Observable that emits that - * item. If the source Observable emits more than one item or no items, throw an `NoSuchElementException`. + * item. If the source Observable emits more than one item or no items, notify of an `IllegalArgumentException` + * or `NoSuchElementException` respectively. * * * * @return an Observable that emits the single item emitted by the source Observable - * @throws NoSuchElementException - * if the source emits more than one item or no items + * @throws IllegalArgumentException if the source emits more than one item + * @throws NoSuchElementException if the source emits no items * @see RxJava Wiki: single() * @see "MSDN: Observable.singleAsync()" */ @@ -3093,7 +3094,7 @@ trait Observable[+T] */ @deprecated("Use `toBlocking` instead", "0.19") def toBlockingObservable: BlockingObservable[T] = { - new BlockingObservable[T](asJavaObservable.toBlocking) + new BlockingObservable[T](this) } /** @@ -3105,7 +3106,7 @@ trait Observable[+T] * @since 0.19 */ def toBlocking: BlockingObservable[T] = { - new BlockingObservable[T](asJavaObservable.toBlocking) + new BlockingObservable[T](this) } /** diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala index 4ae6a54ce7..836d1c4016 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala @@ -17,18 +17,21 @@ package rx.lang.scala.observables import scala.collection.JavaConverters._ import rx.lang.scala.ImplicitFunctionConversions._ +import rx.lang.scala.Observable +import rx.observables.{BlockingObservable => JBlockingObservable} /** * An Observable that provides blocking operators. * - * You can obtain a BlockingObservable from an Observable using [[rx.lang.scala.Observable.toBlockingObservable]] + * You can obtain a BlockingObservable from an Observable using [[rx.lang.scala.Observable.toBlocking]] */ -// constructor is private because users should use Observable.toBlockingObservable -class BlockingObservable[+T] private[scala] (val asJava: rx.observables.BlockingObservable[_ <: T]) - extends AnyVal +// constructor is private because users should use Observable.toBlocking +class BlockingObservable[+T] private[scala] (val o: Observable[T]) + extends AnyVal { - + // This is def because "field definition is not allowed in value class" + private def asJava: JBlockingObservable[_ <: T] = o.asJavaObservable.toBlocking /** * Invoke a method on each item emitted by the {@link Observable}; block until the Observable * completes. @@ -69,6 +72,34 @@ class BlockingObservable[+T] private[scala] (val asJava: rx.observables.Blocking asJava.last : T } + /** + * Returns an `Option` with the last item emitted by the source Observable, + * or `None` if the source Observable completes without emitting any items. + * + * @return an `Option` with the last item emitted by the source Observable, + * or `None` if the source Observable is empty + */ + def lastOption: Option[T] = { + o.lastOption.toBlocking.single + } + + /** + * Returns the last item emitted by the source Observable, or a default item + * if the source Observable completes without emitting any items. + * + * + * + * @param default the default item to emit if the source Observable is empty. + * This is a by-name parameter, so it is only evaluated if the source Observable doesn't emit anything. + * @return the last item emitted by the source Observable, or a default item if the source Observable is empty + */ + def lastOrElse[U >: T](default: => U): U = { + lastOption match { + case Some(element) => element + case None => default + } + } + /** * Returns the first item emitted by a specified [[Observable]], or * `NoSuchElementException` if source contains no elements. @@ -96,12 +127,32 @@ class BlockingObservable[+T] private[scala] (val asJava: rx.observables.Blocking */ def head : T = first - // last -> use toIterable.last - // lastOrDefault -> use toIterable.lastOption - // first -> use toIterable.head - // firstOrDefault -> use toIterable.headOption - // single(predicate) -> use filter and single - // singleOrDefault -> use singleOption + /** + * Returns an `Option` with the very first item emitted by the source Observable, + * or `None` if the source Observable is empty. + * + * @return an `Option` with the very first item from the source, + * or `None` if the source Observable completes without emitting any item. + */ + def headOption: Option[T] = { + o.headOption.toBlocking.single + } + + /** + * Returns the very first item emitted by the source Observable, or a default value if the source Observable is empty. + * + * + * + * @param default The default value to emit if the source Observable doesn't emit anything. + * This is a by-name parameter, so it is only evaluated if the source Observable doesn't emit anything. + * @return the very first item from the source, or a default value if the source Observable completes without emitting any item. + */ + def headOrElse[U >: T](default: => U): U = { + headOption match { + case Some(element) => element + case None => default + } + } /** * Returns an {@link Iterable} that always returns the item most recently emitted by an {@link Observable}. @@ -130,29 +181,50 @@ class BlockingObservable[+T] private[scala] (val asJava: rx.observables.Blocking } /** - * If this {@link Observable} completes after emitting a single item, return that item, - * otherwise throw an exception. - *

- * + * If the source Observable completes after emitting a single item, return that item. If the source Observable + * emits more than one item or no items, notify of an `IllegalArgumentException` or `NoSuchElementException` respectively. * - * @return the single item emitted by the {@link Observable} + * + * + * @return an Observable that emits the single item emitted by the source Observable + * @throws IllegalArgumentException if the source emits more than one item + * @throws NoSuchElementException if the source emits no items */ def single: T = { asJava.single(): T // useless ascription because of compiler bug } /** - * If this {@link Observable} completes after emitting a single item, return an Option containing - * this item, otherwise return {@code None}. + * If the source Observable completes after emitting a single item, return an `Option` with that item; + * if the source Observable is empty, return `None`. If the source Observable emits more than one item, + * throw an `IllegalArgumentException`. + * + * @return an `Option` with the single item emitted by the source Observable, or + * `None` if the source Observable is empty + * @throws IllegalArgumentException if the source Observable emits more than one item */ def singleOption: Option[T] = { - var size: Int = 0 - var last: Option[T] = None - for (t <- toIterable) { - size += 1 - last = Some(t) + o.singleOption.toBlocking.single + } + + /** + * If the source Observable completes after emitting a single item, return that item; + * if the source Observable is empty, return a default item. If the source Observable + * emits more than one item, throw an `IllegalArgumentException`. + * + * + * + * @param default a default value to emit if the source Observable emits no item. + * This is a by-name parameter, so it is only evaluated if the source Observable doesn't emit anything. + * @return the single item emitted by the source Observable, or a default item if + * the source Observable is empty + * @throws IllegalArgumentException if the source Observable emits more than one item + */ + def singleOrElse[U >: T](default: => U): U = { + singleOption match { + case Some(element) => element + case None => default } - if (size == 1) last else None } // TODO toFuture() diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/observables/BlockingObservableTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/observables/BlockingObservableTest.scala new file mode 100644 index 0000000000..c1da0f400e --- /dev/null +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/observables/BlockingObservableTest.scala @@ -0,0 +1,116 @@ +package rx.lang.scala.observables + +import org.junit.Assert._ +import org.junit.Test +import org.scalatest.junit.JUnitSuite +import scala.language.postfixOps +import rx.lang.scala.Observable + +class BlockingObservableTest extends JUnitSuite { + + @Test + def testSingleOption() { + val o = Observable.items(1) + assertEquals(Some(1), o.toBlocking.singleOption) + } + + @Test + def testSingleOptionWithEmpty() { + val o = Observable.empty + assertEquals(None, o.toBlocking.singleOption) + } + + @Test(expected = classOf[IllegalArgumentException]) + def testSingleOptionWithMultipleItems() { + Observable.items(1, 2).toBlocking.singleOption + } + + @Test + def testSingleOrElse() { + val o = Observable.items(1) + assertEquals(1, o.toBlocking.singleOrElse(2)) + } + + @Test + def testSingleOrElseWithEmpty() { + val o = Observable.empty + assertEquals(2, o.toBlocking.singleOrElse(2)) + } + + @Test(expected = classOf[IllegalArgumentException]) + def testSingleOrElseWithMultipleItems() { + Observable.items(1, 2).toBlocking.singleOrElse(2) + } + + @Test + def testHeadOption() { + val o = Observable.items(1) + assertEquals(Some(1), o.toBlocking.headOption) + } + + @Test + def testHeadOptionWithEmpty() { + val o = Observable.empty + assertEquals(None, o.toBlocking.headOption) + } + + @Test + def testHeadOptionWithMultipleItems() { + val o = Observable.items(1, 2) + assertEquals(Some(1), o.toBlocking.headOption) + } + + @Test + def testHeadOrElse() { + val o = Observable.items(1) + assertEquals(1, o.toBlocking.headOrElse(2)) + } + + @Test + def testHeadOrElseWithEmpty() { + val o = Observable.empty + assertEquals(2, o.toBlocking.headOrElse(2)) + } + + @Test + def testHeadOrElseWithMultipleItems() { + val o = Observable.items(1, 2) + assertEquals(1, o.toBlocking.headOrElse(2)) + } + + @Test + def testLastOption() { + val o = Observable.items(1) + assertEquals(Some(1), o.toBlocking.lastOption) + } + + @Test + def testLastOptionWithEmpty() { + val o = Observable.empty + assertEquals(None, o.toBlocking.lastOption) + } + + @Test + def testLastOptionWithMultipleItems() { + val o = Observable.items(1, 2) + assertEquals(Some(2), o.toBlocking.lastOption) + } + + @Test + def testLastOrElse() { + val o = Observable.items(1) + assertEquals(1, o.toBlocking.lastOrElse(2)) + } + + @Test + def testLastOrElseWithEmpty() { + val o = Observable.empty + assertEquals(2, o.toBlocking.lastOrElse(2)) + } + + @Test + def testLastOrElseWithMultipleItems() { + val o = Observable.items(1, 2) + assertEquals(2, o.toBlocking.lastOrElse(3)) + } +} From 737f780bb02987acb2ad6cd51cf984fbb0ca35b7 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sun, 8 Jun 2014 15:16:52 +0800 Subject: [PATCH 2/6] RxScala: Add latest to BlockingObservable --- .../scala/observables/BlockingObservable.scala | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala index 836d1c4016..652eb823f7 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala @@ -243,6 +243,21 @@ class BlockingObservable[+T] private[scala] (val o: Observable[T]) asJava.toIterable.asScala.toList: List[T] // useless ascription because of compiler bug } + /** + * Returns an `Iterable` that returns the latest item emitted by this `BlockingObservable`, + * waiting if necessary for one to become available. + * + * If this `BlockingObservable` produces items faster than `Iterator.next` takes them, + * `onNext` events might be skipped, but `onError` or `onCompleted` events are not. + * + * Note also that an `onNext` directly followed by `onCompleted` might hide the `onNext` event. + * + * @return an `Iterable` that always returns the latest item emitted by this `BlockingObservable` + */ + def latest: Iterable[T] = { + asJava.latest.asScala: Iterable[T] // useless ascription because of compiler bug + } + } // Cannot yet have inner class because of this error message: From 33e986e6f2b431d48ff6348939b34f9f6ab9d7df Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sun, 8 Jun 2014 15:54:59 +0800 Subject: [PATCH 3/6] RxScala: Add toFuture to BlockingObservable --- .../observables/BlockingObservable.scala | 20 +++++++++-- .../observables/BlockingObservableTest.scala | 36 +++++++++++++++++++ 2 files changed, 54 insertions(+), 2 deletions(-) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala index 652eb823f7..0da0877195 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala @@ -16,6 +16,7 @@ package rx.lang.scala.observables import scala.collection.JavaConverters._ +import scala.concurrent.{Future, Promise} import rx.lang.scala.ImplicitFunctionConversions._ import rx.lang.scala.Observable import rx.observables.{BlockingObservable => JBlockingObservable} @@ -227,8 +228,6 @@ class BlockingObservable[+T] private[scala] (val o: Observable[T]) } } - // TODO toFuture() - /** * Returns an {@link Iterator} that iterates over all items emitted by this {@link Observable}. */ @@ -258,6 +257,23 @@ class BlockingObservable[+T] private[scala] (val o: Observable[T]) asJava.latest.asScala: Iterable[T] // useless ascription because of compiler bug } + /** + * Returns a `Future` representing the single value emitted by this `BlockingObservable`. + * + * `toFuture` throws an `IllegalArgumentException` if the `BlockingObservable` emits more than one item. If the + * `BlockingObservable` may emit more than item, use `BlockingObservable.toList.toFuture`. + * + * `toFuture` throws an `NoSuchElementException` if the `BlockingObservable` is empty. + * + * + * + * @return a `Future` that expects a single item to be emitted by this `BlockingObservable`. + */ + def toFuture: Future[T] = { + val p = Promise[T]() + o.single.subscribe(t => p.success(t), e => p.failure(e)) + p.future + } } // Cannot yet have inner class because of this error message: diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/observables/BlockingObservableTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/observables/BlockingObservableTest.scala index c1da0f400e..d488150f46 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/observables/BlockingObservableTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/observables/BlockingObservableTest.scala @@ -1,5 +1,22 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.lang.scala.observables +import scala.concurrent.Await +import scala.concurrent.duration._ import org.junit.Assert._ import org.junit.Test import org.scalatest.junit.JUnitSuite @@ -113,4 +130,23 @@ class BlockingObservableTest extends JUnitSuite { val o = Observable.items(1, 2) assertEquals(2, o.toBlocking.lastOrElse(3)) } + + @Test + def testToFuture() { + val o = Observable.items(1) + val r = Await.result(o.toBlocking.toFuture, 10 seconds) + assertEquals(1, r) + } + + @Test(expected = classOf[NoSuchElementException]) + def testToFutureWithEmpty() { + val o = Observable.empty + Await.result(o.toBlocking.toFuture, 10 seconds) + } + + @Test(expected = classOf[IllegalArgumentException]) + def testToFutureWithMultipleItems() { + val o = Observable.items(1, 2) + Await.result(o.toBlocking.toFuture, 10 seconds) + } } From 4d0e24638d00c721eda94de662890d6b6df9a20c Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sun, 8 Jun 2014 16:27:25 +0800 Subject: [PATCH 4/6] RxScala: Add examples for BlockingObservable --- .../rx/lang/scala/examples/RxScalaDemo.scala | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index 2c432adf36..f3fa90e916 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -19,6 +19,7 @@ import java.io.IOException import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit +import scala.concurrent.Await import scala.concurrent.duration.Duration import scala.concurrent.duration.DurationInt import scala.concurrent.duration.DurationLong @@ -184,6 +185,28 @@ class RxScalaDemo extends JUnitSuite { assertEquals(squares.toBlockingObservable.toList, List(4, 100, 400, 900)) } + @Test def nextExample() { + val o = Observable.interval(100 millis).take(20) + for(i <- o.toBlocking.next) { + println(i) + Thread.sleep(200) + } + } + + @Test def latestExample() { + val o = Observable.interval(100 millis).take(20) + for(i <- o.toBlocking.latest) { + println(i) + Thread.sleep(200) + } + } + + @Test def toFutureExample() { + val o = Observable.interval(500 millis).take(1) + val r = Await.result(o.toBlocking.toFuture, 2 seconds) + println(r) + } + @Test def testTwoSubscriptionsToOneInterval() { val o = Observable.interval(100 millis).take(8) o.subscribe( From c16b4cbf65c38334070af186b849f05ef765bade Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sun, 8 Jun 2014 21:53:42 +0800 Subject: [PATCH 5/6] Update '***OrElse' to use Option.getOrElse --- .../scala/observables/BlockingObservable.scala | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala index 0da0877195..abeec847f1 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala @@ -95,10 +95,7 @@ class BlockingObservable[+T] private[scala] (val o: Observable[T]) * @return the last item emitted by the source Observable, or a default item if the source Observable is empty */ def lastOrElse[U >: T](default: => U): U = { - lastOption match { - case Some(element) => element - case None => default - } + lastOption getOrElse default } /** @@ -149,10 +146,7 @@ class BlockingObservable[+T] private[scala] (val o: Observable[T]) * @return the very first item from the source, or a default value if the source Observable completes without emitting any item. */ def headOrElse[U >: T](default: => U): U = { - headOption match { - case Some(element) => element - case None => default - } + headOption getOrElse default } /** @@ -222,10 +216,7 @@ class BlockingObservable[+T] private[scala] (val o: Observable[T]) * @throws IllegalArgumentException if the source Observable emits more than one item */ def singleOrElse[U >: T](default: => U): U = { - singleOption match { - case Some(element) => element - case None => default - } + singleOption getOrElse default } /** From 91c8bea857e976c536fc779c64f451d0ee258003 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Thu, 12 Jun 2014 22:41:28 +0800 Subject: [PATCH 6/6] Improve docs of 'toFuture' --- .../rx/lang/scala/observables/BlockingObservable.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala index abeec847f1..6dd2ab83ce 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala @@ -251,10 +251,10 @@ class BlockingObservable[+T] private[scala] (val o: Observable[T]) /** * Returns a `Future` representing the single value emitted by this `BlockingObservable`. * - * `toFuture` throws an `IllegalArgumentException` if the `BlockingObservable` emits more than one item. If the - * `BlockingObservable` may emit more than item, use `BlockingObservable.toList.toFuture`. - * - * `toFuture` throws an `NoSuchElementException` if the `BlockingObservable` is empty. + * The returned `Future` will be completed with an `IllegalArgumentException` if the `BlockingObservable` + * emits more than one item. And it will be completed with an `NoSuchElementException` if the `BlockingObservable` + * is empty. Use `Observable.toSeq.toBlocking.toFuture` if you are not sure about the size of `BlockingObservable` + * and do not want to handle these `Exception`s. * * *