Skip to content

Commit 96f4b66

Browse files
akarnokdakarnokd
authored andcommitted
OperatorMulticastAndReplay
1 parent 95e0636 commit 96f4b66

File tree

7 files changed

+259
-254
lines changed

7 files changed

+259
-254
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 33 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,11 @@
5656
import rx.operators.OperationJoin;
5757
import rx.operators.OperationMergeDelayError;
5858
import rx.operators.OperationMergeMaxConcurrent;
59-
import rx.operators.OperationMulticast;
6059
import rx.operators.OperationOnErrorResumeNextViaObservable;
6160
import rx.operators.OperationOnErrorReturn;
6261
import rx.operators.OperationOnExceptionResumeNextViaObservable;
6362
import rx.operators.OperationParallelMerge;
64-
import rx.operators.OperationReplay;
63+
import rx.operators.OperatorReplay;
6564
import rx.operators.OperationSample;
6665
import rx.operators.OperationSequenceEqual;
6766
import rx.operators.OperationSkip;
@@ -105,6 +104,8 @@
105104
import rx.operators.OperatorMerge;
106105
import rx.operators.OperatorMergeMapPair;
107106
import rx.operators.OperatorMergeMapTransform;
107+
import rx.operators.OperatorMulticast;
108+
import rx.operators.OperatorMulticastSelector;
108109
import rx.operators.OperatorObserveOn;
109110
import rx.operators.OperatorOnErrorFlatMap;
110111
import rx.operators.OperatorOnErrorResumeNextViaFunction;
@@ -4419,7 +4420,7 @@ public final <U, R> Observable<R> mergeMapIterable(Func1<? super T, ? extends It
44194420
public final <TIntermediate, TResult> Observable<TResult> multicast(
44204421
final Func0<? extends Subject<? super T, ? extends TIntermediate>> subjectFactory,
44214422
final Func1<? super Observable<TIntermediate>, ? extends Observable<TResult>> selector) {
4422-
return OperationMulticast.multicast(this, subjectFactory, selector);
4423+
return create(new OperatorMulticastSelector<T, TIntermediate, TResult>(this, subjectFactory, selector));
44234424
}
44244425

44254426
/**
@@ -4436,7 +4437,7 @@ public final <TIntermediate, TResult> Observable<TResult> multicast(
44364437
* Observable.multicast()</a>
44374438
*/
44384439
public final <R> ConnectableObservable<R> multicast(Subject<? super T, ? extends R> subject) {
4439-
return OperationMulticast.multicast(this, subject);
4440+
return new OperatorMulticast<T, R>(this, subject);
44404441
}
44414442

44424443
/**
@@ -4641,7 +4642,7 @@ public final <R> Observable<R> parallel(final Func1<Observable<T>, Observable<R>
46414642
* @see <a href="https://github.com/Netflix/RxJava/wiki/Connectable-Observable-Operators#wiki-observablepublish-and-observablemulticast">RxJava Wiki: publish()</a>
46424643
*/
46434644
public final ConnectableObservable<T> publish() {
4644-
return OperationMulticast.multicast(this, PublishSubject.<T> create());
4645+
return new OperatorMulticast<T, T>(this, PublishSubject.<T> create());
46454646
}
46464647

46474648
/**
@@ -4704,7 +4705,7 @@ public final Subject<T, T> call() {
47044705
* and starts with {@code initialValue}
47054706
*/
47064707
public final ConnectableObservable<T> publish(T initialValue) {
4707-
return OperationMulticast.multicast(this, BehaviorSubject.<T> create(initialValue));
4708+
return new OperatorMulticast<T, T>(this, BehaviorSubject.<T> create(initialValue));
47084709
}
47094710

47104711
/**
@@ -4716,7 +4717,7 @@ public final ConnectableObservable<T> publish(T initialValue) {
47164717
* @see <a href="https://github.com/Netflix/RxJava/wiki/Connectable-Observable-Operators#wiki-observablepublishlast">RxJava Wiki: publishLast()</a>
47174718
*/
47184719
public final ConnectableObservable<T> publishLast() {
4719-
return OperationMulticast.multicast(this, AsyncSubject.<T> create());
4720+
return new OperatorMulticast<T, T>(this, AsyncSubject.<T> create());
47204721
}
47214722

47224723
/**
@@ -4879,7 +4880,7 @@ public final Observable<T> repeat(long count, Scheduler scheduler) {
48794880
* @see <a href="https://github.com/Netflix/RxJava/wiki/Connectable-Observable-Operators#wiki-observablereplay">RxJava Wiki: replay()</a>
48804881
*/
48814882
public final ConnectableObservable<T> replay() {
4882-
return OperationMulticast.multicast(this, ReplaySubject.<T> create());
4883+
return new OperatorMulticast<T, T>(this, ReplaySubject.<T> create());
48834884
}
48844885

48854886
/**
@@ -4898,12 +4899,12 @@ public final ConnectableObservable<T> replay() {
48984899
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229653.aspx">MSDN: Observable.Replay</a>
48994900
*/
49004901
public final <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector) {
4901-
return OperationMulticast.multicast(this, new Func0<Subject<T, T>>() {
4902+
return create(new OperatorMulticastSelector<T, T, R>(this, new Func0<Subject<T, T>>() {
49024903
@Override
49034904
public final Subject<T, T> call() {
49044905
return ReplaySubject.create();
49054906
}
4906-
}, selector);
4907+
}, selector));
49074908
}
49084909

