Skip to content

Commit 1383d3d

Browse files
committed
Operator TakeTimed
1 parent 95e0636 commit 1383d3d

File tree

4 files changed

+87
-292
lines changed

4 files changed

+87
-292
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@
6868
import rx.operators.OperationSkipUntil;
6969
import rx.operators.OperationSwitch;
7070
import rx.operators.OperationTakeLast;
71-
import rx.operators.OperationTakeTimed;
7271
import rx.operators.OperationTakeUntil;
7372
import rx.operators.OperationTakeWhile;
7473
import rx.operators.OperationThrottleFirst;
@@ -121,6 +120,7 @@
121120
import rx.operators.OperatorSkipWhile;
122121
import rx.operators.OperatorSubscribeOn;
123122
import rx.operators.OperatorTake;
123+
import rx.operators.OperatorTakeTimed;
124124
import rx.operators.OperatorTimeout;
125125
import rx.operators.OperatorTimeoutWithSelector;
126126
import rx.operators.OperatorTimestamp;
@@ -6447,7 +6447,7 @@ public final Observable<T> take(long time, TimeUnit unit) {
64476447
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#wiki-take">RxJava Wiki: take()</a>
64486448
*/
64496449
public final Observable<T> take(long time, TimeUnit unit, Scheduler scheduler) {
6450-
return create(new OperationTakeTimed.TakeTimed<T>(this, time, unit, scheduler));
6450+
return lift(new OperatorTakeTimed<T>(time, unit, scheduler));
64516451
}
64526452

64536453
/**

rxjava-core/src/main/java/rx/operators/OperationTakeTimed.java

Lines changed: 0 additions & 289 deletions
This file was deleted.
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.operators;
17+
18+
import java.util.concurrent.TimeUnit;
19+
import rx.Observable.Operator;
20+
import rx.Scheduler;
21+
import rx.Scheduler.Worker;
22+
import rx.Subscriber;
23+
import rx.functions.Action0;
24+
import rx.observers.SerializedSubscriber;
25+
26+
/**
27+
* Takes values from the source until the specific time ellapses.
28+
*
29+
* @param <T>
30+
* the result value type
31+
*/
32+
public final class OperatorTakeTimed<T> implements Operator<T, T> {
33+
final long time;
34+
final TimeUnit unit;
35+
final Scheduler scheduler;
36+
37+
public OperatorTakeTimed(long time, TimeUnit unit, Scheduler scheduler) {
38+
this.time = time;
39+
this.unit = unit;
40+
this.scheduler = scheduler;
41+
}
42+
43+
@Override
44+
public Subscriber<? super T> call(Subscriber<? super T> child) {
45+
Worker worker = scheduler.createWorker();
46+
child.add(worker);
47+
48+
TakeSubscriber<T> ts = new TakeSubscriber<T>(new SerializedSubscriber<T>(child));
49+
worker.schedule(ts, time, unit);
50+
return ts;
51+
}
52+
/** Subscribed to source and scheduled on a worker. */
53+
static final class TakeSubscriber<T> extends Subscriber<T> implements Action0 {
54+
final Subscriber<? super T> child;
55+
public TakeSubscriber(Subscriber<? super T> child) {
56+
super(child);
57+
this.child = child;
58+
}
59+
60+
@Override
61+
public void onNext(T t) {
62+
child.onNext(t);
63+
}
64+
65+
@Override
66+
public void onError(Throwable e) {
67+
child.onError(e);
68+
unsubscribe();
69+
}
70+
71+
@Override
72+
public void onCompleted() {
73+
child.onCompleted();
74+
unsubscribe();
75+
}
76+
77+
@Override
78+
public void call() {
79+
onCompleted();
80+
}
81+
82+
83+
}
84+
}

0 commit comments

Comments
 (0)