diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index 64b98dd101..e316722c24 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -17,11 +17,15 @@ package rx.lang.scala import rx.functions.FuncN -import rx.Observable.OnSubscribeFunc import rx.lang.scala.observables.ConnectableObservable import scala.concurrent.duration import java.util import collection.JavaConversions._ +import scala.collection.generic.CanBuildFrom +import scala.annotation.unchecked.uncheckedVariance +import scala.collection.{Iterable, Traversable, immutable} +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag /** @@ -3970,6 +3974,163 @@ trait Observable[+T] } } } + + /** + * Returns an Observable that emits a single item, a collection composed of all the items emitted by + * the source Observable. + * + * Be careful not to use this operator on Observables that emit infinite or very large numbers + * of items, as you do not have the option to unsubscribe. + * + * @tparam Col the collection type to build. + * @return an Observable that emits a single item, a collection containing all of the items emitted by + * the source Observable. + */ + def to[Col[_]](implicit cbf: CanBuildFrom[Nothing, T, Col[T @uncheckedVariance]]): Observable[Col[T @uncheckedVariance]] = { + lift { + (subscriber: Subscriber[Col[T]]) => { + val b = cbf() + Subscriber[T]( + subscriber, + (t: T) => { + b += t: Unit + }, + e => subscriber.onError(e), + () => { + subscriber.onNext(b.result) + subscriber.onCompleted() + } + ) + } + } + } + + /** + * Returns an Observable that emits a single item, a `Traversable` composed of all the items emitted by + * the source Observable. + * + * Be careful not to use this operator on Observables that emit infinite or very large numbers + * of items, as you do not have the option to unsubscribe. + * + * @return an Observable that emits a single item, a `Traversable` containing all of the items emitted by + * the source Observable. + */ + def toTraversable: Observable[Traversable[T]] = to[Traversable] + + /** + * Returns an Observable that emits a single item, a `List` composed of all the items emitted by + * the source Observable. + * + * Be careful not to use this operator on Observables that emit infinite or very large numbers + * of items, as you do not have the option to unsubscribe. + * + * @return an Observable that emits a single item, a `List` containing all of the items emitted by + * the source Observable. + */ + def toList: Observable[List[T]] = to[List] + + /** + * Returns an Observable that emits a single item, an `Iterable` composed of all the items emitted by + * the source Observable. + * + * Be careful not to use this operator on Observables that emit infinite or very large numbers + * of items, as you do not have the option to unsubscribe. + * + * @return an Observable that emits a single item, an `Iterable` containing all of the items emitted by + * the source Observable. + */ + def toIterable: Observable[Iterable[T]] = to[Iterable] + + /** + * Returns an Observable that emits a single item, an `Iterator` composed of all the items emitted by + * the source Observable. + * + * Be careful not to use this operator on Observables that emit infinite or very large numbers + * of items, as you do not have the option to unsubscribe. + * + * @return an Observable that emits a single item, an `Iterator` containing all of the items emitted by + * the source Observable. + */ + def toIterator: Observable[Iterator[T]] = toIterable.map(_.iterator) + + /** + * Returns an Observable that emits a single item, a `Stream` composed of all the items emitted by + * the source Observable. + * + * Be careful not to use this operator on Observables that emit infinite or very large numbers + * of items, as you do not have the option to unsubscribe. + * + * @return an Observable that emits a single item, a `Stream` containing all of the items emitted by + * the source Observable. + */ + def toStream: Observable[Stream[T]] = to[Stream] + + /** + * Returns an Observable that emits a single item, an `IndexedSeq` composed of all the items emitted by + * the source Observable. + * + * Be careful not to use this operator on Observables that emit infinite or very large numbers + * of items, as you do not have the option to unsubscribe. + * + * @return an Observable that emits a single item, an `IndexedSeq` containing all of the items emitted by + * the source Observable. + */ + def toIndexedSeq: Observable[immutable.IndexedSeq[T]] = to[immutable.IndexedSeq] + + /** + * Returns an Observable that emits a single item, a `Vector` composed of all the items emitted by + * the source Observable. + * + * Be careful not to use this operator on Observables that emit infinite or very large numbers + * of items, as you do not have the option to unsubscribe. + * + * @return an Observable that emits a single item, a `Vector` containing all of the items emitted by + * the source Observable. + */ + def toVector: Observable[Vector[T]] = to[Vector] + + /** + * Returns an Observable that emits a single item, a `Buffer` composed of all the items emitted by + * the source Observable. + * + * Be careful not to use this operator on Observables that emit infinite or very large numbers + * of items, as you do not have the option to unsubscribe. + * + * @return an Observable that emits a single item, a `Buffer` containing all of the items emitted by + * the source Observable. + */ + def toBuffer[U >: T]: Observable[mutable.Buffer[U]] = { // use U >: T because Buffer is invariant + val us: Observable[U] = this + us.to[ArrayBuffer] + } + + /** + * Returns an Observable that emits a single item, a `Set` composed of all the items emitted by + * the source Observable. + * + * Be careful not to use this operator on Observables that emit infinite or very large numbers + * of items, as you do not have the option to unsubscribe. + * + * @return an Observable that emits a single item, a `Set` containing all of the items emitted by + * the source Observable. + */ + def toSet[U >: T]: Observable[immutable.Set[U]] = { // use U >: T because Set is invariant + val us: Observable[U] = this + us.to[immutable.Set] + } + + /** + * Returns an Observable that emits a single item, an `Array` composed of all the items emitted by + * the source Observable. + * + * Be careful not to use this operator on Observables that emit infinite or very large numbers + * of items, as you do not have the option to unsubscribe. + * + * @return an Observable that emits a single item, an `Array` containing all of the items emitted by + * the source Observable. + */ + def toArray[U >: T : ClassTag]: Observable[Array[U]] = // use U >: T because Array is invariant + toBuffer[U].map(_.toArray) } /** diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala index b2e257f365..e8d10a2441 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala @@ -286,4 +286,64 @@ class ObservableTests extends JUnitSuite { assertEquals(List("a", "b", "c"), o.toBlocking.toList) assertTrue(called) } + + @Test + def testToTraversable() { + val o = Observable.items(1, 2, 3).toTraversable + assertEquals(Seq(1, 2, 3), o.toBlocking.single) + } + + @Test + def testToList() { + val o = Observable.items(1, 2, 3).toList + assertEquals(Seq(1, 2, 3), o.toBlocking.single) + } + + @Test + def testToIterable() { + val o = Observable.items(1, 2, 3).toIterable + assertEquals(Seq(1, 2, 3), o.toBlocking.single) + } + + @Test + def testToIterator() { + val o = Observable.items(1, 2, 3).toIterator + assertEquals(Seq(1, 2, 3), o.toBlocking.single.toSeq) + } + + @Test + def testToStream() { + val o = Observable.items(1, 2, 3).toStream + assertEquals(Seq(1, 2, 3), o.toBlocking.single) + } + + @Test + def testToIndexedSeq() { + val o = Observable.items(1, 2, 3).toIndexedSeq + assertEquals(Seq(1, 2, 3), o.toBlocking.single) + } + + @Test + def testToBuffer() { + val o = Observable.items(1, 2, 3).toBuffer + assertEquals(Seq(1, 2, 3), o.toBlocking.single) + } + + @Test + def testToSet() { + val o = Observable.items(1, 2, 2).toSet + assertEquals(Set(1, 2), o.toBlocking.single) + } + + @Test + def testToVector() { + val o = Observable.items(1, 2, 3).toVector + assertEquals(Seq(1, 2, 3), o.toBlocking.single) + } + + @Test + def testToArray() { + val o = Observable.items(1, 2, 3).toArray + assertArrayEquals(Array(1, 2, 3), o.toBlocking.single) + } }