49094910
/**
@@ -4927,12 +4928,12 @@ public final Subject<T, T> call() {
49274928
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211675.aspx">MSDN: Observable.Replay</a>
49284929
*/
49294930
public final <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, final int bufferSize) {
4930-
return OperationMulticast.multicast(this, new Func0<Subject<T, T>>() {
4931+
return create(new OperatorMulticastSelector<T, T, R>(this, new Func0<Subject<T, T>>() {
49314932
@Override
49324933
public final Subject<T, T> call() {
4933-
return OperationReplay.replayBuffered(bufferSize);
4934+
return OperatorReplay.replayBuffered(bufferSize);
49344935
}
4935-
}, selector);
4936+
}, selector));
49364937
}
49374938

49384939
/**
@@ -4995,12 +4996,12 @@ public final <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Obs
49954996
if (bufferSize < 0) {
49964997
throw new IllegalArgumentException("bufferSize < 0");
49974998
}
4998-
return OperationMulticast.multicast(this, new Func0<Subject<T, T>>() {
4999+
return create(new OperatorMulticastSelector<T, T, R>(this, new Func0<Subject<T, T>>() {
49995000
@Override
50005001
public final Subject<T, T> call() {
5001-
return OperationReplay.replayWindowed(time, unit, bufferSize, scheduler);
5002+
return OperatorReplay.replayWindowed(time, unit, bufferSize, scheduler);
50025003
}
5003-
}, selector);
5004+
}, selector));
50045005
}
50055006

50065007
/**
@@ -5026,12 +5027,12 @@ public final Subject<T, T> call() {
50265027
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229928.aspx">MSDN: Observable.Replay</a>
50275028
*/
50285029
public final <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, final int bufferSize, final Scheduler scheduler) {
5029-
return OperationMulticast.multicast(this, new Func0<Subject<T, T>>() {
5030+
return create(new OperatorMulticastSelector<T, T, R>(this, new Func0<Subject<T, T>>() {
50305031
@Override
50315032
public final Subject<T, T> call() {
5032-
return OperationReplay.<T> createScheduledSubject(OperationReplay.<T> replayBuffered(bufferSize), scheduler);
5033+
return OperatorReplay.<T> createScheduledSubject(OperatorReplay.<T> replayBuffered(bufferSize), scheduler);
50335034
}
5034-
}, selector);
5035+
}, selector));
50355036
}
50365037

50375038
/**
@@ -5085,12 +5086,12 @@ public final <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Obs
50855086
* @see <a href="http://msdn.microsoft.com/en-us/library/hh244327.aspx">MSDN: Observable.Replay</a>
50865087
*/
50875088
public final <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, final long time, final TimeUnit unit, final Scheduler scheduler) {
5088-
return OperationMulticast.multicast(this, new Func0<Subject<T, T>>() {
5089+
return create(new OperatorMulticastSelector<T, T, R>(this, new Func0<Subject<T, T>>() {
50895090
@Override
50905091
public final Subject<T, T> call() {
5091-
return OperationReplay.replayWindowed(time, unit, -1, scheduler);
5092+
return OperatorReplay.replayWindowed(time, unit, -1, scheduler);
50925093
}
5093-
}, selector);
5094+
}, selector));
50945095
}
50955096

