Skip to content

Commit 27d1194

Browse files
Merge pull request #1210 from zsxwing/rxscala-more
Add more operators to RxScala
2 parents 62266af + c965815 commit 27d1194

File tree

3 files changed

+349
-1
lines changed

3 files changed

+349
-1
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,12 @@ class RxScalaDemo extends JUnitSuite {
144144
o.buffer(5).subscribe((l: Seq[Int]) => println(l.mkString("[", ", ", "]")))
145145
}
146146

147+
@Test def bufferExample() {
148+
val o = Observable.from(1 to 18).zip(Observable.interval(100 millis)).map(_._1)
149+
val boundary = Observable.interval(500 millis)
150+
o.buffer(boundary).toBlockingObservable.foreach((l: Seq[Int]) => println(l.mkString("[", ", ", "]")))
151+
}
152+
147153
@Test def windowExample() {
148154
(for ((o, i) <- Observable.from(1 to 18).window(5).zipWithIndex; n <- o)
149155
yield s"Observable#$i emits $n"
@@ -644,6 +650,21 @@ class RxScalaDemo extends JUnitSuite {
644650
println(result)
645651
}
646652

653+
@Test def delayExample3(): Unit = {
654+
val o = List(100, 500, 200).toObservable.delay(
655+
(i: Int) => Observable.items(i).delay(i millis)
656+
)
657+
o.toBlockingObservable.foreach(println(_))
658+
}
659+
660+
@Test def delayExample4(): Unit = {
661+
val o = List(100, 500, 200).toObservable.delay(
662+
() => Observable.interval(500 millis).take(1),
663+
(i: Int) => Observable.items(i).delay(i millis)
664+
)
665+
o.toBlockingObservable.foreach(println(_))
666+
}
667+
647668
@Test def delaySubscriptionExample(): Unit = {
648669
val o = List(100L, 200L, 300L).toObservable.delaySubscription(2 seconds)
649670
val result = o.toBlockingObservable.toList
@@ -792,6 +813,53 @@ class RxScalaDemo extends JUnitSuite {
792813
assertEquals(List(1, 2, 3, 4), o.toBlockingObservable.toList)
793814
}
794815

816+
@Test def sequenceEqualExampe(): Unit = {
817+
val o1 = List(1, 2, 3).toObservable
818+
val o2 = List(1, 2, 3).toObservable
819+
val o3 = List(1, 2).toObservable
820+
val o4 = List(1.0, 2.0, 3.0).toObservable
821+
assertTrue(o1.sequenceEqual(o2).toBlockingObservable.single)
822+
assertFalse(o1.sequenceEqual(o3).toBlockingObservable.single)
823+
assertTrue(o1.sequenceEqual(o4).toBlockingObservable.single)
824+
}
825+
826+
@Test def takeExample(): Unit = {
827+
val o = (1 to 20).toObservable
828+
.zip(Observable.interval(300 millis))
829+
.map(_._1)
830+
.take(2 seconds)
831+
println(o.toBlockingObservable.toList)
832+
}
833+
834+
@Test def takeRightExample(): Unit = {
835+
val o = (1 to 6).toObservable.takeRight(3)
836+
assertEquals(List(4, 5, 6), o.toBlockingObservable.toList)
837+
}
838+
839+
@Test def takeRightExample2(): Unit = {
840+
val o = (1 to 10).toObservable
841+
.zip(Observable.interval(100 millis))
842+
.map(_._1)
843+
.takeRight(300 millis)
844+
println(o.toBlockingObservable.toList)
845+
}
846+
847+
@Test def takeRightExample3(): Unit = {
848+
val o = (1 to 10).toObservable
849+
.zip(Observable.interval(100 millis))
850+
.map(_._1)
851+
.takeRight(2, 300 millis)
852+
println(o.toBlockingObservable.toList)
853+
}
854+
855+
@Test def timeIntervalExample(): Unit = {
856+
val o = (1 to 10).toObservable
857+
.zip(Observable.interval(100 millis))
858+
.map(_._1)
859+
.timeInterval
860+
println(o.toBlockingObservable.toList)
861+
}
862+
795863
@Test def schedulerExample1(): Unit = {
796864
val latch = new CountDownLatch(1)
797865
val worker = IOScheduler().createWorker

0 commit comments

Comments
 (0)