Skip to content

Commit da429ea

Browse files
Merge pull request ReactiveX#414 from zsxwing/skip-last
Implemented the 'SkipLast' operator
2 parents 3956ca2 + b08b0d9 commit da429ea

File tree

2 files changed

+249
-0
lines changed

2 files changed

+249
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import rx.operators.OperationSample;
6464
import rx.operators.OperationScan;
6565
import rx.operators.OperationSkip;
66+
import rx.operators.OperationSkipLast;
6667
import rx.operators.OperationSkipWhile;
6768
import rx.operators.OperationSubscribeOn;
6869
import rx.operators.OperationSum;
@@ -4079,6 +4080,30 @@ public Observable<T> skipWhile(Func1<? super T, Boolean> predicate) {
40794080
return create(OperationSkipWhile.skipWhile(this, predicate));
40804081
}
40814082

4083+
/**
4084+
* Bypasses a specified number of elements at the end of an observable
4085+
* sequence.
4086+
* <p>
4087+
* This operator accumulates a queue with a length enough to store the first
4088+
* count elements. As more elements are received, elements are taken from
4089+
* the front of the queue and produced on the result sequence. This causes
4090+
* elements to be delayed.
4091+
*
4092+
* @param count
4093+
* number of elements to bypass at the end of the source
4094+
* sequence.
4095+
* @return An observable sequence containing the source sequence elements
4096+
* except for the bypassed ones at the end.
4097+
*
4098+
* @throws IndexOutOfBoundsException
4099+
* count is less than zero.
4100+
*
4101+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211750(v=vs.103).aspx">MSDN: Observable.SkipLast</a>
4102+
*/
4103+
public Observable<T> skipLast(int count) {
4104+
return create(OperationSkipLast.skipLast(this, count));
4105+
}
4106+
40824107
/**
40834108
* Returns an Observable that emits a single item, a list composed of all the items emitted by
40844109
* the source Observable.
Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.mockito.Matchers.any;
19+
import static org.mockito.Mockito.inOrder;
20+
import static org.mockito.Mockito.mock;
21+
import static org.mockito.Mockito.never;
22+
import static org.mockito.Mockito.times;
23+
import static org.mockito.Mockito.verify;
24+
25+
import java.util.Deque;
26+
import java.util.LinkedList;
27+
import java.util.concurrent.locks.ReentrantLock;
28+
29+
import org.junit.Test;
30+
import org.mockito.InOrder;
31+
32+
import rx.Observable;
33+
import rx.Observable.OnSubscribeFunc;
34+
import rx.Observer;
35+
import rx.Subscription;
36+
37+
/**
38+
* Bypasses a specified number of elements at the end of an observable sequence.
39+
*/
40+
public class OperationSkipLast {
41+
42+
/**
43+
* Bypasses a specified number of elements at the end of an observable
44+
* sequence.
45+
* <p>
46+
* This operator accumulates a queue with a length enough to store the first
47+
* count elements. As more elements are received, elements are taken from
48+
* the front of the queue and produced on the result sequence. This causes
49+
* elements to be delayed.
50+
*
51+
* @param source
52+
* the source sequence.
53+
* @param count
54+
* number of elements to bypass at the end of the source
55+
* sequence.
56+
* @return An observable sequence containing the source sequence elements
57+
* except for the bypassed ones at the end.
58+
*
59+
* @throws IndexOutOfBoundsException
60+
* count is less than zero.
61+
*/
62+
public static <T> OnSubscribeFunc<T> skipLast(
63+
Observable<? extends T> source, int count) {
64+
return new SkipLast<T>(source, count);
65+
}
66+
67+
private static class SkipLast<T> implements OnSubscribeFunc<T> {
68+
private final int count;
69+
private final Observable<? extends T> source;
70+
71+
private SkipLast(Observable<? extends T> source, int count) {
72+
this.count = count;
73+
this.source = source;
74+
}
75+
76+
public Subscription onSubscribe(final Observer<? super T> observer) {
77+
if (count < 0) {
78+
throw new IndexOutOfBoundsException(
79+
"count could not be negative");
80+
}
81+
final SafeObservableSubscription subscription = new SafeObservableSubscription();
82+
return subscription.wrap(source.subscribe(new Observer<T>() {
83+
84+
private final ReentrantLock lock = new ReentrantLock();
85+
86+
/**
87+
* Store the last count elements until now.
88+
*/
89+
private final Deque<T> deque = new LinkedList<T>();
90+
91+
@Override
92+
public void onCompleted() {
93+
observer.onCompleted();
94+
}
95+
96+
@Override
97+
public void onError(Throwable e) {
98+
observer.onError(e);
99+
}
100+
101+
@Override
102+
public void onNext(T value) {
103+
if (count == 0) {
104+
// If count == 0, we do not need to put value into deque
105+
// and remove it at once. We can emit the value
106+
// directly.
107+
try {
108+
observer.onNext(value);
109+
} catch (Throwable ex) {
110+
observer.onError(ex);
111+
subscription.unsubscribe();
112+
}
113+
return;
114+
}
115+
lock.lock();
116+
try {
117+
deque.offerLast(value);
118+
if (deque.size() > count) {
119+
// Now deque has count + 1 elements, so the first
120+
// element in the deque definitely does not belong
121+
// to the last count elements of the source
122+
// sequence. We can emit it now.
123+
observer.onNext(deque.removeFirst());
124+
}
125+
} catch (Throwable ex) {
126+
observer.onError(ex);
127+
subscription.unsubscribe();
128+
} finally {
129+
lock.unlock();
130+
}
131+
}
132+
133+
}));
134+
}
135+
}
136+
137+
public static class UnitTest {
138+
139+
@Test
140+
public void testSkipLastEmpty() {
141+
Observable<String> w = Observable.empty();
142+
Observable<String> observable = Observable.create(skipLast(w, 2));
143+
144+
@SuppressWarnings("unchecked")
145+
Observer<String> aObserver = mock(Observer.class);
146+
observable.subscribe(aObserver);
147+
verify(aObserver, never()).onNext(any(String.class));
148+
verify(aObserver, never()).onError(any(Throwable.class));
149+
verify(aObserver, times(1)).onCompleted();
150+
}
151+
152+
@Test
153+
public void testSkipLast1() {
154+
Observable<String> w = Observable.from("one", "two", "three");
155+
Observable<String> observable = Observable.create(skipLast(w, 2));
156+
157+
@SuppressWarnings("unchecked")
158+
Observer<String> aObserver = mock(Observer.class);
159+
InOrder inOrder = inOrder(aObserver);
160+
observable.subscribe(aObserver);
161+
inOrder.verify(aObserver, never()).onNext("two");
162+
inOrder.verify(aObserver, never()).onNext("three");
163+
verify(aObserver, times(1)).onNext("one");
164+
verify(aObserver, never()).onError(any(Throwable.class));
165+
verify(aObserver, times(1)).onCompleted();
166+
}
167+
168+
@Test
169+
public void testSkipLast2() {
170+
Observable<String> w = Observable.from("one", "two");
171+
Observable<String> observable = Observable.create(skipLast(w, 2));
172+
173+
@SuppressWarnings("unchecked")
174+
Observer<String> aObserver = mock(Observer.class);
175+
observable.subscribe(aObserver);
176+
verify(aObserver, never()).onNext(any(String.class));
177+
verify(aObserver, never()).onError(any(Throwable.class));
178+
verify(aObserver, times(1)).onCompleted();
179+
}
180+
181+
@Test
182+
public void testSkipLastWithZeroCount() {
183+
Observable<String> w = Observable.from("one", "two");
184+
Observable<String> observable = Observable.create(skipLast(w, 0));
185+
186+
@SuppressWarnings("unchecked")
187+
Observer<String> aObserver = mock(Observer.class);
188+
observable.subscribe(aObserver);
189+
verify(aObserver, times(1)).onNext("one");
190+
verify(aObserver, times(1)).onNext("two");
191+
verify(aObserver, never()).onError(any(Throwable.class));
192+
verify(aObserver, times(1)).onCompleted();
193+
}
194+
195+
@Test
196+
public void testSkipLastWithNull() {
197+
Observable<String> w = Observable.from("one", null, "two");
198+
Observable<String> observable = Observable.create(skipLast(w, 1));
199+
200+
@SuppressWarnings("unchecked")
201+
Observer<String> aObserver = mock(Observer.class);
202+
observable.subscribe(aObserver);
203+
verify(aObserver, times(1)).onNext("one");
204+
verify(aObserver, times(1)).onNext(null);
205+
verify(aObserver, never()).onNext("two");
206+
verify(aObserver, never()).onError(any(Throwable.class));
207+
verify(aObserver, times(1)).onCompleted();
208+
}
209+
210+
@Test
211+
public void testSkipLastWithNegativeCount() {
212+
Observable<String> w = Observable.from("one");
213+
Observable<String> observable = Observable.create(skipLast(w, -1));
214+
215+
@SuppressWarnings("unchecked")
216+
Observer<String> aObserver = mock(Observer.class);
217+
observable.subscribe(aObserver);
218+
verify(aObserver, never()).onNext(any(String.class));
219+
verify(aObserver, times(1)).onError(
220+
any(IndexOutOfBoundsException.class));
221+
verify(aObserver, never()).onCompleted();
222+
}
223+
}
224+
}

0 commit comments

Comments
 (0)