diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala
index e0e92c83eb..0bc0416d69 100644
--- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala
+++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala
@@ -19,6 +19,7 @@ import java.io.IOException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
+import scala.concurrent.Await
import scala.collection.mutable
import scala.concurrent.duration.Duration
import scala.concurrent.duration.DurationInt
@@ -226,6 +227,28 @@ class RxScalaDemo extends JUnitSuite {
assertEquals(squares.toBlockingObservable.toList, List(4, 100, 400, 900))
}
+ @Test def nextExample() {
+ val o = Observable.interval(100 millis).take(20)
+ for(i <- o.toBlocking.next) {
+ println(i)
+ Thread.sleep(200)
+ }
+ }
+
+ @Test def latestExample() {
+ val o = Observable.interval(100 millis).take(20)
+ for(i <- o.toBlocking.latest) {
+ println(i)
+ Thread.sleep(200)
+ }
+ }
+
+ @Test def toFutureExample() {
+ val o = Observable.interval(500 millis).take(1)
+ val r = Await.result(o.toBlocking.toFuture, 2 seconds)
+ println(r)
+ }
+
@Test def testTwoSubscriptionsToOneInterval() {
val o = Observable.interval(100 millis).take(8)
o.subscribe(
diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala
index 0d8ddcda20..1db9b85730 100644
--- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala
+++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala
@@ -2985,13 +2985,14 @@ trait Observable[+T]
/**
* If the source Observable completes after emitting a single item, return an Observable that emits that
- * item. If the source Observable emits more than one item or no items, throw an `NoSuchElementException`.
+ * item. If the source Observable emits more than one item or no items, notify of an `IllegalArgumentException`
+ * or `NoSuchElementException` respectively.
*
*
*
* @return an Observable that emits the single item emitted by the source Observable
- * @throws NoSuchElementException
- * if the source emits more than one item or no items
+ * @throws IllegalArgumentException if the source emits more than one item
+ * @throws NoSuchElementException if the source emits no items
* @see RxJava Wiki: single()
* @see "MSDN: Observable.singleAsync()"
*/
@@ -3252,7 +3253,7 @@ trait Observable[+T]
*/
@deprecated("Use `toBlocking` instead", "0.19")
def toBlockingObservable: BlockingObservable[T] = {
- new BlockingObservable[T](asJavaObservable.toBlocking)
+ new BlockingObservable[T](this)
}
/**
@@ -3264,7 +3265,7 @@ trait Observable[+T]
* @since 0.19
*/
def toBlocking: BlockingObservable[T] = {
- new BlockingObservable[T](asJavaObservable.toBlocking)
+ new BlockingObservable[T](this)
}
/**
diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala
index 4ae6a54ce7..6dd2ab83ce 100644
--- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala
+++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala
@@ -16,19 +16,23 @@
package rx.lang.scala.observables
import scala.collection.JavaConverters._
+import scala.concurrent.{Future, Promise}
import rx.lang.scala.ImplicitFunctionConversions._
+import rx.lang.scala.Observable
+import rx.observables.{BlockingObservable => JBlockingObservable}
/**
* An Observable that provides blocking operators.
*
- * You can obtain a BlockingObservable from an Observable using [[rx.lang.scala.Observable.toBlockingObservable]]
+ * You can obtain a BlockingObservable from an Observable using [[rx.lang.scala.Observable.toBlocking]]
*/
-// constructor is private because users should use Observable.toBlockingObservable
-class BlockingObservable[+T] private[scala] (val asJava: rx.observables.BlockingObservable[_ <: T])
- extends AnyVal
+// constructor is private because users should use Observable.toBlocking
+class BlockingObservable[+T] private[scala] (val o: Observable[T])
+ extends AnyVal
{
-
+ // This is def because "field definition is not allowed in value class"
+ private def asJava: JBlockingObservable[_ <: T] = o.asJavaObservable.toBlocking
/**
* Invoke a method on each item emitted by the {@link Observable}; block until the Observable
* completes.
@@ -69,6 +73,31 @@ class BlockingObservable[+T] private[scala] (val asJava: rx.observables.Blocking
asJava.last : T
}
+ /**
+ * Returns an `Option` with the last item emitted by the source Observable,
+ * or `None` if the source Observable completes without emitting any items.
+ *
+ * @return an `Option` with the last item emitted by the source Observable,
+ * or `None` if the source Observable is empty
+ */
+ def lastOption: Option[T] = {
+ o.lastOption.toBlocking.single
+ }
+
+ /**
+ * Returns the last item emitted by the source Observable, or a default item
+ * if the source Observable completes without emitting any items.
+ *
+ *
+ *
+ * @param default the default item to emit if the source Observable is empty.
+ * This is a by-name parameter, so it is only evaluated if the source Observable doesn't emit anything.
+ * @return the last item emitted by the source Observable, or a default item if the source Observable is empty
+ */
+ def lastOrElse[U >: T](default: => U): U = {
+ lastOption getOrElse default
+ }
+
/**
* Returns the first item emitted by a specified [[Observable]], or
* `NoSuchElementException` if source contains no elements.
@@ -96,12 +125,29 @@ class BlockingObservable[+T] private[scala] (val asJava: rx.observables.Blocking
*/
def head : T = first
- // last -> use toIterable.last
- // lastOrDefault -> use toIterable.lastOption
- // first -> use toIterable.head
- // firstOrDefault -> use toIterable.headOption
- // single(predicate) -> use filter and single
- // singleOrDefault -> use singleOption
+ /**
+ * Returns an `Option` with the very first item emitted by the source Observable,
+ * or `None` if the source Observable is empty.
+ *
+ * @return an `Option` with the very first item from the source,
+ * or `None` if the source Observable completes without emitting any item.
+ */
+ def headOption: Option[T] = {
+ o.headOption.toBlocking.single
+ }
+
+ /**
+ * Returns the very first item emitted by the source Observable, or a default value if the source Observable is empty.
+ *
+ *
+ *
+ * @param default The default value to emit if the source Observable doesn't emit anything.
+ * This is a by-name parameter, so it is only evaluated if the source Observable doesn't emit anything.
+ * @return the very first item from the source, or a default value if the source Observable completes without emitting any item.
+ */
+ def headOrElse[U >: T](default: => U): U = {
+ headOption getOrElse default
+ }
/**
* Returns an {@link Iterable} that always returns the item most recently emitted by an {@link Observable}.
@@ -130,32 +176,48 @@ class BlockingObservable[+T] private[scala] (val asJava: rx.observables.Blocking
}
/**
- * If this {@link Observable} completes after emitting a single item, return that item,
- * otherwise throw an exception.
- *
- *
+ * If the source Observable completes after emitting a single item, return that item. If the source Observable
+ * emits more than one item or no items, notify of an `IllegalArgumentException` or `NoSuchElementException` respectively.
+ *
+ *
*
- * @return the single item emitted by the {@link Observable}
+ * @return an Observable that emits the single item emitted by the source Observable
+ * @throws IllegalArgumentException if the source emits more than one item
+ * @throws NoSuchElementException if the source emits no items
*/
def single: T = {
asJava.single(): T // useless ascription because of compiler bug
}
/**
- * If this {@link Observable} completes after emitting a single item, return an Option containing
- * this item, otherwise return {@code None}.
+ * If the source Observable completes after emitting a single item, return an `Option` with that item;
+ * if the source Observable is empty, return `None`. If the source Observable emits more than one item,
+ * throw an `IllegalArgumentException`.
+ *
+ * @return an `Option` with the single item emitted by the source Observable, or
+ * `None` if the source Observable is empty
+ * @throws IllegalArgumentException if the source Observable emits more than one item
*/
def singleOption: Option[T] = {
- var size: Int = 0
- var last: Option[T] = None
- for (t <- toIterable) {
- size += 1
- last = Some(t)
- }
- if (size == 1) last else None
+ o.singleOption.toBlocking.single
}
- // TODO toFuture()
+ /**
+ * If the source Observable completes after emitting a single item, return that item;
+ * if the source Observable is empty, return a default item. If the source Observable
+ * emits more than one item, throw an `IllegalArgumentException`.
+ *
+ *
+ *
+ * @param default a default value to emit if the source Observable emits no item.
+ * This is a by-name parameter, so it is only evaluated if the source Observable doesn't emit anything.
+ * @return the single item emitted by the source Observable, or a default item if
+ * the source Observable is empty
+ * @throws IllegalArgumentException if the source Observable emits more than one item
+ */
+ def singleOrElse[U >: T](default: => U): U = {
+ singleOption getOrElse default
+ }
/**
* Returns an {@link Iterator} that iterates over all items emitted by this {@link Observable}.
@@ -171,6 +233,38 @@ class BlockingObservable[+T] private[scala] (val asJava: rx.observables.Blocking
asJava.toIterable.asScala.toList: List[T] // useless ascription because of compiler bug
}
+ /**
+ * Returns an `Iterable` that returns the latest item emitted by this `BlockingObservable`,
+ * waiting if necessary for one to become available.
+ *
+ * If this `BlockingObservable` produces items faster than `Iterator.next` takes them,
+ * `onNext` events might be skipped, but `onError` or `onCompleted` events are not.
+ *
+ * Note also that an `onNext` directly followed by `onCompleted` might hide the `onNext` event.
+ *
+ * @return an `Iterable` that always returns the latest item emitted by this `BlockingObservable`
+ */
+ def latest: Iterable[T] = {
+ asJava.latest.asScala: Iterable[T] // useless ascription because of compiler bug
+ }
+
+ /**
+ * Returns a `Future` representing the single value emitted by this `BlockingObservable`.
+ *
+ * The returned `Future` will be completed with an `IllegalArgumentException` if the `BlockingObservable`
+ * emits more than one item. And it will be completed with an `NoSuchElementException` if the `BlockingObservable`
+ * is empty. Use `Observable.toSeq.toBlocking.toFuture` if you are not sure about the size of `BlockingObservable`
+ * and do not want to handle these `Exception`s.
+ *
+ *
+ *
+ * @return a `Future` that expects a single item to be emitted by this `BlockingObservable`.
+ */
+ def toFuture: Future[T] = {
+ val p = Promise[T]()
+ o.single.subscribe(t => p.success(t), e => p.failure(e))
+ p.future
+ }
}
// Cannot yet have inner class because of this error message:
diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/observables/BlockingObservableTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/observables/BlockingObservableTest.scala
new file mode 100644
index 0000000000..d488150f46
--- /dev/null
+++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/observables/BlockingObservableTest.scala
@@ -0,0 +1,152 @@
+/**
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.lang.scala.observables
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import org.junit.Assert._
+import org.junit.Test
+import org.scalatest.junit.JUnitSuite
+import scala.language.postfixOps
+import rx.lang.scala.Observable
+
+class BlockingObservableTest extends JUnitSuite {
+
+ @Test
+ def testSingleOption() {
+ val o = Observable.items(1)
+ assertEquals(Some(1), o.toBlocking.singleOption)
+ }
+
+ @Test
+ def testSingleOptionWithEmpty() {
+ val o = Observable.empty
+ assertEquals(None, o.toBlocking.singleOption)
+ }
+
+ @Test(expected = classOf[IllegalArgumentException])
+ def testSingleOptionWithMultipleItems() {
+ Observable.items(1, 2).toBlocking.singleOption
+ }
+
+ @Test
+ def testSingleOrElse() {
+ val o = Observable.items(1)
+ assertEquals(1, o.toBlocking.singleOrElse(2))
+ }
+
+ @Test
+ def testSingleOrElseWithEmpty() {
+ val o = Observable.empty
+ assertEquals(2, o.toBlocking.singleOrElse(2))
+ }
+
+ @Test(expected = classOf[IllegalArgumentException])
+ def testSingleOrElseWithMultipleItems() {
+ Observable.items(1, 2).toBlocking.singleOrElse(2)
+ }
+
+ @Test
+ def testHeadOption() {
+ val o = Observable.items(1)
+ assertEquals(Some(1), o.toBlocking.headOption)
+ }
+
+ @Test
+ def testHeadOptionWithEmpty() {
+ val o = Observable.empty
+ assertEquals(None, o.toBlocking.headOption)
+ }
+
+ @Test
+ def testHeadOptionWithMultipleItems() {
+ val o = Observable.items(1, 2)
+ assertEquals(Some(1), o.toBlocking.headOption)
+ }
+
+ @Test
+ def testHeadOrElse() {
+ val o = Observable.items(1)
+ assertEquals(1, o.toBlocking.headOrElse(2))
+ }
+
+ @Test
+ def testHeadOrElseWithEmpty() {
+ val o = Observable.empty
+ assertEquals(2, o.toBlocking.headOrElse(2))
+ }
+
+ @Test
+ def testHeadOrElseWithMultipleItems() {
+ val o = Observable.items(1, 2)
+ assertEquals(1, o.toBlocking.headOrElse(2))
+ }
+
+ @Test
+ def testLastOption() {
+ val o = Observable.items(1)
+ assertEquals(Some(1), o.toBlocking.lastOption)
+ }
+
+ @Test
+ def testLastOptionWithEmpty() {
+ val o = Observable.empty
+ assertEquals(None, o.toBlocking.lastOption)
+ }
+
+ @Test
+ def testLastOptionWithMultipleItems() {
+ val o = Observable.items(1, 2)
+ assertEquals(Some(2), o.toBlocking.lastOption)
+ }
+
+ @Test
+ def testLastOrElse() {
+ val o = Observable.items(1)
+ assertEquals(1, o.toBlocking.lastOrElse(2))
+ }
+
+ @Test
+ def testLastOrElseWithEmpty() {
+ val o = Observable.empty
+ assertEquals(2, o.toBlocking.lastOrElse(2))
+ }
+
+ @Test
+ def testLastOrElseWithMultipleItems() {
+ val o = Observable.items(1, 2)
+ assertEquals(2, o.toBlocking.lastOrElse(3))
+ }
+
+ @Test
+ def testToFuture() {
+ val o = Observable.items(1)
+ val r = Await.result(o.toBlocking.toFuture, 10 seconds)
+ assertEquals(1, r)
+ }
+
+ @Test(expected = classOf[NoSuchElementException])
+ def testToFutureWithEmpty() {
+ val o = Observable.empty
+ Await.result(o.toBlocking.toFuture, 10 seconds)
+ }
+
+ @Test(expected = classOf[IllegalArgumentException])
+ def testToFutureWithMultipleItems() {
+ val o = Observable.items(1, 2)
+ Await.result(o.toBlocking.toFuture, 10 seconds)
+ }
+}