Skip to content

Commit 9349cd4

Browse files
committed
Rename to 'zipWith'; add 'zip(iterable)'; add examples
1 parent 19e1773 commit 9349cd4

File tree

3 files changed

+44
-3
lines changed

3 files changed

+44
-3
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,31 @@ class RxScalaDemo extends JUnitSuite {
448448
.toBlockingObservable.foreach(println(_))
449449
}
450450

451+
/**
452+
* This is a bad way of using `zip` with an `Iterable`: even if the consumer unsubscribes,
453+
* some elements may still be pulled from `Iterable`.
454+
*/
455+
@Test def zipWithIterableBadExample() {
456+
val o1 = Observable.interval(100 millis, IOScheduler()).map(_ * 100).take(3)
457+
val o2 = Observable.from(0 until Int.MaxValue).doOnEach(i => println(i + " from o2"))
458+
o1.zip(o2).toBlockingObservable.foreach(println(_))
459+
}
460+
461+
/**
462+
* This is a good way of using `zip` with an `Iterable`: if the consumer unsubscribes,
463+
* no more elements will be pulled from `Iterable`.
464+
*/
465+
@Test def zipWithIterableGoodExample() {
466+
val o1 = Observable.interval(100 millis, IOScheduler()).map(_ * 100).take(3)
467+
val iter = (0 until Int.MaxValue).view.map {
468+
i => {
469+
println(i + " from iter")
470+
i
471+
}
472+
}
473+
o1.zip(iter).toBlockingObservable.foreach(println(_))
474+
}
475+
451476
@Test def takeFirstWithCondition() {
452477
val condition: Int => Boolean = _ >= 3
453478
assertEquals(3, List(1, 2, 3, 4).toObservable.filter(condition).first.toBlockingObservable.single)

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,22 @@ trait Observable[+T]
336336
zipWith(that, (t: T, u: U) => (t, u))
337337
}
338338

339+
/**
340+
* Returns an Observable formed from `this` Observable and `other` Iterable by combining
341+
* corresponding elements in pairs.
342+
* <p>
343+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.i.png">
344+
* <p>
345+
* Note that the `other` Iterable is evaluated as items are observed from the source Observable; it is
346+
* not pre-consumed. This allows you to zip infinite streams on either side.
347+
*
348+
* @param other the Iterable sequence
349+
* @return an Observable that pairs up values from the source Observable and the `other` Iterable.
350+
*/
351+
def zip[U](other: Iterable[U]): Observable[(T, U)] = {
352+
zipWith(other, (t: T, u: U) => (t, u))
353+
}
354+
339355
/**
340356
* Returns an Observable that emits items that are the result of applying a specified function to pairs of
341357
* values, one each from the source Observable and a specified Iterable sequence.
@@ -351,7 +367,7 @@ trait Observable[+T]
351367
* @return an Observable that pairs up values from the source Observable and the `other` Iterable
352368
* sequence and emits the results of `selector` applied to these pairs
353369
*/
354-
def zip[U, R](other: Iterable[U], selector: (T, U) => R): Observable[R] = {
370+
def zipWith[U, R](other: Iterable[U], selector: (T, U) => R): Observable[R] = {
355371
val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]]
356372
toScalaObservable[R](thisJava.zip(other.asJava, selector))
357373
}
@@ -373,7 +389,7 @@ trait Observable[+T]
373389
* their index. Indices start at 0.
374390
*/
375391
def zipWithIndex: Observable[(T, Int)] = {
376-
zip((0 until Int.MaxValue), (t: T, index: Int) => (t, index))
392+
zip(0 until Int.MaxValue)
377393
}
378394

379395
/**

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ class CompletenessTest extends JUnitSuite {
155155
"window(Long, Long, TimeUnit)" -> "window(Duration, Duration)",
156156
"window(Long, Long, TimeUnit, Scheduler)" -> "window(Duration, Duration, Scheduler)",
157157
"zip(Observable[_ <: T2], Func2[_ >: T, _ >: T2, _ <: R])" -> "zipWith(Observable[U], (T, U) => R)",
158-
"zip(Iterable[_ <: T2], Func2[_ >: T, _ >: T2, _ <: R])" -> "zip(Iterable[U], (T, U) => R)",
158+
"zip(Iterable[_ <: T2], Func2[_ >: T, _ >: T2, _ <: R])" -> "zipWith(Iterable[U], (T, U) => R)",
159159

160160
// manually added entries for Java static methods
161161
"average(Observable[Integer])" -> averageProblem,

0 commit comments

Comments
 (0)