Skip to content

Commit 19e1773

Browse files
committed
Add zip(iterable, selector) to RxScala; resolve the issue of zipWithIndex; update CompletenessTest.scala
1 parent 0efda07 commit 19e1773

File tree

2 files changed

+26
-1
lines changed

2 files changed

+26
-1
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,26 @@ trait Observable[+T]
336336
zipWith(that, (t: T, u: U) => (t, u))
337337
}
338338

339+
/**
340+
* Returns an Observable that emits items that are the result of applying a specified function to pairs of
341+
* values, one each from the source Observable and a specified Iterable sequence.
342+
* <p>
343+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.i.png">
344+
* <p>
345+
* Note that the `other` Iterable is evaluated as items are observed from the source Observable; it is
346+
* not pre-consumed. This allows you to zip infinite streams on either side.
347+
*
348+
* @param other the Iterable sequence
349+
* @param selector a function that combines the pairs of items from the Observable and the Iterable to generate
350+
* the items to be emitted by the resulting Observable
351+
* @return an Observable that pairs up values from the source Observable and the `other` Iterable
352+
* sequence and emits the results of `selector` applied to these pairs
353+
*/
354+
def zip[U, R](other: Iterable[U], selector: (T, U) => R): Observable[R] = {
355+
val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]]
356+
toScalaObservable[R](thisJava.zip(other.asJava, selector))
357+
}
358+
339359
/**
340360
* Returns an Observable formed from this Observable and another Observable by combining
341361
* corresponding elements using the selector function.
@@ -353,7 +373,7 @@ trait Observable[+T]
353373
* their index. Indices start at 0.
354374
*/
355375
def zipWithIndex: Observable[(T, Int)] = {
356-
zip((0 until Int.MaxValue).toObservable)
376+
zip((0 until Int.MaxValue), (t: T, index: Int) => (t, index))
357377
}
358378

359379
/**

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,9 @@ class CompletenessTest extends JUnitSuite {
8787
"firstOrDefault(T, Func1[_ >: T, Boolean])" -> "[use `.filter(condition).firstOrElse(default)`]",
8888
"groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R])" -> "[use `groupBy` and `map`]",
8989
"groupByUntil(Func1[_ >: T, _ <: TKey], Func1[_ >: GroupedObservable[TKey, T], _ <: Observable[_ <: TDuration]])" -> "groupByUntil(T => K, (K, Observable[T]) => Observable[Any])",
90+
"ignoreElements()" -> "[use `filter(_ => false)`]",
9091
"lift(Operator[_ <: R, _ >: T])" -> "lift(Subscriber[R] => Subscriber[T])",
92+
"limit(Int)" -> "take(Int)",
9193
"mapWithIndex(Func2[_ >: T, Integer, _ <: R])" -> "[combine `zipWithIndex` with `map` or with a for comprehension]",
9294
"multicast(Subject[_ >: T, _ <: R])" -> "multicast(Subject[R])",
9395
"multicast(Func0[_ <: Subject[_ >: T, _ <: TIntermediate]], Func1[_ >: Observable[TIntermediate], _ <: Observable[TResult]])" -> "multicast(() => Subject[R], Observable[R] => Observable[U])",
@@ -118,6 +120,7 @@ class CompletenessTest extends JUnitSuite {
118120
"skipWhile(Func1[_ >: T, Boolean])" -> "dropWhile(T => Boolean)",
119121
"skipWhileWithIndex(Func2[_ >: T, Integer, Boolean])" -> unnecessary,
120122
"skipUntil(Observable[U])" -> "dropUntil(Observable[E])",
123+
"single(Func1[_ >: T, Boolean])" -> "[use `filter(predicate).single`]",
121124
"startWith(T)" -> "[use `item +: o`]",
122125
"startWith(Array[T])" -> "[use `Observable.items(items) ++ o`]",
123126
"startWith(Array[T], Scheduler)" -> "[use `Observable.items(items).subscribeOn(scheduler) ++ o`]",
@@ -151,6 +154,8 @@ class CompletenessTest extends JUnitSuite {
151154
"toSortedList(Func2[_ >: T, _ >: T, Integer])" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sortWith(f))`]",
152155
"window(Long, Long, TimeUnit)" -> "window(Duration, Duration)",
153156
"window(Long, Long, TimeUnit, Scheduler)" -> "window(Duration, Duration, Scheduler)",
157+
"zip(Observable[_ <: T2], Func2[_ >: T, _ >: T2, _ <: R])" -> "zipWith(Observable[U], (T, U) => R)",
158+
"zip(Iterable[_ <: T2], Func2[_ >: T, _ >: T2, _ <: R])" -> "zip(Iterable[U], (T, U) => R)",
154159

155160
// manually added entries for Java static methods
156161
"average(Observable[Integer])" -> averageProblem,

0 commit comments

Comments
 (0)