diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index d040306432..3800a85a21 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -343,14 +343,54 @@ class RxScalaDemo extends JUnitSuite { @Test def exampleWithReplay() { val numbers = Observable.interval(1000 millis).take(6) - val (startFunc, sharedNumbers) = numbers.replay + val sharedNumbers = numbers.replay sharedNumbers.subscribe(n => println(s"subscriber 1 gets $n")) - startFunc() + sharedNumbers.connect // subscriber 2 subscribes later but still gets all numbers doLater(3500 millis, () => { sharedNumbers.subscribe(n => println(s"subscriber 2 gets $n")) }) waitFor(sharedNumbers) } + @Test def exampleWithReplay2() { + val numbers = Observable.interval(100 millis).take(10) + val sharedNumbers = numbers.replay(3) + sharedNumbers.subscribe(n => println(s"subscriber 1 gets $n")) + sharedNumbers.connect + // subscriber 2 subscribes later but only gets the 3 buffered numbers and the following numbers + Thread.sleep(700) + sharedNumbers.subscribe(n => println(s"subscriber 2 gets $n")) + waitFor(sharedNumbers) + } + + @Test def exampleWithReplay3() { + val numbers = Observable.interval(100 millis).take(10) + val sharedNumbers = numbers.replay(300 millis) + sharedNumbers.subscribe(n => println(s"subscriber 1 gets $n")) + sharedNumbers.connect + // subscriber 2 subscribes later but only gets the buffered numbers and the following numbers + Thread.sleep(700) + sharedNumbers.subscribe(n => println(s"subscriber 2 gets $n")) + waitFor(sharedNumbers) + } + + @Test def exampleWithReplay4() { + val numbers = Observable.interval(100 millis).take(10) + val sharedNumbers = numbers.replay(2, 300 millis) + sharedNumbers.subscribe(n => println(s"subscriber 1 gets $n")) + sharedNumbers.connect + // subscriber 2 subscribes later but only gets the buffered numbers and the following numbers + Thread.sleep(700) + sharedNumbers.subscribe(n => println(s"subscriber 2 gets $n")) + waitFor(sharedNumbers) + } + + @Test def exampleWithReplay5() { + val numbers = Observable.interval(100 millis).take(10) + val sharedNumbers = numbers.replay[Long, Long]((o: Observable[Long]) => o.map(_ * 2)) + sharedNumbers.subscribe(n => println(s"subscriber gets $n")) + waitFor(sharedNumbers) + } + @Test def testSingleOption() { assertEquals(None, List(1, 2).toObservable.toBlockingObservable.singleOption) assertEquals(Some(1), List(1).toObservable.toBlockingObservable.singleOption) @@ -720,7 +760,7 @@ class RxScalaDemo extends JUnitSuite { } @Test def repeatExample1(): Unit = { - val o : Observable[String] = List("alice", "bob", "carol").toObservable.repeat().take(6) + val o : Observable[String] = List("alice", "bob", "carol").toObservable.repeat.take(6) assertEquals(List("alice", "bob", "carol", "alice", "bob", "carol"), o.toBlockingObservable.toList) } @@ -802,6 +842,21 @@ class RxScalaDemo extends JUnitSuite { } } + @Test def multicastExample1(): Unit = { + val unshared = Observable.from(1 to 4) + val shared = unshared.multicast(Subject()) + shared.subscribe(n => println(s"subscriber 1 gets $n")) + shared.subscribe(n => println(s"subscriber 2 gets $n")) + shared.connect + } + + @Test def multicastExample2(): Unit = { + val unshared = Observable.from(1 to 4) + val shared = unshared.multicast[Int, String](() => Subject(), o => o.map("No. " + _)) + shared.subscribe(n => println(s"subscriber 1 gets $n")) + shared.subscribe(n => println(s"subscriber 2 gets $n")) + } + @Test def startWithExample(): Unit = { val o1 = List(3, 4).toObservable val o2 = 1 +: 2 +: o1 diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index bec1498d1d..a156335d61 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -271,10 +271,24 @@ trait Observable[+T] * @return a pair of a start function and an [[rx.lang.scala.Observable]] such that when the start function * is called, the Observable starts to push results into the specified Subject */ - def multicast[R >: T](subject: rx.lang.scala.Subject[R]): (() => Subscription, Observable[R]) = { + def multicast[R >: T](subject: rx.lang.scala.Subject[R]): ConnectableObservable[R] = { val s: rx.subjects.Subject[_ >: T, _<: R] = subject.asJavaSubject - val javaCO: rx.observables.ConnectableObservable[R] = asJavaObservable.multicast(s) - (() => javaCO.connect(), toScalaObservable(javaCO)) + new ConnectableObservable[R](asJavaObservable.multicast(s)) + } + + /** + * Returns an Observable that emits items produced by multicasting the source Observable within a selector function. + * + * @param subjectFactory the `Subject` factory + * @param selector the selector function, which can use the multicasted source Observable subject to the policies + * enforced by the created `Subject` + * @return an Observable that emits the items produced by multicasting the source Observable within a selector function + */ + def multicast[R >: T, U](subjectFactory: () => rx.lang.scala.Subject[R], selector: Observable[R] => Observable[U]): Observable[U] = { + val subjectFactoryJava: Func0[rx.subjects.Subject[_ >: T, _ <: R]] = () => subjectFactory().asJavaSubject + val selectorJava: Func1[rx.Observable[R], rx.Observable[U]] = + (jo: rx.Observable[R]) => selector(toScalaObservable[R](jo)).asJavaObservable.asInstanceOf[rx.Observable[U]] + toScalaObservable[U](asJavaObservable.multicast[R, U](subjectFactoryJava, selectorJava)) } /** @@ -1148,9 +1162,281 @@ trait Observable[+T] * @return a pair of a start function and an [[rx.lang.scala.Observable]] such that when the start function * is called, the Observable starts to emit items to its [[rx.lang.scala.Observer]]s */ - def replay: (() => Subscription, Observable[T]) = { - val javaCO = asJavaObservable.replay() - (() => javaCO.connect(), toScalaObservable[T](javaCO)) + def replay: ConnectableObservable[T] = { + new ConnectableObservable[T](asJavaObservable.replay()) + } + + /** + * Returns an Observable that emits items that are the results of invoking a specified selector on the items + * emitted by a `ConnectableObservable` that shares a single subscription to the source Observable. + *
+ *
+ *
+ * @param selector the selector function, which can use the multicasted sequence as many times as needed, without
+ * causing multiple subscriptions to the Observable
+ * @return an Observable that emits items that are the results of invoking the selector on a `ConnectableObservable`
+ * that shares a single subscription to the source Observable
+ */
+ def replay[U >: T, R](selector: Observable[U] => Observable[R]): Observable[R] = {
+ val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]]
+ val fJava: Func1[rx.Observable[U], rx.Observable[R]] =
+ (jo: rx.Observable[U]) => selector(toScalaObservable[U](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]]
+ toScalaObservable[R](thisJava.replay(fJava))
+ }
+
+ /**
+ * Returns an Observable that emits items that are the results of invoking a specified selector on items
+ * emitted by a `ConnectableObservable` that shares a single subscription to the source Observable,
+ * replaying `bufferSize` notifications.
+ *
+ *
+ *
+ * @param selector the selector function, which can use the multicasted sequence as many times as needed, without
+ * causing multiple subscriptions to the Observable
+ * @param bufferSize the buffer size that limits the number of items the connectable observable can replay
+ * @return an Observable that emits items that are the results of invoking the selector on items emitted by
+ * a `ConnectableObservable` that shares a single subscription to the source Observable replaying
+ * no more than `bufferSize` items
+ */
+ def replay[U >: T, R](selector: Observable[U] => Observable[R], bufferSize: Int): Observable[R] = {
+ val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]]
+ val fJava: Func1[rx.Observable[U], rx.Observable[R]] =
+ (jo: rx.Observable[U]) => selector(toScalaObservable[U](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]]
+ toScalaObservable[R](thisJava.replay(fJava, bufferSize))
+ }
+
+ /**
+ * Returns an Observable that emits items that are the results of invoking a specified selector on items
+ * emitted by a `ConnectableObservable` that shares a single subscription to the source Observable,
+ * replaying no more than `bufferSize` items that were emitted within a specified time window.
+ *
+ *
+ *
+ * @param selector a selector function, which can use the multicasted sequence as many times as needed, without
+ * causing multiple subscriptions to the Observable
+ * @param bufferSize the buffer size that limits the number of items the connectable observable can replay
+ * @param time the duration of the window in which the replayed items must have been emitted
+ * @return an Observable that emits items that are the results of invoking the selector on items emitted by
+ * a `ConnectableObservable` that shares a single subscription to the source Observable, and
+ * replays no more than `bufferSize` items that were emitted within the window defined by `time`
+ */
+ def replay[U >: T, R](selector: Observable[U] => Observable[R], bufferSize: Int, time: Duration): Observable[R] = {
+ val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]]
+ val fJava: Func1[rx.Observable[U], rx.Observable[R]] =
+ (jo: rx.Observable[U]) => selector(toScalaObservable[U](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]]
+ toScalaObservable[R](thisJava.replay(fJava, bufferSize, time.length, time.unit))
+ }
+
+ /**
+ * Returns an Observable that emits items that are the results of invoking a specified selector on items
+ * emitted by a `ConnectableObservable` that shares a single subscription to the source Observable,
+ * replaying no more than `bufferSize` items that were emitted within a specified time window.
+ *
+ *
+ *
+ * @param selector a selector function, which can use the multicasted sequence as many times as needed, without
+ * causing multiple subscriptions to the Observable
+ * @param bufferSize the buffer size that limits the number of items the connectable observable can replay
+ * @param time the duration of the window in which the replayed items must have been emitted
+ * @param scheduler the Scheduler that is the time source for the window
+ * @return an Observable that emits items that are the results of invoking the selector on items emitted by
+ * a `ConnectableObservable` that shares a single subscription to the source Observable, and
+ * replays no more than `bufferSize` items that were emitted within the window defined by `time`
+ * @throws IllegalArgumentException if `bufferSize` is less than zero
+ */
+ def replay[U >: T, R](selector: Observable[U] => Observable[R], bufferSize: Int, time: Duration, scheduler: Scheduler): Observable[R] = {
+ val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]]
+ val fJava: Func1[rx.Observable[U], rx.Observable[R]] =
+ (jo: rx.Observable[U]) => selector(toScalaObservable[U](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]]
+ toScalaObservable[R](thisJava.replay(fJava, bufferSize, time.length, time.unit, scheduler))
+ }
+
+ /**
+ * Returns an Observable that emits items that are the results of invoking a specified selector on items
+ * emitted by a `ConnectableObservable` that shares a single subscription to the source Observable,
+ * replaying a maximum of `bufferSize` items.
+ *
+ *
+ *
+ * @param selector a selector function, which can use the multicasted sequence as many times as needed, without
+ * causing multiple subscriptions to the Observable
+ * @param bufferSize the buffer size that limits the number of items the connectable observable can replay
+ * @param scheduler the Scheduler on which the replay is observed
+ * @return an Observable that emits items that are the results of invoking the selector on items emitted by
+ * a `ConnectableObservable` that shares a single subscription to the source Observable,
+ * replaying no more than `bufferSize` notifications
+ */
+ def replay[U >: T, R](selector: Observable[U] => Observable[R], bufferSize: Int, scheduler: Scheduler): Observable[R] = {
+ val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]]
+ val fJava: Func1[rx.Observable[U], rx.Observable[R]] =
+ (jo: rx.Observable[U]) => selector(toScalaObservable[U](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]]
+ toScalaObservable[R](thisJava.replay(fJava, bufferSize, scheduler))
+ }
+
+ /**
+ * Returns an Observable that emits items that are the results of invoking a specified selector on items
+ * emitted by a `ConnectableObservable` that shares a single subscription to the source Observable,
+ * replaying all items that were emitted within a specified time window.
+ *
+ *
+ *
+ * @param selector a selector function, which can use the multicasted sequence as many times as needed, without
+ * causing multiple subscriptions to the Observable
+ * @param time the duration of the window in which the replayed items must have been emitted
+ * @return an Observable that emits items that are the results of invoking the selector on items emitted by
+ * a `ConnectableObservable` that shares a single subscription to the source Observable,
+ * replaying all items that were emitted within the window defined by `time`
+ */
+ def replay[U >: T, R](selector: Observable[U] => Observable[R], time: Duration, scheduler: Scheduler): Observable[R] = {
+ val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]]
+ val fJava: Func1[rx.Observable[U], rx.Observable[R]] =
+ (jo: rx.Observable[U]) => selector(toScalaObservable[U](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]]
+ toScalaObservable[R](thisJava.replay(fJava, time.length, time.unit, scheduler))
+ }
+
+ /**
+ * Returns an Observable that emits items that are the results of invoking a specified selector on items
+ * emitted by a `ConnectableObservable` that shares a single subscription to the source Observable.
+ *
+ *
+ *
+ * @param selector a selector function, which can use the multicasted sequence as many times as needed, without
+ * causing multiple subscriptions to the Observable
+ * @param scheduler the Scheduler where the replay is observed
+ * @return an Observable that emits items that are the results of invoking the selector on items emitted by
+ * a `ConnectableObservable` that shares a single subscription to the source Observable,
+ * replaying all items
+ */
+ def replay[U >: T, R](selector: Observable[U] => Observable[R], scheduler: Scheduler): Observable[R] = {
+ val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]]
+ val fJava: Func1[rx.Observable[U], rx.Observable[R]] =
+ (jo: rx.Observable[U]) => selector(toScalaObservable[U](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]]
+ toScalaObservable[R](thisJava.replay(fJava, scheduler))
+ }
+
+ /**
+ * Returns a `ConnectableObservable` that shares a single subscription to the source Observable and
+ * replays at most `bufferSize` items that were emitted during a specified time window.
+ *
+ *
+ *
+ * @param bufferSize the buffer size that limits the number of items that can be replayed
+ * @param time the duration of the window in which the replayed items must have been emitted
+ * @return a `ConnectableObservable` that shares a single subscription to the source Observable and
+ * replays at most `bufferSize` items that were emitted during the window defined by `time`
+ */
+ def replay(bufferSize: Int, time: Duration): ConnectableObservable[T] = {
+ new ConnectableObservable[T](asJavaObservable.replay(bufferSize, time.length, time.unit))
+ }
+
+ /**
+ * Returns a `ConnectableObservable` that shares a single subscription to the source Observable and
+ * that replays a maximum of `bufferSize` items that are emitted within a specified time window.
+ *
+ *
+ *
+ * @param bufferSize the buffer size that limits the number of items that can be replayed
+ * @param time the duration of the window in which the replayed items must have been emitted
+ * @param scheduler the scheduler that is used as a time source for the window
+ * @return a `ConnectableObservable` that shares a single subscription to the source Observable and
+ * replays at most `bufferSize` items that were emitted during the window defined by `time``
+ * @throws IllegalArgumentException if `bufferSize` is less than zero
+ */
+ def replay(bufferSize: Int, time: Duration, scheduler: Scheduler): ConnectableObservable[T] = {
+ new ConnectableObservable[T](asJavaObservable.replay(bufferSize, time.length, time.unit, scheduler))
+ }
+
+ /**
+ * Returns an Observable that emits items that are the results of invoking a specified selector on items
+ * emitted by a `ConnectableObservable` that shares a single subscription to the source Observable,
+ * replaying all items that were emitted within a specified time window.
+ *
+ *
+ *
+ * @param selector a selector function, which can use the multicasted sequence as many times as needed, without
+ * causing multiple subscriptions to the Observable
+ * @param time the duration of the window in which the replayed items must have been emitted
+ * @return an Observable that emits items that are the results of invoking the selector on items emitted by
+ * a `ConnectableObservable` that shares a single subscription to the source Observable,
+ * replaying all items that were emitted within the window defined by `time``
+ */
+ def replay[U >: T, R](selector: Observable[U] => Observable[R], time: Duration): Observable[R] = {
+ val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]]
+ val fJava: Func1[rx.Observable[U], rx.Observable[R]] =
+ (jo: rx.Observable[U]) => selector(toScalaObservable[U](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]]
+ toScalaObservable[R](thisJava.replay(fJava, time.length, time.unit))
+ }
+
+ /**
+ * Returns a `ConnectableObservable` that shares a single subscription to the source Observable that
+ * replays at most `bufferSize` items emitted by that Observable.
+ *
+ *
+ *
+ * @param bufferSize the buffer size that limits the number of items that can be replayed
+ * @return a `ConnectableObservable` that shares a single subscription to the source Observable and
+ * replays at most `bufferSize` items emitted by that Observable
+ */
+ def replay(bufferSize: Int): ConnectableObservable[T] = {
+ new ConnectableObservable[T](asJavaObservable.replay(bufferSize))
+ }
+
+ /**
+ * Returns a `ConnectableObservable` that shares a single subscription to the source Observable and
+ * replays at most `bufferSize` items emitted by that Observable.
+ *
+ *
+ *
+ * @param bufferSize the buffer size that limits the number of items that can be replayed
+ * @param scheduler the scheduler on which the Observers will observe the emitted items
+ * @return a `ConnectableObservable` that shares a single subscription to the source Observable and
+ * replays at most `bufferSize` items that were emitted by the Observable
+ */
+ def replay(bufferSize: Int, scheduler: Scheduler): ConnectableObservable[T] = {
+ new ConnectableObservable[T](asJavaObservable.replay(bufferSize, scheduler))
+ }
+
+ /**
+ * Returns a `ConnectableObservable` that shares a single subscription to the source Observable and
+ * replays all items emitted by that Observable within a specified time window.
+ *
+ *
+ *
+ * @param time the duration of the window in which the replayed items must have been emitted
+ * @return a `ConnectableObservable` that shares a single subscription to the source Observable and
+ * replays the items that were emitted during the window defined by `time`
+ */
+ def replay(time: Duration): ConnectableObservable[T] = {
+ new ConnectableObservable[T](asJavaObservable.replay(time.length, time.unit))
+ }
+
+ /**
+ * Returns a `ConnectableObservable` that shares a single subscription to the source Observable and
+ * replays all items emitted by that Observable within a specified time window.
+ *
+ *
+ *
+ * @param time the duration of the window in which the replayed items must have been emitted
+ * @param scheduler the Scheduler that is the time source for the window
+ * @return a `ConnectableObservable` that shares a single subscription to the source Observable and
+ * replays the items that were emitted during the window defined by `time`
+ */
+ def replay(time: Duration, scheduler: Scheduler): ConnectableObservable[T] = {
+ new ConnectableObservable[T](asJavaObservable.replay(time.length, time.unit, scheduler))
+ }
+
+ /**
+ * Returns a `ConnectableObservable` that shares a single subscription to the source Observable that
+ * will replay all of its items and notifications to any future `Observer` on the given `Scheduler`.
+ *
+ *
+ *
+ * @param scheduler the Scheduler on which the Observers will observe the emitted items
+ * @return a `ConnectableObservable` that shares a single subscription to the source Observable that
+ * will replay all of its items and notifications to any future `bserver` on the given `Scheduler`
+ */
+ def replay(scheduler: Scheduler): ConnectableObservable[T] = {
+ new ConnectableObservable[T](asJavaObservable.replay(scheduler))
}
/**
@@ -1201,7 +1487,7 @@ trait Observable[+T]
*
* @return an [[rx.lang.scala.observables.ConnectableObservable]].
*/
- def publish(): ConnectableObservable[T] = {
+ def publish: ConnectableObservable[T] = {
new ConnectableObservable[T](asJavaObservable.publish())
}
@@ -2512,7 +2798,7 @@ trait Observable[+T]
* emits [1, 2, 3, 4, 5] then the complete output would be [1, 2, 1, 2, 3, 4, 5, onCompleted].
* @return Observable with retry logic.
*/
- def retry(): Observable[T] = {
+ def retry: Observable[T] = {
toScalaObservable[T](asJavaObservable.retry())
}
@@ -2525,7 +2811,7 @@ trait Observable[+T]
* @see RxJava Wiki: repeat()
* @see MSDN: Observable.Repeat
*/
- def repeat(): Observable[T] = {
+ def repeat: Observable[T] = {
toScalaObservable[T](asJavaObservable.repeat())
}
@@ -2859,24 +3145,6 @@ trait Observable[+T]
toScalaObservable[T](asJavaObservable.delaySubscription(delay.length, delay.unit, scheduler))
}
- /**
- * Returns an Observable that emits the single item at a specified index in a sequence of emissions from a
- * source Observbable.
- *
- *
- *
- * @param index
- * the zero-based index of the item to retrieve
- * @return an Observable that emits a single item: the item at the specified position in the sequence of
- * those emitted by the source Observable
- * @throws IndexOutOfBoundsException
- * if index is greater than or equal to the number of items emitted by the source
- * Observable, or index is less than 0
- * @see `Observable.elementAt`
- */
- @deprecated("Use `elementAt`", "0.18.0")
- def apply(index: Int): Observable[T] = elementAt(index)
-
/**
* Returns an Observable that emits the single item at a specified index in a sequence of emissions from a
* source Observbable.
diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala
index ddd30e7473..cd5c6a1d9a 100644
--- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala
+++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala
@@ -89,20 +89,27 @@ class CompletenessTest extends JUnitSuite {
"groupByUntil(Func1[_ >: T, _ <: TKey], Func1[_ >: GroupedObservable[TKey, T], _ <: Observable[_ <: TDuration]])" -> "groupByUntil(T => K, (K, Observable[T]) => Observable[Any])",
"lift(Operator[_ <: R, _ >: T])" -> "lift(Subscriber[R] => Subscriber[T])",
"mapWithIndex(Func2[_ >: T, Integer, _ <: R])" -> "[combine `zipWithIndex` with `map` or with a for comprehension]",
+ "multicast(Subject[_ >: T, _ <: R])" -> "multicast(Subject[R])",
+ "multicast(Func0[_ <: Subject[_ >: T, _ <: TIntermediate]], Func1[_ >: Observable[TIntermediate], _ <: Observable[TResult]])" -> "multicast(() => Subject[R], Observable[R] => Observable[U])",
"onErrorResumeNext(Func1[Throwable, _ <: Observable[_ <: T]])" -> "onErrorResumeNext(Throwable => Observable[U])",
"onErrorResumeNext(Observable[_ <: T])" -> "onErrorResumeNext(Observable[U])",
"onErrorReturn(Func1[Throwable, _ <: T])" -> "onErrorReturn(Throwable => U)",
"onExceptionResumeNext(Observable[_ <: T])" -> "onExceptionResumeNext(Observable[U])",
"parallel(Func1[Observable[T], Observable[R]])" -> "parallel(Observable[T] => Observable[R])",
"parallel(Func1[Observable[T], Observable[R]], Scheduler)" -> "parallel(Observable[T] => Observable[R], Scheduler)",
- "publish()" -> "publish()",
"publish(T)" -> "publish(U)",
"publish(Func1[_ >: Observable[T], _ <: Observable[R]])" -> "publish(Observable[U] => Observable[R])",
"publish(Func1[_ >: Observable[T], _ <: Observable[R]], T)" -> "publish(Observable[U] => Observable[R], U)",
"reduce(Func2[T, T, T])" -> "reduce((U, U) => U)",
"reduce(R, Func2[R, _ >: T, R])" -> "foldLeft(R)((R, T) => R)",
- "repeat()" -> "repeat()",
- "retry()" -> "retry()",
+ "replay(Func1[_ >: Observable[T], _ <: Observable[R]])" -> "replay(Observable[U] => Observable[R])",
+ "replay(Func1[_ >: Observable[T], _ <: Observable[R]], Int)" -> "replay(Observable[U] => Observable[R], Int)",
+ "replay(Func1[_ >: Observable[T], _ <: Observable[R]], Int, Long, TimeUnit)" -> "replay(Observable[U] => Observable[R], Int, Duration)",
+ "replay(Func1[_ >: Observable[T], _ <: Observable[R]], Int, Long, TimeUnit, Scheduler)" -> "replay(Observable[U] => Observable[R], Int, Duration, Scheduler)",
+ "replay(Func1[_ >: Observable[T], _ <: Observable[R]], Int, Scheduler)" -> "replay(Observable[U] => Observable[R], Int, Scheduler)",
+ "replay(Func1[_ >: Observable[T], _ <: Observable[R]], Long, TimeUnit)" -> "replay(Observable[U] => Observable[R], Duration)",
+ "replay(Func1[_ >: Observable[T], _ <: Observable[R]], Long, TimeUnit, Scheduler)" -> "replay(Observable[U] => Observable[R], Duration, Scheduler)",
+ "replay(Func1[_ >: Observable[T], _ <: Observable[R]], Scheduler)" -> "replay(Observable[U] => Observable[R], Scheduler)",
"scan(Func2[T, T, T])" -> unnecessary,
"scan(R, Func2[R, _ >: T, R])" -> "scan(R)((R, T) => R)",
"skip(Int)" -> "drop(Int)",