Skip to content

Commit ecedbab

Browse files
Merge pull request #1247 from zsxwing/zip-iterable
Merge pull request #1247
2 parents 13dc137 + 9349cd4 commit ecedbab

File tree

3 files changed

+67
-1
lines changed

3 files changed

+67
-1
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,31 @@ class RxScalaDemo extends JUnitSuite {
448448
.toBlockingObservable.foreach(println(_))
449449
}
450450

451+
/**
452+
* This is a bad way of using `zip` with an `Iterable`: even if the consumer unsubscribes,
453+
* some elements may still be pulled from `Iterable`.
454+
*/
455+
@Test def zipWithIterableBadExample() {
456+
val o1 = Observable.interval(100 millis, IOScheduler()).map(_ * 100).take(3)
457+
val o2 = Observable.from(0 until Int.MaxValue).doOnEach(i => println(i + " from o2"))
458+
o1.zip(o2).toBlockingObservable.foreach(println(_))
459+
}
460+
461+
/**
462+
* This is a good way of using `zip` with an `Iterable`: if the consumer unsubscribes,
463+
* no more elements will be pulled from `Iterable`.
464+
*/
465+
@Test def zipWithIterableGoodExample() {
466+
val o1 = Observable.interval(100 millis, IOScheduler()).map(_ * 100).take(3)
467+
val iter = (0 until Int.MaxValue).view.map {
468+
i => {
469+
println(i + " from iter")
470+
i
471+
}
472+
}
473+
o1.zip(iter).toBlockingObservable.foreach(println(_))
474+
}
475+
451476
@Test def takeFirstWithCondition() {
452477
val condition: Int => Boolean = _ >= 3
453478
assertEquals(3, List(1, 2, 3, 4).toObservable.filter(condition).first.toBlockingObservable.single)

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,42 @@ trait Observable[+T]
336336
zipWith(that, (t: T, u: U) => (t, u))
337337
}
338338

339+
/**
340+
* Returns an Observable formed from `this` Observable and `other` Iterable by combining
341+
* corresponding elements in pairs.
342+
* <p>
343+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.i.png">
344+
* <p>
345+
* Note that the `other` Iterable is evaluated as items are observed from the source Observable; it is
346+
* not pre-consumed. This allows you to zip infinite streams on either side.
347+
*
348+
* @param other the Iterable sequence
349+
* @return an Observable that pairs up values from the source Observable and the `other` Iterable.
350+
*/
351+
def zip[U](other: Iterable[U]): Observable[(T, U)] = {
352+
zipWith(other, (t: T, u: U) => (t, u))
353+
}
354+
355+
/**
356+
* Returns an Observable that emits items that are the result of applying a specified function to pairs of
357+
* values, one each from the source Observable and a specified Iterable sequence.
358+
* <p>
359+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.i.png">
360+
* <p>
361+
* Note that the `other` Iterable is evaluated as items are observed from the source Observable; it is
362+
* not pre-consumed. This allows you to zip infinite streams on either side.
363+
*
364+
* @param other the Iterable sequence
365+
* @param selector a function that combines the pairs of items from the Observable and the Iterable to generate
366+
* the items to be emitted by the resulting Observable
367+
* @return an Observable that pairs up values from the source Observable and the `other` Iterable
368+
* sequence and emits the results of `selector` applied to these pairs
369+
*/
370+
def zipWith[U, R](other: Iterable[U], selector: (T, U) => R): Observable[R] = {
371+
val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]]
372+
toScalaObservable[R](thisJava.zip(other.asJava, selector))
373+
}
374+
339375
/**
340376
* Returns an Observable formed from this Observable and another Observable by combining
341377
* corresponding elements using the selector function.
@@ -353,7 +389,7 @@ trait Observable[+T]
353389
* their index. Indices start at 0.
354390
*/
355391
def zipWithIndex: Observable[(T, Int)] = {
356-
zip((0 until Int.MaxValue).toObservable)
392+
zip(0 until Int.MaxValue)
357393
}
358394

359395
/**

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,9 @@ class CompletenessTest extends JUnitSuite {
8787
"firstOrDefault(T, Func1[_ >: T, Boolean])" -> "[use `.filter(condition).firstOrElse(default)`]",
8888
"groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R])" -> "[use `groupBy` and `map`]",
8989
"groupByUntil(Func1[_ >: T, _ <: TKey], Func1[_ >: GroupedObservable[TKey, T], _ <: Observable[_ <: TDuration]])" -> "groupByUntil(T => K, (K, Observable[T]) => Observable[Any])",
90+
"ignoreElements()" -> "[use `filter(_ => false)`]",
9091
"lift(Operator[_ <: R, _ >: T])" -> "lift(Subscriber[R] => Subscriber[T])",
92+
"limit(Int)" -> "take(Int)",
9193
"mapWithIndex(Func2[_ >: T, Integer, _ <: R])" -> "[combine `zipWithIndex` with `map` or with a for comprehension]",
9294
"multicast(Subject[_ >: T, _ <: R])" -> "multicast(Subject[R])",
9395
"multicast(Func0[_ <: Subject[_ >: T, _ <: TIntermediate]], Func1[_ >: Observable[TIntermediate], _ <: Observable[TResult]])" -> "multicast(() => Subject[R], Observable[R] => Observable[U])",
@@ -118,6 +120,7 @@ class CompletenessTest extends JUnitSuite {
118120
"skipWhile(Func1[_ >: T, Boolean])" -> "dropWhile(T => Boolean)",
119121
"skipWhileWithIndex(Func2[_ >: T, Integer, Boolean])" -> unnecessary,
120122
"skipUntil(Observable[U])" -> "dropUntil(Observable[E])",
123+
"single(Func1[_ >: T, Boolean])" -> "[use `filter(predicate).single`]",
121124
"startWith(T)" -> "[use `item +: o`]",
122125
"startWith(Array[T])" -> "[use `Observable.items(items) ++ o`]",
123126
"startWith(Array[T], Scheduler)" -> "[use `Observable.items(items).subscribeOn(scheduler) ++ o`]",
@@ -151,6 +154,8 @@ class CompletenessTest extends JUnitSuite {
151154
"toSortedList(Func2[_ >: T, _ >: T, Integer])" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sortWith(f))`]",
152155
"window(Long, Long, TimeUnit)" -> "window(Duration, Duration)",
153156
"window(Long, Long, TimeUnit, Scheduler)" -> "window(Duration, Duration, Scheduler)",
157+
"zip(Observable[_ <: T2], Func2[_ >: T, _ >: T2, _ <: R])" -> "zipWith(Observable[U], (T, U) => R)",
158+
"zip(Iterable[_ <: T2], Func2[_ >: T, _ >: T2, _ <: R])" -> "zipWith(Iterable[U], (T, U) => R)",
154159

155160
// manually added entries for Java static methods
156161
"average(Observable[Integer])" -> averageProblem,

0 commit comments

Comments
 (0)