From 19e177311250aaa26f00258b30502ecafa892bbd Mon Sep 17 00:00:00 2001 From: zsxwing Date: Thu, 22 May 2014 23:13:00 +0800 Subject: [PATCH 1/2] Add zip(iterable, selector) to RxScala; resolve the issue of zipWithIndex; update CompletenessTest.scala --- .../main/scala/rx/lang/scala/Observable.scala | 22 ++++++++++++++++++- .../rx/lang/scala/CompletenessTest.scala | 5 +++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index c16f9d5af7..db7f05da31 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -336,6 +336,26 @@ trait Observable[+T] zipWith(that, (t: T, u: U) => (t, u)) } + /** + * Returns an Observable that emits items that are the result of applying a specified function to pairs of + * values, one each from the source Observable and a specified Iterable sequence. + *

+ * + *

+ * Note that the `other` Iterable is evaluated as items are observed from the source Observable; it is + * not pre-consumed. This allows you to zip infinite streams on either side. + * + * @param other the Iterable sequence + * @param selector a function that combines the pairs of items from the Observable and the Iterable to generate + * the items to be emitted by the resulting Observable + * @return an Observable that pairs up values from the source Observable and the `other` Iterable + * sequence and emits the results of `selector` applied to these pairs + */ + def zip[U, R](other: Iterable[U], selector: (T, U) => R): Observable[R] = { + val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]] + toScalaObservable[R](thisJava.zip(other.asJava, selector)) + } + /** * Returns an Observable formed from this Observable and another Observable by combining * corresponding elements using the selector function. @@ -353,7 +373,7 @@ trait Observable[+T] * their index. Indices start at 0. */ def zipWithIndex: Observable[(T, Int)] = { - zip((0 until Int.MaxValue).toObservable) + zip((0 until Int.MaxValue), (t: T, index: Int) => (t, index)) } /** diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala index cd5c6a1d9a..d818f6c706 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala @@ -87,7 +87,9 @@ class CompletenessTest extends JUnitSuite { "firstOrDefault(T, Func1[_ >: T, Boolean])" -> "[use `.filter(condition).firstOrElse(default)`]", "groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R])" -> "[use `groupBy` and `map`]", "groupByUntil(Func1[_ >: T, _ <: TKey], Func1[_ >: GroupedObservable[TKey, T], _ <: Observable[_ <: TDuration]])" -> "groupByUntil(T => K, (K, Observable[T]) => Observable[Any])", + "ignoreElements()" -> "[use `filter(_ => false)`]", "lift(Operator[_ <: R, _ >: T])" -> "lift(Subscriber[R] => Subscriber[T])", + "limit(Int)" -> "take(Int)", "mapWithIndex(Func2[_ >: T, Integer, _ <: R])" -> "[combine `zipWithIndex` with `map` or with a for comprehension]", "multicast(Subject[_ >: T, _ <: R])" -> "multicast(Subject[R])", "multicast(Func0[_ <: Subject[_ >: T, _ <: TIntermediate]], Func1[_ >: Observable[TIntermediate], _ <: Observable[TResult]])" -> "multicast(() => Subject[R], Observable[R] => Observable[U])", @@ -118,6 +120,7 @@ class CompletenessTest extends JUnitSuite { "skipWhile(Func1[_ >: T, Boolean])" -> "dropWhile(T => Boolean)", "skipWhileWithIndex(Func2[_ >: T, Integer, Boolean])" -> unnecessary, "skipUntil(Observable[U])" -> "dropUntil(Observable[E])", + "single(Func1[_ >: T, Boolean])" -> "[use `filter(predicate).single`]", "startWith(T)" -> "[use `item +: o`]", "startWith(Array[T])" -> "[use `Observable.items(items) ++ o`]", "startWith(Array[T], Scheduler)" -> "[use `Observable.items(items).subscribeOn(scheduler) ++ o`]", @@ -151,6 +154,8 @@ class CompletenessTest extends JUnitSuite { "toSortedList(Func2[_ >: T, _ >: T, Integer])" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sortWith(f))`]", "window(Long, Long, TimeUnit)" -> "window(Duration, Duration)", "window(Long, Long, TimeUnit, Scheduler)" -> "window(Duration, Duration, Scheduler)", + "zip(Observable[_ <: T2], Func2[_ >: T, _ >: T2, _ <: R])" -> "zipWith(Observable[U], (T, U) => R)", + "zip(Iterable[_ <: T2], Func2[_ >: T, _ >: T2, _ <: R])" -> "zip(Iterable[U], (T, U) => R)", // manually added entries for Java static methods "average(Observable[Integer])" -> averageProblem, From 9349cd489e615bc58b64174b3c91794cdaff0754 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Fri, 23 May 2014 14:41:25 +0800 Subject: [PATCH 2/2] Rename to 'zipWith'; add 'zip(iterable)'; add examples --- .../rx/lang/scala/examples/RxScalaDemo.scala | 25 +++++++++++++++++++ .../main/scala/rx/lang/scala/Observable.scala | 20 +++++++++++++-- .../rx/lang/scala/CompletenessTest.scala | 2 +- 3 files changed, 44 insertions(+), 3 deletions(-) diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index 3800a85a21..6b1ffdd258 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -448,6 +448,31 @@ class RxScalaDemo extends JUnitSuite { .toBlockingObservable.foreach(println(_)) } + /** + * This is a bad way of using `zip` with an `Iterable`: even if the consumer unsubscribes, + * some elements may still be pulled from `Iterable`. + */ + @Test def zipWithIterableBadExample() { + val o1 = Observable.interval(100 millis, IOScheduler()).map(_ * 100).take(3) + val o2 = Observable.from(0 until Int.MaxValue).doOnEach(i => println(i + " from o2")) + o1.zip(o2).toBlockingObservable.foreach(println(_)) + } + + /** + * This is a good way of using `zip` with an `Iterable`: if the consumer unsubscribes, + * no more elements will be pulled from `Iterable`. + */ + @Test def zipWithIterableGoodExample() { + val o1 = Observable.interval(100 millis, IOScheduler()).map(_ * 100).take(3) + val iter = (0 until Int.MaxValue).view.map { + i => { + println(i + " from iter") + i + } + } + o1.zip(iter).toBlockingObservable.foreach(println(_)) + } + @Test def takeFirstWithCondition() { val condition: Int => Boolean = _ >= 3 assertEquals(3, List(1, 2, 3, 4).toObservable.filter(condition).first.toBlockingObservable.single) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index db7f05da31..4deaab8992 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -336,6 +336,22 @@ trait Observable[+T] zipWith(that, (t: T, u: U) => (t, u)) } + /** + * Returns an Observable formed from `this` Observable and `other` Iterable by combining + * corresponding elements in pairs. + *

+ * + *

+ * Note that the `other` Iterable is evaluated as items are observed from the source Observable; it is + * not pre-consumed. This allows you to zip infinite streams on either side. + * + * @param other the Iterable sequence + * @return an Observable that pairs up values from the source Observable and the `other` Iterable. + */ + def zip[U](other: Iterable[U]): Observable[(T, U)] = { + zipWith(other, (t: T, u: U) => (t, u)) + } + /** * Returns an Observable that emits items that are the result of applying a specified function to pairs of * values, one each from the source Observable and a specified Iterable sequence. @@ -351,7 +367,7 @@ trait Observable[+T] * @return an Observable that pairs up values from the source Observable and the `other` Iterable * sequence and emits the results of `selector` applied to these pairs */ - def zip[U, R](other: Iterable[U], selector: (T, U) => R): Observable[R] = { + def zipWith[U, R](other: Iterable[U], selector: (T, U) => R): Observable[R] = { val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]] toScalaObservable[R](thisJava.zip(other.asJava, selector)) } @@ -373,7 +389,7 @@ trait Observable[+T] * their index. Indices start at 0. */ def zipWithIndex: Observable[(T, Int)] = { - zip((0 until Int.MaxValue), (t: T, index: Int) => (t, index)) + zip(0 until Int.MaxValue) } /** diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala index d818f6c706..611569493b 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala @@ -155,7 +155,7 @@ class CompletenessTest extends JUnitSuite { "window(Long, Long, TimeUnit)" -> "window(Duration, Duration)", "window(Long, Long, TimeUnit, Scheduler)" -> "window(Duration, Duration, Scheduler)", "zip(Observable[_ <: T2], Func2[_ >: T, _ >: T2, _ <: R])" -> "zipWith(Observable[U], (T, U) => R)", - "zip(Iterable[_ <: T2], Func2[_ >: T, _ >: T2, _ <: R])" -> "zip(Iterable[U], (T, U) => R)", + "zip(Iterable[_ <: T2], Func2[_ >: T, _ >: T2, _ <: R])" -> "zipWith(Iterable[U], (T, U) => R)", // manually added entries for Java static methods "average(Observable[Integer])" -> averageProblem,