Skip to content

Commit d9df3bd

Browse files
Merge pull request #1239 from zsxwing/apply-scaladoc-example
Update docs for "apply" and add an example
2 parents 0efda07 + ba6bf7c commit d9df3bd

File tree

3 files changed

+41
-4
lines changed

3 files changed

+41
-4
lines changed

language-adaptors/rxjava-scala/README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@ This adaptor allows to use RxJava in Scala with anonymous functions, e.g.
55
```scala
66
val o = Observable.interval(200 millis).take(5)
77
o.subscribe(n => println("n = " + n))
8-
Observable(1, 2, 3, 4).reduce(_ + _)
8+
Observable.items(1, 2, 3, 4).reduce(_ + _)
99
```
1010

1111
For-comprehensions are also supported:
1212

1313
```scala
14-
val first = Observable(10, 11, 12)
15-
val second = Observable(10, 11, 12)
14+
val first = Observable.items(10, 11, 12)
15+
val second = Observable.items(10, 11, 12)
1616
val booleans = for ((n1, n2) <- (first zip second)) yield (n1 == n2)
1717
```
1818

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -620,6 +620,34 @@ class RxScalaDemo extends JUnitSuite {
620620
o.take(1).subscribe(println(_))
621621
}
622622

623+
@Test def createExampleGood2() {
624+
import scala.io.{Codec, Source}
625+
626+
val rxscala = Observable[String](subscriber => {
627+
try {
628+
val input = new java.net.URL("http://rxscala.github.io/").openStream()
629+
subscriber.add(Subscription {
630+
input.close()
631+
})
632+
Source.fromInputStream(input)(Codec.UTF8).getLines()
633+
.takeWhile(_ => !subscriber.isUnsubscribed)
634+
.foreach(subscriber.onNext(_))
635+
if (!subscriber.isUnsubscribed) {
636+
subscriber.onCompleted()
637+
}
638+
}
639+
catch {
640+
case e: Throwable => if (!subscriber.isUnsubscribed) subscriber.onError(e)
641+
}
642+
}).subscribeOn(IOScheduler())
643+
644+
val count = rxscala.flatMap(_.split("\\W+").toSeq.toObservable)
645+
.map(_.toLowerCase)
646+
.filter(_ == "rxscala")
647+
.size
648+
println(s"RxScala appears ${count.toBlockingObservable.single} times in http://rxscala.github.io/")
649+
}
650+
623651
def output(s: String): Unit = println(s)
624652

625653
/** Subscribes to obs and waits until obs has completed. Note that if you subscribe to

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3330,10 +3330,19 @@ object Observable {
33303330
* Write the function you pass so that it behaves as an Observable: It should invoke the
33313331
* Subscriber's `onNext`, `onError`, and `onCompleted` methods appropriately.
33323332
*
3333+
* You can `add` custom [[Subscription]]s to [[Subscriber]]. These [[Subscription]]s will be called
3334+
* <ul>
3335+
* <li>when someone calls `unsubscribe`.</li>
3336+
* <li>after `onCompleted` or `onError`.</li>
3337+
* </ul>
3338+
*
33333339
* See <a href="http://go.microsoft.com/fwlink/?LinkID=205219">Rx Design Guidelines (PDF)</a> for detailed
33343340
* information.
33353341
*
3336-
* @tparam T
3342+
* See `<a href="https://github.com/Netflix/RxJava/blob/master/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala">RxScalaDemo</a>.createExampleGood`
3343+
* and `<a href="https://github.com/Netflix/RxJava/blob/master/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala">RxScalaDemo</a>.createExampleGood2`.
3344+
*
3345+
* @param T
33373346
* the type of the items that this Observable emits
33383347
* @param f
33393348
* a function that accepts a `Subscriber[T]`, and invokes its `onNext`,

0 commit comments

Comments
 (0)