Skip to content

Commit e98cd16

Browse files
committed
OperatorSkipTimed
1 parent 95e0636 commit e98cd16

File tree

6 files changed

+89
-130
lines changed

6 files changed

+89
-130
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@
6464
import rx.operators.OperationReplay;
6565
import rx.operators.OperationSample;
6666
import rx.operators.OperationSequenceEqual;
67-
import rx.operators.OperationSkip;
6867
import rx.operators.OperationSkipUntil;
6968
import rx.operators.OperationSwitch;
7069
import rx.operators.OperationTakeLast;
@@ -118,6 +117,7 @@
118117
import rx.operators.OperatorSkip;
119118
import rx.operators.OperatorSkipLast;
120119
import rx.operators.OperatorSkipLastTimed;
120+
import rx.operators.OperatorSkipTimed;
121121
import rx.operators.OperatorSkipWhile;
122122
import rx.operators.OperatorSubscribeOn;
123123
import rx.operators.OperatorTake;
@@ -5547,7 +5547,7 @@ public final Observable<T> skip(long time, TimeUnit unit) {
55475547
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#wiki-skip">RxJava Wiki: skip()</a>
55485548
*/
55495549
public final Observable<T> skip(long time, TimeUnit unit, Scheduler scheduler) {
5550-
return create(new OperationSkip.SkipTimed<T>(this, time, unit, scheduler));
5550+
return lift(new OperatorSkipTimed<T>(time, unit, scheduler));
55515551
}
55525552

55535553
/**

rxjava-core/src/main/java/rx/operators/OperationSkip.java

Lines changed: 0 additions & 125 deletions
This file was deleted.
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.operators;
17+
18+
import java.util.concurrent.TimeUnit;
19+
import java.util.concurrent.atomic.AtomicBoolean;
20+
import rx.Observable.Operator;
21+
import rx.Scheduler;
22+
import rx.Scheduler.Worker;
23+
import rx.Subscriber;
24+
import rx.functions.Action0;
25+
26+
/**
27+
* Skips elements until a specified time elapses.
28+
* @param <T> the value type
29+
*/
30+
public final class OperatorSkipTimed<T> implements Operator<T, T> {
31+
final long time;
32+
final TimeUnit unit;
33+
final Scheduler scheduler;
34+
35+
public OperatorSkipTimed(long time, TimeUnit unit, Scheduler scheduler) {
36+
this.time = time;
37+
this.unit = unit;
38+
this.scheduler = scheduler;
39+
}
40+
41+
@Override
42+
public Subscriber<? super T> call(final Subscriber<? super T> child) {
43+
final Worker worker = scheduler.createWorker();
44+
child.add(worker);
45+
final AtomicBoolean gate = new AtomicBoolean();
46+
worker.schedule(new Action0() {
47+
@Override
48+
public void call() {
49+
gate.set(true);
50+
}
51+
}, time, unit);
52+
return new Subscriber<T>(child) {
53+
54+
@Override
55+
public void onNext(T t) {
56+
if (gate.get()) {
57+
child.onNext(t);
58+
}
59+
}
60+
61+
@Override
62+
public void onError(Throwable e) {
63+
try {
64+
child.onError(e);
65+
} finally {
66+
unsubscribe();
67+
}
68+
}
69+
70+
@Override
71+
public void onCompleted() {
72+
try {
73+
child.onCompleted();
74+
} finally {
75+
unsubscribe();
76+
}
77+
}
78+
};
79+
}
80+
}

rxjava-core/src/test/java/rx/operators/OperationTakeTimedTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828

2929
import rx.Observable;
3030
import rx.Observer;
31-
import rx.operators.OperationSkipTest.CustomException;
31+
import rx.operators.OperatorSkipTimedTest.CustomException;
3232
import rx.schedulers.TestScheduler;
3333
import rx.subjects.PublishSubject;
3434

rxjava-core/src/test/java/rx/operators/OperatorSkipLastTimedTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828

2929
import rx.Observable;
3030
import rx.Observer;
31-
import rx.operators.OperationSkipTest.CustomException;
31+
import rx.operators.OperatorSkipTimedTest.CustomException;
3232
import rx.schedulers.TestScheduler;
3333
import rx.subjects.PublishSubject;
3434

rxjava-core/src/test/java/rx/operators/OperationSkipTest.java renamed to rxjava-core/src/test/java/rx/operators/OperatorSkipTimedTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import rx.schedulers.TestScheduler;
3232
import rx.subjects.PublishSubject;
3333

34-
public class OperationSkipTest {
34+
public class OperatorSkipTimedTest {
3535

3636
@Test
3737
public void testSkipTimed() {
@@ -41,6 +41,7 @@ public void testSkipTimed() {
4141

4242
Observable<Integer> result = source.skip(1, TimeUnit.SECONDS, scheduler);
4343

44+
@SuppressWarnings("unchecked")
4445
Observer<Object> o = mock(Observer.class);
4546

4647
result.subscribe(o);
@@ -78,6 +79,7 @@ public void testSkipTimedFinishBeforeTime() {
7879

7980
Observable<Integer> result = source.skip(1, TimeUnit.SECONDS, scheduler);
8081

82+
@SuppressWarnings("unchecked")
8183
Observer<Object> o = mock(Observer.class);
8284

8385
result.subscribe(o);
@@ -108,6 +110,7 @@ public void testSkipTimedErrorBeforeTime() {
108110

109111
Observable<Integer> result = source.skip(1, TimeUnit.SECONDS, scheduler);
110112

113+
@SuppressWarnings("unchecked")
111114
Observer<Object> o = mock(Observer.class);
112115

113116
result.subscribe(o);
@@ -135,6 +138,7 @@ public void testSkipTimedErrorAfterTime() {
135138

136139
Observable<Integer> result = source.skip(1, TimeUnit.SECONDS, scheduler);
137140

141+
@SuppressWarnings("unchecked")
138142
Observer<Object> o = mock(Observer.class);
139143

140144
result.subscribe(o);

0 commit comments

Comments
 (0)