diff --git a/rxjava-core/src/main/java/rx/observers/MpscPaddedQueue.java b/rxjava-core/src/main/java/rx/observers/MpscPaddedQueue.java
new file mode 100644
index 0000000000..3a5aa90fd4
--- /dev/null
+++ b/rxjava-core/src/main/java/rx/observers/MpscPaddedQueue.java
@@ -0,0 +1,129 @@
+/**
+ * Copyright 2014 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package rx.observers;
+
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+/**
+ * A multiple-producer single consumer queue implementation with padded reference
+ * to tail to avoid cache-line thrashing.
+ * Based on Netty's MpscQueue implementation but using AtomicReferenceFieldUpdater
+ * instead of Unsafe.
+ * @param the element type
+ */
+public final class MpscPaddedQueue extends AtomicReference> {
+ @SuppressWarnings(value = "rawtypes")
+ static final AtomicReferenceFieldUpdater TAIL_UPDATER = AtomicReferenceFieldUpdater.newUpdater(PaddedNode.class, Node.class, "tail");
+ /** */
+ private static final long serialVersionUID = 1L;
+ /** The padded tail reference. */
+ final PaddedNode tail;
+ /**
+ * Initializes the empty queue.
+ */
+ public MpscPaddedQueue() {
+ Node first = new Node(null);
+ tail = new PaddedNode();
+ tail.tail = first;
+ set(first);
+ }
+ /**
+ * Offer a new value.
+ * @param v the value to offer
+ */
+ public void offer(E v) {
+ Node n = new Node(v);
+ getAndSet(n).set(n);
+ }
+
+ /**
+ * @return Poll a value from the head of the queue or return null if the queue is empty.
+ */
+ public E poll() {
+ Node n = peekNode();
+ if (n == null) {
+ return null;
+ }
+ E v = n.value;
+ n.value = null; // do not retain this value as the node still stays in the queue
+ TAIL_UPDATER.lazySet(tail, n);
+ return v;
+ }
+ /**
+ * Check if there is a node available without changing anything.
+ */
+ private Node peekNode() {
+ for (;;) {
+ @SuppressWarnings(value = "unchecked")
+ Node t = TAIL_UPDATER.get(tail);
+ Node n = t.get();
+ if (n != null || get() == t) {
+ return n;
+ }
+ }
+ }
+ /**
+ * Clears the queue.
+ */
+ public void clear() {
+ for (;;) {
+ if (poll() == null) {
+ break;
+ }
+ }
+ }
+ /** Class that contains a Node reference padded around to fit a typical cache line. */
+ static final class PaddedNode {
+ /** Padding, public to prevent optimizing it away. */
+ public int p1;
+ volatile Node tail;
+ /** Padding, public to prevent optimizing it away. */
+ public long p2;
+ /** Padding, public to prevent optimizing it away. */
+ public long p3;
+ /** Padding, public to prevent optimizing it away. */
+ public long p4;
+ /** Padding, public to prevent optimizing it away. */
+ public long p5;
+ /** Padding, public to prevent optimizing it away. */
+ public long p6;
+ }
+ /**
+ * Regular node with value and reference to the next node.
+ */
+ static final class Node {
+
+ E value;
+ @SuppressWarnings(value = "rawtypes")
+ static final AtomicReferenceFieldUpdater TAIL_UPDATER = AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "tail");
+ volatile Node tail;
+
+ public Node(E value) {
+ this.value = value;
+ }
+
+ public void set(Node newTail) {
+ TAIL_UPDATER.lazySet(this, newTail);
+ }
+
+ @SuppressWarnings(value = "unchecked")
+ public Node get() {
+ return TAIL_UPDATER.get(this);
+ }
+ }
+
+}
diff --git a/rxjava-core/src/main/java/rx/observers/PaddedAtomicInteger.java b/rxjava-core/src/main/java/rx/observers/PaddedAtomicInteger.java
new file mode 100644
index 0000000000..b00c16b881
--- /dev/null
+++ b/rxjava-core/src/main/java/rx/observers/PaddedAtomicInteger.java
@@ -0,0 +1,56 @@
+/**
+ * Copyright 2014 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package rx.observers;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * An AtomicInteger with extra fields to pad it out to fit a typical cache line.
+ */
+public final class PaddedAtomicInteger extends AtomicInteger {
+ private static final long serialVersionUID = 1L;
+ /** Padding, public to prevent optimizing it away. */
+ public int p1;
+ /** Padding, public to prevent optimizing it away. */
+ public int p2;
+ /** Padding, public to prevent optimizing it away. */
+ public int p3;
+ /** Padding, public to prevent optimizing it away. */
+ public int p4;
+ /** Padding, public to prevent optimizing it away. */
+ public int p5;
+ /** Padding, public to prevent optimizing it away. */
+ public int p6;
+ /** Padding, public to prevent optimizing it away. */
+ public int p7;
+ /** Padding, public to prevent optimizing it away. */
+ public int p8;
+ /** Padding, public to prevent optimizing it away. */
+ public int p9;
+ /** Padding, public to prevent optimizing it away. */
+ public int p10;
+ /** Padding, public to prevent optimizing it away. */
+ public int p11;
+ /** Padding, public to prevent optimizing it away. */
+ public int p12;
+ /** Padding, public to prevent optimizing it away. */
+ public int p13;
+ /** @return prevents optimizing away the fields, most likely. */
+ public int noopt() {
+ return p1 + p2 + p3 + p4 + p5 + p6 + p7 + p8 + p9 + p10 + p11 + p12 + p13;
+ }
+
+}
diff --git a/rxjava-core/src/main/java/rx/observers/SerializedObserver.java b/rxjava-core/src/main/java/rx/observers/SerializedObserver.java
index 0572fddb07..9e1e0ac0e8 100644
--- a/rxjava-core/src/main/java/rx/observers/SerializedObserver.java
+++ b/rxjava-core/src/main/java/rx/observers/SerializedObserver.java
@@ -15,7 +15,9 @@
*/
package rx.observers;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import rx.Observer;
+import rx.operators.NotificationLite;
/**
* Enforce single-threaded, serialized, ordered execution of onNext, onCompleted, onError.
@@ -29,178 +31,146 @@
*
* @param
*/
-public class SerializedObserver implements Observer {
- private final Observer super T> actual;
-
- private boolean emitting = false;
- private boolean terminated = false;
- private FastList queue;
-
- private static final int MAX_DRAIN_ITERATION = Integer.MAX_VALUE;
- private static final Object NULL_SENTINEL = new Object();
- private static final Object COMPLETE_SENTINEL = new Object();
-
- static final class FastList {
- Object[] array;
- int size;
-
- public void add(Object o) {
- int s = size;
- Object[] a = array;
- if (a == null) {
- a = new Object[16];
- array = a;
- } else if (s == a.length) {
- Object[] array2 = new Object[s + (s >> 2)];
- System.arraycopy(a, 0, array2, 0, s);
- a = array2;
- array = a;
- }
- a[s] = o;
- size = s + 1;
- }
- }
-
- private static final class ErrorSentinel {
- final Throwable e;
-
- ErrorSentinel(Throwable e) {
- this.e = e;
- }
- }
-
+public final class SerializedObserver implements Observer {
+ /** The actual observer that receives the values. */
+ private final Observer