Skip to content

Commit 62266af

Browse files
Merge pull request #1178 from zsxwing/issue1173
Fix issue #1173
2 parents d64b3a1 + 039378f commit 62266af

File tree

6 files changed

+138
-41
lines changed

6 files changed

+138
-41
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Notification.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ object Notification {
137137
* @param notification
138138
* The [[rx.lang.scala.Notification]] to be deconstructed
139139
* @return
140-
* The [[java.lang.Throwable]] value contained in this notification.
140+
* The `java.lang.Throwable` value contained in this notification.
141141
*/
142142
def unapply[U](notification: Notification[U]): Option[Throwable] = notification match {
143143
case onError: OnError[U] => Some(onError.error)

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 60 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,28 @@ import collection.JavaConversions._
4848
* the observer
4949
* @define subscribeObserverParamScheduler
5050
* the [[rx.lang.scala.Scheduler]] on which Observers subscribe to the Observable
51+
*
52+
* @define subscribeSubscriberMain
53+
* Call this method to subscribe an [[Subscriber]] for receiving items and notifications from the [[Observable]].
54+
*
55+
* A typical implementation of `subscribe` does the following:
56+
*
57+
* It stores a reference to the Observer in a collection object, such as a `List[T]` object.
58+
*
59+
* It returns a reference to the [[rx.lang.scala.Subscription]] interface. This enables [[Subscriber]]s to
60+
* unsubscribe, that is, to stop receiving items and notifications before the Observable stops
61+
* sending them, which also invokes the Subscriber's [[rx.lang.scala.Observer.onCompleted onCompleted]] method.
62+
*
63+
* An [[Observable]] instance is responsible for accepting all subscriptions
64+
* and notifying all [[Subscriber]]s. Unless the documentation for a particular
65+
* [[Observable]] implementation indicates otherwise, [[Subscriber]]s should make no
66+
* assumptions about the order in which multiple [[Subscriber]]s will receive their notifications.
67+
*
68+
* @define subscribeSubscriberParamObserver
69+
* the [[Subscriber]]
70+
* @define subscribeSubscriberParamScheduler
71+
* the [[rx.lang.scala.Scheduler]] on which [[Subscriber]]s subscribe to the Observable
72+
*
5173
* @define subscribeAllReturn
5274
* a [[rx.lang.scala.Subscription]] reference whose `unsubscribe` method can be called to stop receiving items
5375
* before the Observable has finished sending them
@@ -125,6 +147,39 @@ trait Observable[+T]
125147
*/
126148
def apply(observer: Observer[T]): Subscription = subscribe(observer)
127149

150+
/**
151+
* $subscribeSubscriberMain
152+
*
153+
* @param subscriber $subscribeSubscriberParamObserver
154+
* @param scheduler $subscribeSubscriberParamScheduler
155+
* @return $subscribeAllReturn
156+
*/
157+
def subscribe(subscriber: Subscriber[T], scheduler: Scheduler): Subscription = {
158+
// Add the casting to avoid compile error "ambiguous reference to overloaded definition"
159+
val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]]
160+
thisJava.subscribe(subscriber.asJavaSubscriber, scheduler)
161+
}
162+
163+
/**
164+
* $subscribeSubscriberMain
165+
*
166+
* @param subscriber $subscribeSubscriberParamObserver
167+
* @return $subscribeAllReturn
168+
*/
169+
def subscribe(subscriber: Subscriber[T]): Subscription = {
170+
// Add the casting to avoid compile error "ambiguous reference to overloaded definition"
171+
val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]]
172+
thisJava.subscribe(subscriber.asJavaSubscriber)
173+
}
174+
175+
/**
176+
* $subscribeSubscriberMain
177+
*
178+
* @param subscriber $subscribeSubscriberParamObserver
179+
* @return $subscribeAllReturn
180+
*/
181+
def apply(subscriber: Subscriber[T]): Subscription = subscribe(subscriber)
182+
128183
/**
129184
* $subscribeCallbacksMainNoNotifications
130185
*
@@ -287,7 +342,7 @@ trait Observable[+T]
287342
*
288343
* A well-behaved Observable does not interleave its invocations of the [[rx.lang.scala.Observer.onNext onNext]], [[rx.lang.scala.Observer.onCompleted onCompleted]], and [[rx.lang.scala.Observer.onError onError]] methods of
289344
* its [[rx.lang.scala.Observer]]s; it invokes `onCompleted` or `onError` only once; and it never invokes `onNext` after invoking either `onCompleted` or `onError`.
290-
* `synchronize` enforces this, and the Observable it returns invokes `onNext` and `onCompleted` or `onError` synchronously.
345+
* [[Observable.serialize serialize]] enforces this, and the Observable it returns invokes `onNext` and `onCompleted` or `onError` synchronously.
291346
*
292347
* @return an Observable that is a chronologically well-behaved version of the source
293348
* Observable, and that synchronously notifies its [[rx.lang.scala.Observer]]s
@@ -2405,8 +2460,7 @@ trait Observable[+T]
24052460

24062461
/**
24072462
* Perform work in parallel by sharding an `Observable[T]` on a
2408-
* [[rx.lang.scala.concurrency.Schedulers.threadPoolForComputation computation]]
2409-
* [[rx.lang.scala.Scheduler]] and return an `Observable[R]` with the output.
2463+
* [[rx.lang.scala.schedulers.ComputationScheduler]] and return an `Observable[R]` with the output.
24102464
*
24112465
* @param f
24122466
* a function that applies Observable operators to `Observable[T]` in parallel and returns an `Observable[R]`
@@ -2636,12 +2690,10 @@ trait Observable[+T]
26362690
* those emitted by the source Observable
26372691
* @throws IndexOutOfBoundsException
26382692
* if index is greater than or equal to the number of items emitted by the source
2639-
* Observable
2640-
* @throws IndexOutOfBoundsException
2641-
* if index is less than 0
2693+
* Observable, or index is less than 0
26422694
* @see `Observable.elementAt`
2643-
* @deprecated("Use `elementAt`", "0.18.0")
26442695
*/
2696+
@deprecated("Use `elementAt`", "0.18.0")
26452697
def apply(index: Int): Observable[T] = elementAt(index)
26462698