50965097
/**
@@ -5113,12 +5114,12 @@ public final Subject<T, T> call() {
51135114
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211644.aspx">MSDN: Observable.Replay</a>
51145115
*/
51155116
public final <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, final Scheduler scheduler) {
5116-
return OperationMulticast.multicast(this, new Func0<Subject<T, T>>() {
5117+
return create(new OperatorMulticastSelector<T, T, R>(this, new Func0<Subject<T, T>>() {
51175118
@Override
51185119
public final Subject<T, T> call() {
5119-
return OperationReplay.createScheduledSubject(ReplaySubject.<T> create(), scheduler);
5120+
return OperatorReplay.createScheduledSubject(ReplaySubject.<T> create(), scheduler);
51205121
}
5121-
}, selector);
5122+
}, selector));
51225123
}
51235124

51245125
/**
@@ -5135,7 +5136,7 @@ public final Subject<T, T> call() {
51355136
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211976.aspx">MSDN: Observable.Replay</a>
51365137
*/
51375138
public final ConnectableObservable<T> replay(int bufferSize) {
5138-
return OperationMulticast.multicast(this, OperationReplay.<T> replayBuffered(bufferSize));
5139+
return new OperatorMulticast<T, T>(this, OperatorReplay.<T> replayBuffered(bufferSize));
51395140
}
51405141

51415142
/**
@@ -5184,7 +5185,7 @@ public final ConnectableObservable<T> replay(int bufferSize, long time, TimeUnit
51845185
if (bufferSize < 0) {
51855186
throw new IllegalArgumentException("bufferSize < 0");
51865187
}
5187-
return OperationMulticast.multicast(this, OperationReplay.<T> replayWindowed(time, unit, bufferSize, scheduler));
5188+
return new OperatorMulticast<T, T>(this, OperatorReplay.<T> replayWindowed(time, unit, bufferSize, scheduler));
51885189
}
51895190

51905191
/**
@@ -5203,9 +5204,9 @@ public final ConnectableObservable<T> replay(int bufferSize, long time, TimeUnit
52035204
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229814.aspx">MSDN: Observable.Replay</a>
52045205
*/
52055206
public final ConnectableObservable<T> replay(int bufferSize, Scheduler scheduler) {
5206-
return OperationMulticast.multicast(this,
5207-
OperationReplay.createScheduledSubject(
5208-
OperationReplay.<T> replayBuffered(bufferSize), scheduler));
5207+
return new OperatorMulticast<T, T>(this,
5208+
OperatorReplay.createScheduledSubject(
5209+
OperatorReplay.<T> replayBuffered(bufferSize), scheduler));
52095210
}
52105211

52115212
/**
@@ -5245,7 +5246,7 @@ public final ConnectableObservable<T> replay(long time, TimeUnit unit) {
52455246
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211811.aspx">MSDN: Observable.Replay</a>
52465247
*/
52475248
public final ConnectableObservable<T> replay(long time, TimeUnit unit, Scheduler scheduler) {
5248-
return OperationMulticast.multicast(this, OperationReplay.<T> replayWindowed(time, unit, -1, scheduler));
5249+
return new OperatorMulticast<T, T>(this, OperatorReplay.<T> replayWindowed(time, unit, -1, scheduler));
52495250
}
52505251

52515252
/**
@@ -5262,7 +5263,7 @@ public final ConnectableObservable<T> replay(long time, TimeUnit unit, Scheduler
52625263
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211699.aspx">MSDN: Observable.Replay</a>
52635264
*/
52645265
public final ConnectableObservable<T> replay(Scheduler scheduler) {
5265-
return OperationMulticast.multicast(this, OperationReplay.createScheduledSubject(ReplaySubject.<T> create(), scheduler));
5266+
return new OperatorMulticast<T, T>(this, OperatorReplay.createScheduledSubject(ReplaySubject.<T> create(), scheduler));
52665267
}
52675268

52685269
/**

rxjava-core/src/main/java/rx/operators/OperationMulticast.java

Lines changed: 0 additions & 148 deletions
This file was deleted.

0 commit comments

Comments
 (0)