diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index 3800a85a21..6b1ffdd258 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -448,6 +448,31 @@ class RxScalaDemo extends JUnitSuite { .toBlockingObservable.foreach(println(_)) } + /** + * This is a bad way of using `zip` with an `Iterable`: even if the consumer unsubscribes, + * some elements may still be pulled from `Iterable`. + */ + @Test def zipWithIterableBadExample() { + val o1 = Observable.interval(100 millis, IOScheduler()).map(_ * 100).take(3) + val o2 = Observable.from(0 until Int.MaxValue).doOnEach(i => println(i + " from o2")) + o1.zip(o2).toBlockingObservable.foreach(println(_)) + } + + /** + * This is a good way of using `zip` with an `Iterable`: if the consumer unsubscribes, + * no more elements will be pulled from `Iterable`. + */ + @Test def zipWithIterableGoodExample() { + val o1 = Observable.interval(100 millis, IOScheduler()).map(_ * 100).take(3) + val iter = (0 until Int.MaxValue).view.map { + i => { + println(i + " from iter") + i + } + } + o1.zip(iter).toBlockingObservable.foreach(println(_)) + } + @Test def takeFirstWithCondition() { val condition: Int => Boolean = _ >= 3 assertEquals(3, List(1, 2, 3, 4).toObservable.filter(condition).first.toBlockingObservable.single) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index c16f9d5af7..4deaab8992 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -336,6 +336,42 @@ trait Observable[+T] zipWith(that, (t: T, u: U) => (t, u)) } + /** + * Returns an Observable formed from `this` Observable and `other` Iterable by combining + * corresponding elements in pairs. + *
+ *
+ *
+ * Note that the `other` Iterable is evaluated as items are observed from the source Observable; it is + * not pre-consumed. This allows you to zip infinite streams on either side. + * + * @param other the Iterable sequence + * @return an Observable that pairs up values from the source Observable and the `other` Iterable. + */ + def zip[U](other: Iterable[U]): Observable[(T, U)] = { + zipWith(other, (t: T, u: U) => (t, u)) + } + + /** + * Returns an Observable that emits items that are the result of applying a specified function to pairs of + * values, one each from the source Observable and a specified Iterable sequence. + *
+ *
+ *
+ * Note that the `other` Iterable is evaluated as items are observed from the source Observable; it is + * not pre-consumed. This allows you to zip infinite streams on either side. + * + * @param other the Iterable sequence + * @param selector a function that combines the pairs of items from the Observable and the Iterable to generate + * the items to be emitted by the resulting Observable + * @return an Observable that pairs up values from the source Observable and the `other` Iterable + * sequence and emits the results of `selector` applied to these pairs + */ + def zipWith[U, R](other: Iterable[U], selector: (T, U) => R): Observable[R] = { + val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]] + toScalaObservable[R](thisJava.zip(other.asJava, selector)) + } + /** * Returns an Observable formed from this Observable and another Observable by combining * corresponding elements using the selector function. @@ -353,7 +389,7 @@ trait Observable[+T] * their index. Indices start at 0. */ def zipWithIndex: Observable[(T, Int)] = { - zip((0 until Int.MaxValue).toObservable) + zip(0 until Int.MaxValue) } /** diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala index cd5c6a1d9a..611569493b 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala @@ -87,7 +87,9 @@ class CompletenessTest extends JUnitSuite { "firstOrDefault(T, Func1[_ >: T, Boolean])" -> "[use `.filter(condition).firstOrElse(default)`]", "groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R])" -> "[use `groupBy` and `map`]", "groupByUntil(Func1[_ >: T, _ <: TKey], Func1[_ >: GroupedObservable[TKey, T], _ <: Observable[_ <: TDuration]])" -> "groupByUntil(T => K, (K, Observable[T]) => Observable[Any])", + "ignoreElements()" -> "[use `filter(_ => false)`]", "lift(Operator[_ <: R, _ >: T])" -> "lift(Subscriber[R] => Subscriber[T])", + "limit(Int)" -> "take(Int)", "mapWithIndex(Func2[_ >: T, Integer, _ <: R])" -> "[combine `zipWithIndex` with `map` or with a for comprehension]", "multicast(Subject[_ >: T, _ <: R])" -> "multicast(Subject[R])", "multicast(Func0[_ <: Subject[_ >: T, _ <: TIntermediate]], Func1[_ >: Observable[TIntermediate], _ <: Observable[TResult]])" -> "multicast(() => Subject[R], Observable[R] => Observable[U])", @@ -118,6 +120,7 @@ class CompletenessTest extends JUnitSuite { "skipWhile(Func1[_ >: T, Boolean])" -> "dropWhile(T => Boolean)", "skipWhileWithIndex(Func2[_ >: T, Integer, Boolean])" -> unnecessary, "skipUntil(Observable[U])" -> "dropUntil(Observable[E])", + "single(Func1[_ >: T, Boolean])" -> "[use `filter(predicate).single`]", "startWith(T)" -> "[use `item +: o`]", "startWith(Array[T])" -> "[use `Observable.items(items) ++ o`]", "startWith(Array[T], Scheduler)" -> "[use `Observable.items(items).subscribeOn(scheduler) ++ o`]", @@ -151,6 +154,8 @@ class CompletenessTest extends JUnitSuite { "toSortedList(Func2[_ >: T, _ >: T, Integer])" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sortWith(f))`]", "window(Long, Long, TimeUnit)" -> "window(Duration, Duration)", "window(Long, Long, TimeUnit, Scheduler)" -> "window(Duration, Duration, Scheduler)", + "zip(Observable[_ <: T2], Func2[_ >: T, _ >: T2, _ <: R])" -> "zipWith(Observable[U], (T, U) => R)", + "zip(Iterable[_ <: T2], Func2[_ >: T, _ >: T2, _ <: R])" -> "zipWith(Iterable[U], (T, U) => R)", // manually added entries for Java static methods "average(Observable[Integer])" -> averageProblem,