26472699
/**
@@ -2656,9 +2708,7 @@ trait Observable[+T]
26562708
* those emitted by the source Observable
26572709
* @throws IndexOutOfBoundsException
26582710
* if index is greater than or equal to the number of items emitted by the source
2659-
* Observable
2660-
* @throws IndexOutOfBoundsException
2661-
* if index is less than 0
2711+
* Observable, or index is less than 0
26622712
*/
26632713
def elementAt(index: Int): Observable[T] = {
26642714
toScalaObservable[T](asJavaObservable.elementAt(index))

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscriber.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,16 @@ package rx.lang.scala
33
trait Subscriber[-T] extends Observer[T] with Subscription {
44

55
self =>
6-
7-
private [scala] override val asJavaObserver: rx.Observer[_ >: T] = asJavaSubscriber
8-
private [scala] override val asJavaSubscription: rx.Subscription = asJavaSubscriber
9-
6+
107
private [scala] val asJavaSubscriber: rx.Subscriber[_ >: T] = new rx.Subscriber[T] {
118
def onNext(value: T): Unit = self.onNext(value)
129
def onError(error: Throwable): Unit = self.onError(error)
1310
def onCompleted(): Unit = self.onCompleted()
1411
}
15-
12+
13+
private [scala] override val asJavaObserver: rx.Observer[_ >: T] = asJavaSubscriber
14+
private [scala] override val asJavaSubscription: rx.Subscription = asJavaSubscriber
15+
1616
/**
1717
* Used to register an unsubscribe callback.
1818
*/
@@ -34,6 +34,8 @@ object Subscriber extends ObserverFactoryMethods[Subscriber] {
3434

3535
private[scala] def apply[T](subscriber: rx.Subscriber[T]): Subscriber[T] = new Subscriber[T] {
3636
override val asJavaSubscriber = subscriber
37+
override val asJavaObserver: rx.Observer[_ >: T] = asJavaSubscriber
38+
override val asJavaSubscription: rx.Subscription = asJavaSubscriber
3739

3840
override def onNext(value: T): Unit = asJavaSubscriber.onNext(value)
3941
override def onError(error: Throwable): Unit = asJavaSubscriber.onError(error)

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/CompositeSubscription.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@ class CompositeSubscription private[scala] (override val asJavaSubscription: rx.
5050

5151
/**
5252
* Adds a subscription to the group,
53-
* or unsubscribes immediately is the [[rx.subscriptions.CompositeSubscription]] is unsubscribed.
53+
* or unsubscribes immediately is the [[rx.lang.scala.subscriptions.CompositeSubscription]] is unsubscribed.
5454
* @param subscription the subscription to be added.
55-
* @return the [[rx.subscriptions.CompositeSubscription]] itself.
55+
* @return the [[rx.lang.scala.subscriptions.CompositeSubscription]] itself.
5656
*/
5757
def +=(subscription: Subscription): this.type = {
5858
asJavaSubscription.add(subscription.asJavaSubscription)
@@ -62,7 +62,7 @@ class CompositeSubscription private[scala] (override val asJavaSubscription: rx.
6262
/**
6363
* Removes and unsubscribes a subscription to the group,
6464
* @param subscription the subscription be removed.
65-
* @return the [[rx.subscriptions.CompositeSubscription]] itself.
65+
* @return the [[rx.lang.scala.subscriptions.CompositeSubscription]] itself.
6666
*/
6767
def -=(subscription: Subscription): this.type = {
6868
asJavaSubscription.remove(subscription.asJavaSubscription)

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -71,17 +71,16 @@ class CompletenessTest extends JUnitSuite {
7171
"all(Func1[_ >: T, Boolean])" -> "forall(T => Boolean)",
7272
"buffer(Long, Long, TimeUnit)" -> "buffer(Duration, Duration)",
7373
"buffer(Long, Long, TimeUnit, Scheduler)" -> "buffer(Duration, Duration, Scheduler)",
74-
"contains(T)" -> "contains(Any)",
74+
"contains(Any)" -> "contains(U)",
7575
"count()" -> "length",
7676
"dematerialize()" -> "dematerialize(<:<[Observable[T], Observable[Notification[U]]])",
77-
"elementAt(Int)" -> "[use `.drop(index).first`]",
78-
"elementAtOrDefault(Int, T)" -> "[use `.drop(index).firstOrElse(default)`]",
77+
"elementAtOrDefault(Int, T)" -> "elementAtOrDefault(Int, U)",
7978
"first(Func1[_ >: T, Boolean])" -> commentForFirstWithPredicate,
8079
"firstOrDefault(T)" -> "firstOrElse(=> U)",
81-
"firstOrDefault(Func1[_ >: T, Boolean], T)" -> "[use `.filter(condition).firstOrElse(default)`]",
80+
"firstOrDefault(T, Func1[_ >: T, Boolean])" -> "[use `.filter(condition).firstOrElse(default)`]",
8281
"groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R])" -> "[use `groupBy` and `map`]",
82+
"groupByUntil(Func1[_ >: T, _ <: TKey], Func1[_ >: GroupedObservable[TKey, T], _ <: Observable[_ <: TDuration]])" -> "groupByUntil(T => K, (K, Observable[T]) => Observable[Any])",
8383
"lift(Operator[_ <: R, _ >: T])" -> "lift(Subscriber[R] => Subscriber[T])",
84-
"mapMany(Func1[_ >: T, _ <: Observable[_ <: R]])" -> "flatMap(T => Observable[R])",
8584
"mapWithIndex(Func2[_ >: T, Integer, _ <: R])" -> "[combine `zipWithIndex` with `map` or with a for comprehension]",
8685
"onErrorResumeNext(Func1[Throwable, _ <: Observable[_ <: T]])" -> "onErrorResumeNext(Throwable => Observable[U])",
8786
"onErrorResumeNext(Observable[_ <: T])" -> "onErrorResumeNext(Observable[U])",
@@ -95,6 +94,7 @@ class CompletenessTest extends JUnitSuite {
9594
"publish(Func1[_ >: Observable[T], _ <: Observable[R]], T)" -> "publish(Observable[U] => Observable[R], U)",
9695
"reduce(Func2[T, T, T])" -> "reduce((U, U) => U)",
9796
"reduce(R, Func2[R, _ >: T, R])" -> "foldLeft(R)((R, T) => R)",
97+
"repeat()" -> "repeat()",
9898
"retry()" -> "retry()",
9999
"scan(Func2[T, T, T])" -> unnecessary,
100100
"scan(R, Func2[R, _ >: T, R])" -> "scan(R)((R, T) => R)",
@@ -113,8 +113,8 @@ class CompletenessTest extends JUnitSuite {
113113
"skipLast(Int)" -> "dropRight(Int)",
114114
"skipLast(Long, TimeUnit)" -> "dropRight(Duration)",
115115
"skipLast(Long, TimeUnit, Scheduler)" -> "dropRight(Duration, Scheduler)",
116-
"takeFirst()" -> "first",
117-
"takeFirst(Func1[_ >: T, Boolean])" -> commentForFirstWithPredicate,
116+
"subscribe()" -> "subscribe()",
117+
"takeFirst(Func1[_ >: T, Boolean])" -> "[use `filter(condition).take(1)`]",
118118
"takeLast(Int)" -> "takeRight(Int)",
119119
"takeWhileWithIndex(Func2[_ >: T, _ >: Integer, Boolean])" -> "[use `.zipWithIndex.takeWhile{case (elem, index) => condition}.map(_._1)`]",
120120
"timeout(Func0[_ <: Observable[U]], Func1[_ >: T, _ <: Observable[V]], Observable[_ <: T])" -> "timeout(() => Observable[U], T => Observable[V], Observable[O])",
@@ -126,7 +126,6 @@ class CompletenessTest extends JUnitSuite {
126126
"toList()" -> "toSeq",
127127
"toSortedList()" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sorted)`]",
128128
"toSortedList(Func2[_ >: T, _ >: T, Integer])" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sortWith(f))`]",
129-
"where(Func1[_ >: T, Boolean])" -> "filter(T => Boolean)",
130129
"window(Long, Long, TimeUnit)" -> "window(Duration, Duration)",
131130
"window(Long, Long, TimeUnit, Scheduler)" -> "window(Duration, Duration, Scheduler)",
132131

@@ -135,32 +134,28 @@ class CompletenessTest extends JUnitSuite {
135134
"averageDoubles(Observable[Double])" -> averageProblem,
136135
"averageFloats(Observable[Float])" -> averageProblem,
137136
"averageLongs(Observable[Long])" -> averageProblem,
138-
"create(OnSubscribeFunc[T])" -> "apply(Observer[T] => Subscription)",
137+
"create(OnSubscribeFunc[T])" -> "create(Observer[T] => Subscription)",
138+
"create(OnSubscribe[T])" -> "apply(Subscriber[T] => Unit)",
139139
"combineLatest(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "combineLatest(Observable[U])",
140140
"concat(Observable[_ <: Observable[_ <: T]])" -> "concat(<:<[Observable[T], Observable[Observable[U]]])",
141141
"defer(Func0[_ <: Observable[_ <: T]])" -> "defer(=> Observable[T])",
142-
"empty()" -> "apply(T*)",
143-
"error(Throwable)" -> "apply(Throwable)",
144-
"from(Array[T])" -> "apply(T*)",
145-
"from(Iterable[_ <: T])" -> "apply(T*)",
142+
"from(Array[T])" -> "[use `items(T*)`]",
143+
"from(Iterable[_ <: T])" -> "from(Iterable[T])",
146144
"from(Future[_ <: T])" -> fromFuture,
147145
"from(Future[_ <: T], Long, TimeUnit)" -> fromFuture,
148146
"from(Future[_ <: T], Scheduler)" -> fromFuture,
149-
"just(T)" -> "apply(T*)",
147+
"just(T)" -> "[use `items(T*)`]",
148+
"just(T, Scheduler)" -> "[use `items(T*).subscribeOn(scheduler)`]",
150149
"merge(Observable[_ <: T], Observable[_ <: T])" -> "merge(Observable[U])",
151150
"merge(Observable[_ <: Observable[_ <: T]])" -> "flatten(<:<[Observable[T], Observable[Observable[U]]])",
152151
"mergeDelayError(Observable[_ <: T], Observable[_ <: T])" -> "mergeDelayError(Observable[U])",
153152
"mergeDelayError(Observable[_ <: Observable[_ <: T]])" -> "flattenDelayError(<:<[Observable[T], Observable[Observable[U]]])",
154-
"range(Int, Int)" -> "apply(Range)",
155-
"repeat()" -> "repeat()",
156-
"retry()" -> "retry()",
157-
"sequenceEqual(Observable[_ <: T], Observable[_ <: T])" -> "[use `(first zip second) map (p => p._1 == p._2)`]",
158-
"sequenceEqual(Observable[_ <: T], Observable[_ <: T], Func2[_ >: T, _ >: T, Boolean])" -> "[use `(first zip second) map (p => equality(p._1, p._2))`]",
153+
"range(Int, Int)" -> "[use `(start until (start + count)).toObservable` instead of `range(start, count)`]",
154+
"range(Int, Int, Scheduler)" -> "[use `(start until (start + count)).toObservable.subscribeOn(scheduler)` instead of `range(start, count, scheduler)`]`]",
159155
"sum(Observable[Integer])" -> "sum(Numeric[U])",
160156
"sumDoubles(Observable[Double])" -> "sum(Numeric[U])",
161157
"sumFloats(Observable[Float])" -> "sum(Numeric[U])",
162158
"sumLongs(Observable[Long])" -> "sum(Numeric[U])",
163-
"synchronize(Observable[T])" -> "synchronize",
164159
"switchDo(Observable[_ <: Observable[_ <: T]])" -> deprecated,
165160
"switchOnNext(Observable[_ <: Observable[_ <: T]])" -> "switch(<:<[Observable[T], Observable[Observable[U]]])",
166161
"zip(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "[use instance method `zip` and `map`]",
@@ -174,7 +169,7 @@ class CompletenessTest extends JUnitSuite {
174169
"concat(" + _ + ")" -> "[unnecessary because we can use `++` instead or `Observable(o1, o2, ...).concat`]"
175170
).drop(1).toMap ++ List.iterate("T", 10)(s => s + ", T").map(
176171
// all 10 overloads of from:
177-
"from(" + _ + ")" -> "apply(T*)"
172+
"from(" + _ + ")" -> "[use `items(T*)`]"
178173
).toMap ++ (3 to 9).map(i => {
179174
// zip3-9:
180175
val obsArgs = (1 to i).map(j => s"Observable[_ <: T$j], ").mkString("")
@@ -216,6 +211,8 @@ class CompletenessTest extends JUnitSuite {
216211
// TODO how can we filter out instance methods which were put into companion because
217212
// of extends AnyVal in a way which does not depend on implementation-chosen name '$extension'?
218213
.filter(! _.contains("$extension"))
214+
// `access$000` is public. How to distinguish it from others without hard-code?
215+
.filter(! _.contains("access$000"))
219216
}
220217

221218
// also applicable for Java types
@@ -373,7 +370,10 @@ class CompletenessTest extends JUnitSuite {
373370
def escape(s: String) = s.replaceAllLiterally("[", "&lt;").replaceAllLiterally("]", "&gt;")
374371

375372
println("""
376-
## Comparison of Scala Observable and Java Observable
373+
---
374+
layout: comparison
375+
title: Comparison of Scala Observable and Java Observable
376+
---
377377
378378
Note:
379379
* This table contains both static methods and instance methods.
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.lang.scala
17+
18+
import org.junit.Test
19+
import org.junit.Assert.assertNotNull
20+
import org.junit.Assert.assertTrue
21+
import org.scalatest.junit.JUnitSuite
22+
23+
class SubscriberTests extends JUnitSuite {
24+
25+
@Test def testIssue1173() {
26+
// https://github.com/Netflix/RxJava/issues/1173
27+
val subscriber = Subscriber((n: Int) => println(n))
28+
assertNotNull(subscriber.asJavaObserver)
29+
assertNotNull(subscriber.asJavaSubscription)
30+
assertNotNull(subscriber.asJavaSubscriber)
31+
}
32+
33+
@Test def testUnsubscribeForSubscriber() {
34+
var innerSubscriber: Subscriber[Int] = null
35+
val o = Observable[Int](subscriber => {
36+
Observable[Int](subscriber => {
37+
innerSubscriber = subscriber
38+
}).subscribe(subscriber)
39+
})
40+
o.subscribe().unsubscribe()
41+
// If we unsubscribe outside, the inner Subscriber should also be unsubscribed
42+
assertTrue(innerSubscriber.isUnsubscribed)
43+
}
44+
45+
}

0 commit comments

Comments
 (0)