Skip to content

Commit 01c7406

Browse files
Merge pull request #1362 from zsxwing/rxscala-create
RxScala: Fix #1340 and #1343
2 parents aa374e5 + 476353b commit 01c7406

File tree

2 files changed

+22
-7
lines changed

2 files changed

+22
-7
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4018,13 +4018,15 @@ object Observable {
40184018
* @return
40194019
* an Observable that, when an [[rx.lang.scala.Observer]] subscribes to it, will execute the given function.
40204020
*/
4021-
@deprecated("Use `apply[T](Subscriber[T] => Unit)` instead", "0.17.0")
40224021
def create[T](func: Observer[T] => Subscription): Observable[T] = {
4023-
toScalaObservable[T](rx.Observable.create(new OnSubscribeFunc[T] {
4024-
def onSubscribe(t1: rx.Observer[_ >: T]): rx.Subscription = {
4025-
func(Observer(t1))
4022+
Observable(
4023+
(subscriber: Subscriber[T]) => {
4024+
val s = func(subscriber)
4025+
if (s != null && s != subscriber) {
4026+
subscriber.add(s)
4027+
}
40264028
}
4027-
}))
4029+
)
40284030
}
40294031

40304032
/*
@@ -4305,7 +4307,6 @@ object Observable {
43054307
*/
43064308
def interval(duration: Duration): Observable[Long] = {
43074309
toScalaObservable[java.lang.Long](rx.Observable.interval(duration.length, duration.unit)).map(_.longValue())
4308-
/*XXX*/
43094310
}
43104311

43114312
/**
@@ -4338,7 +4339,6 @@ object Observable {
43384339
*/
43394340
def timer(initialDelay: Duration, period: Duration): Observable[Long] = {
43404341
toScalaObservable[java.lang.Long](rx.Observable.timer(initialDelay.toNanos, period.toNanos, duration.NANOSECONDS)).map(_.longValue())
4341-
/*XXX*/
43424342
}
43434343

43444344
/**

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,4 +271,19 @@ class ObservableTests extends JUnitSuite {
271271
assertEquals(expected, r)
272272
}
273273

274+
@Test
275+
def testCreate() {
276+
var called = false
277+
val o = Observable.create[String](observer => {
278+
observer.onNext("a")
279+
observer.onNext("b")
280+
observer.onNext("c")
281+
observer.onCompleted()
282+
Subscription {
283+
called = true
284+
}
285+
})
286+
assertEquals(List("a", "b", "c"), o.toBlocking.toList)
287+
assertTrue(called)
288+
}
274289
}

0 commit comments

Comments
 (0)