Skip to content

Commit 9b205ee

Browse files
Merge pull request #1172 from akarnokd/ObserveOnBatchDequeue
ObserveOn: Change to batch dequeue.
2 parents 570a8f9 + e6ae50b commit 9b205ee

File tree

1 file changed

+47
-8
lines changed

1 file changed

+47
-8
lines changed

rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
*/
1616
package rx.operators;
1717

18-
import java.util.concurrent.ConcurrentLinkedQueue;
18+
import java.util.ArrayList;
19+
import java.util.List;
1920
import java.util.concurrent.atomic.AtomicLong;
2021

2122
import rx.Observable.Operator;
@@ -60,7 +61,7 @@ private static class ObserveOnSubscriber<T> extends Subscriber<T> {
6061
final Subscriber<? super T> observer;
6162
private final Scheduler.Worker recursiveScheduler;
6263

63-
private final ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<Object>();
64+
private FastList queue = new FastList();
6465
final AtomicLong counter = new AtomicLong(0);
6566

6667
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> subscriber) {
@@ -72,19 +73,25 @@ public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> subscriber
7273

7374
@Override
7475
public void onNext(final T t) {
75-
queue.offer(on.next(t));
76+
synchronized (this) {
77+
queue.add(on.next(t));
78+
}
7679
schedule();
7780
}
7881

7982
@Override
8083
public void onCompleted() {
81-
queue.offer(on.completed());
84+
synchronized (this) {
85+
queue.add(on.completed());
86+
}
8287
schedule();
8388
}
8489

8590
@Override
8691
public void onError(final Throwable e) {
87-
queue.offer(on.error(e));
92+
synchronized (this) {
93+
queue.add(on.error(e));
94+
}
8895
schedule();
8996
}
9097

@@ -103,11 +110,43 @@ public void call() {
103110

104111
private void pollQueue() {
105112
do {
106-
Object v = queue.poll();
107-
on.accept(observer, v);
108-
} while (counter.decrementAndGet() > 0);
113+
FastList vs;
114+
synchronized (this) {
115+
vs = queue;
116+
queue = new FastList();
117+
}
118+
for (Object v : vs.array) {
119+
if (v == null) {
120+
break;
121+
}
122+
on.accept(observer, v);
123+
}
124+
if (counter.addAndGet(-vs.size) == 0) {
125+
break;
126+
}
127+
} while (true);
109128
}
110129

111130
}
112131

132+
static final class FastList {
133+
Object[] array;
134+
int size;
135+
136+
public void add(Object o) {
137+
int s = size;
138+
Object[] a = array;
139+
if (a == null) {
140+
a = new Object[16];
141+
array = a;
142+
} else if (s == a.length) {
143+
Object[] array2 = new Object[s + (s >> 2)];
144+
System.arraycopy(a, 0, array2, 0, s);
145+
a = array2;
146+
array = a;
147+
}
148+
a[s] = o;
149+
size = s + 1;
150+
}
151+
}
113152
}

0 commit comments

Comments
 (0)