diff --git a/rxjava-core/src/main/java/rx/observers/MpscPaddedQueue.java b/rxjava-core/src/main/java/rx/observers/MpscPaddedQueue.java new file mode 100644 index 0000000000..3a5aa90fd4 --- /dev/null +++ b/rxjava-core/src/main/java/rx/observers/MpscPaddedQueue.java @@ -0,0 +1,129 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.observers; + +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +/** + * A multiple-producer single consumer queue implementation with padded reference + * to tail to avoid cache-line thrashing. + * Based on Netty's MpscQueue implementation but using AtomicReferenceFieldUpdater + * instead of Unsafe. + * @param the element type + */ +public final class MpscPaddedQueue extends AtomicReference> { + @SuppressWarnings(value = "rawtypes") + static final AtomicReferenceFieldUpdater TAIL_UPDATER = AtomicReferenceFieldUpdater.newUpdater(PaddedNode.class, Node.class, "tail"); + /** */ + private static final long serialVersionUID = 1L; + /** The padded tail reference. */ + final PaddedNode tail; + /** + * Initializes the empty queue. + */ + public MpscPaddedQueue() { + Node first = new Node(null); + tail = new PaddedNode(); + tail.tail = first; + set(first); + } + /** + * Offer a new value. + * @param v the value to offer + */ + public void offer(E v) { + Node n = new Node(v); + getAndSet(n).set(n); + } + + /** + * @return Poll a value from the head of the queue or return null if the queue is empty. + */ + public E poll() { + Node n = peekNode(); + if (n == null) { + return null; + } + E v = n.value; + n.value = null; // do not retain this value as the node still stays in the queue + TAIL_UPDATER.lazySet(tail, n); + return v; + } + /** + * Check if there is a node available without changing anything. + */ + private Node peekNode() { + for (;;) { + @SuppressWarnings(value = "unchecked") + Node t = TAIL_UPDATER.get(tail); + Node n = t.get(); + if (n != null || get() == t) { + return n; + } + } + } + /** + * Clears the queue. + */ + public void clear() { + for (;;) { + if (poll() == null) { + break; + } + } + } + /** Class that contains a Node reference padded around to fit a typical cache line. */ + static final class PaddedNode { + /** Padding, public to prevent optimizing it away. */ + public int p1; + volatile Node tail; + /** Padding, public to prevent optimizing it away. */ + public long p2; + /** Padding, public to prevent optimizing it away. */ + public long p3; + /** Padding, public to prevent optimizing it away. */ + public long p4; + /** Padding, public to prevent optimizing it away. */ + public long p5; + /** Padding, public to prevent optimizing it away. */ + public long p6; + } + /** + * Regular node with value and reference to the next node. + */ + static final class Node { + + E value; + @SuppressWarnings(value = "rawtypes") + static final AtomicReferenceFieldUpdater TAIL_UPDATER = AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "tail"); + volatile Node tail; + + public Node(E value) { + this.value = value; + } + + public void set(Node newTail) { + TAIL_UPDATER.lazySet(this, newTail); + } + + @SuppressWarnings(value = "unchecked") + public Node get() { + return TAIL_UPDATER.get(this); + } + } + +} diff --git a/rxjava-core/src/main/java/rx/observers/PaddedAtomicInteger.java b/rxjava-core/src/main/java/rx/observers/PaddedAtomicInteger.java new file mode 100644 index 0000000000..b00c16b881 --- /dev/null +++ b/rxjava-core/src/main/java/rx/observers/PaddedAtomicInteger.java @@ -0,0 +1,56 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.observers; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * An AtomicInteger with extra fields to pad it out to fit a typical cache line. + */ +public final class PaddedAtomicInteger extends AtomicInteger { + private static final long serialVersionUID = 1L; + /** Padding, public to prevent optimizing it away. */ + public int p1; + /** Padding, public to prevent optimizing it away. */ + public int p2; + /** Padding, public to prevent optimizing it away. */ + public int p3; + /** Padding, public to prevent optimizing it away. */ + public int p4; + /** Padding, public to prevent optimizing it away. */ + public int p5; + /** Padding, public to prevent optimizing it away. */ + public int p6; + /** Padding, public to prevent optimizing it away. */ + public int p7; + /** Padding, public to prevent optimizing it away. */ + public int p8; + /** Padding, public to prevent optimizing it away. */ + public int p9; + /** Padding, public to prevent optimizing it away. */ + public int p10; + /** Padding, public to prevent optimizing it away. */ + public int p11; + /** Padding, public to prevent optimizing it away. */ + public int p12; + /** Padding, public to prevent optimizing it away. */ + public int p13; + /** @return prevents optimizing away the fields, most likely. */ + public int noopt() { + return p1 + p2 + p3 + p4 + p5 + p6 + p7 + p8 + p9 + p10 + p11 + p12 + p13; + } + +} diff --git a/rxjava-core/src/main/java/rx/observers/SerializedObserver.java b/rxjava-core/src/main/java/rx/observers/SerializedObserver.java index 0572fddb07..9e1e0ac0e8 100644 --- a/rxjava-core/src/main/java/rx/observers/SerializedObserver.java +++ b/rxjava-core/src/main/java/rx/observers/SerializedObserver.java @@ -15,7 +15,9 @@ */ package rx.observers; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import rx.Observer; +import rx.operators.NotificationLite; /** * Enforce single-threaded, serialized, ordered execution of onNext, onCompleted, onError. @@ -29,178 +31,146 @@ * * @param */ -public class SerializedObserver implements Observer { - private final Observer actual; - - private boolean emitting = false; - private boolean terminated = false; - private FastList queue; - - private static final int MAX_DRAIN_ITERATION = Integer.MAX_VALUE; - private static final Object NULL_SENTINEL = new Object(); - private static final Object COMPLETE_SENTINEL = new Object(); - - static final class FastList { - Object[] array; - int size; - - public void add(Object o) { - int s = size; - Object[] a = array; - if (a == null) { - a = new Object[16]; - array = a; - } else if (s == a.length) { - Object[] array2 = new Object[s + (s >> 2)]; - System.arraycopy(a, 0, array2, 0, s); - a = array2; - array = a; - } - a[s] = o; - size = s + 1; - } - } - - private static final class ErrorSentinel { - final Throwable e; - - ErrorSentinel(Throwable e) { - this.e = e; - } - } - +public final class SerializedObserver implements Observer { + /** The actual observer that receives the values. */ + private final Observer actual; + /** The queue in case of concurrent access. */ + final MpscPaddedQueue queue; + /** Number of queued events. */ + final PaddedAtomicInteger wip; + /** + * Adaptively enable and disable fast path. 0 means fast path enabled, 1 means fast path disabled, + * 2 means the observer is terminated. + */ + volatile int fastpath; + /** + * Atomic updater for the fastpath variable. + */ + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater FASTPATH_UPDATER + = AtomicIntegerFieldUpdater.newUpdater(SerializedObserver.class, "fastpath"); + /** Lightweight notification transformation. */ + static final NotificationLite nl = NotificationLite.instance(); + /** + * Constructor, takes the actual observer and initializes the queue and + * work counters. + * @param s the actual observer to wrap + */ + @SuppressWarnings("unchecked") public SerializedObserver(Observer s) { - this.actual = s; + this.actual = (Observer)s; + this.queue = new MpscPaddedQueue(); + this.wip = new PaddedAtomicInteger(); } @Override public void onCompleted() { - FastList list; - synchronized (this) { - if (terminated) { + int n = fastpath; + // let's try the fast path + if (n == 0 && wip.compareAndSet(0, 1)) { + FASTPATH_UPDATER.lazySet(this, 2); + actual.onCompleted(); + // just return since nooen else will be able to + // put anything else into the queue at this point + return; + } + if (n == 2) { + return; + } + queue.offer(nl.completed()); + if (wip.getAndIncrement() == 0) { + n = fastpath; // re-read fastpath + if (n == 2) { + queue.clear(); return; } - terminated = true; - if (emitting) { - if (queue == null) { - queue = new FastList(); + FASTPATH_UPDATER.lazySet(this, 2); + do { + if (nl.accept2(actual, queue.poll())) { + queue.clear(); + return; } - queue.add(COMPLETE_SENTINEL); - return; - } - emitting = true; - list = queue; - queue = null; + } while (wip.decrementAndGet() > 0); } - drainQueue(list); - actual.onCompleted(); } @Override public void onError(final Throwable e) { - FastList list; - synchronized (this) { - if (terminated) { + int n = fastpath; + // let's try the fast path + if (n == 0 && wip.compareAndSet(0, 1)) { + FASTPATH_UPDATER.lazySet(this, 2); + actual.onError(e); + // just return since nooen else will be able to + // put anything else into the queue at this point + return; + } + // or are we terminated? + if (n == 2) { + return; + } + queue.offer(nl.error(e)); + if (wip.getAndIncrement() == 0) { + n = fastpath; // re-read fastpath + if (n == 2) { // are we terminated now? + queue.clear(); return; } - terminated = true; - if (emitting) { - if (queue == null) { - queue = new FastList(); + FASTPATH_UPDATER.lazySet(this, 2); + do { + if (nl.accept2(actual, queue.poll())) { + queue.clear(); + return; } - queue.add(new ErrorSentinel(e)); - return; - } - emitting = true; - list = queue; - queue = null; + } while (wip.decrementAndGet() > 0); } - drainQueue(list); - actual.onError(e); } @Override - public void onNext(T t) { - FastList list; - - synchronized (this) { - if (terminated) { + public void onNext(final T t) { + int n = fastpath; + if (n == 0 && wip.compareAndSet(0, 1)) { + int w; + try { + actual.onNext(t); + } finally { + w = wip.decrementAndGet(); + } + if (w == 0) { return; } - if (emitting) { - if (queue == null) { - queue = new FastList(); - } - queue.add(t != null ? t : NULL_SENTINEL); - // another thread is emitting so we add to the queue and return + FASTPATH_UPDATER.lazySet(this, 0); + } else { + if (n == 2) { + return; + } + queue.offer(nl.next(t)); + if (wip.getAndIncrement() != 0) { + return; + } + n = fastpath; // we won the emission race, are we done btw? + if (n == 2) { return; } - // we can emit - emitting = true; - // reference to the list to drain before emitting our value - list = queue; - queue = null; } - - // we only get here if we won the right to emit, otherwise we returned in the if(emitting) block above - boolean skipFinal = false; + int c = 0; + boolean endRegular = false; try { - int iter = MAX_DRAIN_ITERATION; do { - drainQueue(list); - if (iter == MAX_DRAIN_ITERATION) { - // after the first draining we emit our own value - actual.onNext(t); - } - --iter; - if (iter > 0) { - synchronized (this) { - list = queue; - queue = null; - if (list == null) { - emitting = false; - skipFinal = true; - return; - } - } - } - } while (iter > 0); - } finally { - if (!skipFinal) { - synchronized (this) { - if (terminated) { - list = queue; - queue = null; - } else { - emitting = false; - list = null; - } + if (nl.accept2(actual, queue.poll())) { + FASTPATH_UPDATER.lazySet(this, 2); + queue.clear(); + return; } + c++; + } while (wip.decrementAndGet() > 0); + endRegular = true; + if (c < 3) { + FASTPATH_UPDATER.lazySet(this, 1); } - } - - // this will only drain if terminated (done here outside of synchronized block) - drainQueue(list); - } - - void drainQueue(FastList list) { - if (list == null || list.size == 0) { - return; - } - for (Object v : list.array) { - if (v == null) { - break; - } - if (v == NULL_SENTINEL) { - actual.onNext(null); - } else if (v == COMPLETE_SENTINEL) { - actual.onCompleted(); - } else if (v.getClass() == ErrorSentinel.class) { - actual.onError(((ErrorSentinel) v).e); - } else { - @SuppressWarnings("unchecked") - T t = (T)v; - actual.onNext(t); + } finally { + if (!endRegular) { + wip.set(0); // allow onError to enter } } } diff --git a/rxjava-core/src/main/java/rx/operators/NotificationLite.java b/rxjava-core/src/main/java/rx/operators/NotificationLite.java index 025ba98df9..0a042e9780 100644 --- a/rxjava-core/src/main/java/rx/operators/NotificationLite.java +++ b/rxjava-core/src/main/java/rx/operators/NotificationLite.java @@ -129,6 +129,39 @@ public void accept(Observer o, Object n) { throw new IllegalArgumentException("The lite notification can not be null"); } } + /** + * Unwraps the lite notification and calls the appropriate method on the {@link Observer}. + * + * @param o + * the {@link Observer} to call onNext, onCompleted or onError. + * @param n + * @return true if the n was a termination event + * @throws IllegalArgumentException + * if the notification is null. + * @throws NullPointerException + * if the {@link Observer} is null. + */ + @SuppressWarnings("unchecked") + public boolean accept2(Observer o, Object n) { + if (n == ON_COMPLETED_SENTINEL) { + o.onCompleted(); + return true; + } else + if (n == ON_NEXT_NULL_SENTINEL) { + o.onNext(null); + return false; + } else + if (n != null) { + if (n.getClass() == OnErrorSentinel.class) { + o.onError(((OnErrorSentinel)n).e); + return true; + } + o.onNext((T)n); + return false; + } else { + throw new IllegalArgumentException("The lite notification can not be null"); + } + } public boolean isCompleted(Object n) { return n == ON_COMPLETED_SENTINEL;