Skip to content

Commit 5dd0ede

Browse files
Merge pull request #1359 from akarnokd/MpscLinkedQueuePadding
Fixed padding of the integer and node classes.
2 parents 0398453 + 30e7afa commit 5dd0ede

File tree

4 files changed

+148
-62
lines changed

4 files changed

+148
-62
lines changed
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.internal.util;
17+
18+
import java.io.Serializable;
19+
20+
/**
21+
* Padding up to 128 bytes at the front.
22+
* Based on netty's implementation
23+
*/
24+
abstract class FrontPadding implements Serializable {
25+
/** */
26+
private static final long serialVersionUID = -596356687591714352L;
27+
/** Padding. */
28+
public transient long p1, p2, p3, p4, p5, p6; // 48 bytes (header is 16 bytes)
29+
/** Padding. */
30+
public transient long p8, p9, p10, p11, p12, p13, p14, p15; // 64 bytes
31+
}

rxjava-core/src/main/java/rx/internal/util/MpscPaddedQueue.java

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package rx.internal.util;
1717

18+
import java.io.Serializable;
1819
import java.util.concurrent.atomic.AtomicReference;
1920
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
2021

@@ -26,8 +27,6 @@
2627
* @param <E> the element type
2728
*/
2829
public final class MpscPaddedQueue<E> extends AtomicReference<MpscPaddedQueue.Node<E>> {
29-
@SuppressWarnings(value = "rawtypes")
30-
static final AtomicReferenceFieldUpdater<PaddedNode, Node> TAIL_UPDATER = AtomicReferenceFieldUpdater.newUpdater(PaddedNode.class, Node.class, "tail");
3130
/** */
3231
private static final long serialVersionUID = 1L;
3332
/** The padded tail reference. */
@@ -39,7 +38,7 @@ public final class MpscPaddedQueue<E> extends AtomicReference<MpscPaddedQueue.No
3938
public MpscPaddedQueue() {
4039
Node<E> first = new Node<E>(null);
4140
tail = new PaddedNode<E>();
42-
tail.tail = first;
41+
tail.node = first;
4342
set(first);
4443
}
4544

@@ -64,7 +63,7 @@ public E poll() {
6463
}
6564
E v = n.value;
6665
n.value = null; // do not retain this value as the node still stays in the queue
67-
TAIL_UPDATER.lazySet(tail, n);
66+
tail.lazySet(n);
6867
return v;
6968
}
7069

@@ -75,7 +74,7 @@ public E poll() {
7574
private Node<E> peekNode() {
7675
for (;;) {
7776
@SuppressWarnings(value = "unchecked")
78-
Node<E> t = TAIL_UPDATER.get(tail);
77+
Node<E> t = tail.node;
7978
Node<E> n = t.get();
8079
if (n != null || get() == t) {
8180
return n;
@@ -93,29 +92,30 @@ public void clear() {
9392
}
9493
}
9594
}
96-
97-
/** Class that contains a Node reference padded around to fit a typical cache line. */
98-
static final class PaddedNode<E> {
99-
/** Padding, public to prevent optimizing it away. */
100-
public int p1;
101-
volatile Node<E> tail;
102-
/** Padding, public to prevent optimizing it away. */
103-
public long p2;
104-
/** Padding, public to prevent optimizing it away. */
105-
public long p3;
106-
/** Padding, public to prevent optimizing it away. */
107-
public long p4;
108-
/** Padding, public to prevent optimizing it away. */
109-
public long p5;
110-
/** Padding, public to prevent optimizing it away. */
111-
public long p6;
95+
/** The front-padded node class housing the actual value. */
96+
static abstract class PaddedNodeBase<E> extends FrontPadding {
97+
private static final long serialVersionUID = 2L;
98+
volatile Node<E> node;
99+
@SuppressWarnings(value = "rawtypes")
100+
static final AtomicReferenceFieldUpdater<PaddedNodeBase, Node> NODE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(PaddedNodeBase.class, Node.class, "node");
101+
public void lazySet(Node<E> newValue) {
102+
NODE_UPDATER.lazySet(this, newValue);
103+
}
104+
}
105+
/** Post-padding of the padded node base class. */
106+
static final class PaddedNode<E> extends PaddedNodeBase<E> {
107+
private static final long serialVersionUID = 3L;
108+
/** Padding. */
109+
public transient long p16, p17, p18, p19, p20, p21, p22; // 56 bytes (the remaining 8 is in the base)
110+
/** Padding. */
111+
public transient long p24, p25, p26, p27, p28, p29, p30, p31; // 64 bytes
112112
}
113113

114114
/**
115115
* Regular node with value and reference to the next node.
116116
*/
117-
static final class Node<E> {
118-
117+
static final class Node<E> implements Serializable {
118+
private static final long serialVersionUID = 4L;
119119
E value;
120120
@SuppressWarnings(value = "rawtypes")
121121
static final AtomicReferenceFieldUpdater<Node, Node> TAIL_UPDATER = AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "tail");

rxjava-core/src/main/java/rx/internal/util/PaddedAtomicInteger.java

Lines changed: 10 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -15,45 +15,16 @@
1515
*/
1616
package rx.internal.util;
1717

18-
import java.util.concurrent.atomic.AtomicInteger;
19-
2018
/**
21-
* An AtomicInteger with extra fields to pad it out to fit a typical cache line.
19+
* A padded atomic integer to fill in 4 cache lines to avoid any false sharing or
20+
* adjacent prefetch.
21+
* Based on Netty's implementation.
2222
*/
23-
public final class PaddedAtomicInteger extends AtomicInteger {
24-
private static final long serialVersionUID = 1L;
25-
/** Padding, public to prevent optimizing it away. */
26-
public int p1;
27-
/** Padding, public to prevent optimizing it away. */
28-
public int p2;
29-
/** Padding, public to prevent optimizing it away. */
30-
public int p3;
31-
/** Padding, public to prevent optimizing it away. */
32-
public int p4;
33-
/** Padding, public to prevent optimizing it away. */
34-
public int p5;
35-
/** Padding, public to prevent optimizing it away. */
36-
public int p6;
37-
/** Padding, public to prevent optimizing it away. */
38-
public int p7;
39-
/** Padding, public to prevent optimizing it away. */
40-
public int p8;
41-
/** Padding, public to prevent optimizing it away. */
42-
public int p9;
43-
/** Padding, public to prevent optimizing it away. */
44-
public int p10;
45-
/** Padding, public to prevent optimizing it away. */
46-
public int p11;
47-
/** Padding, public to prevent optimizing it away. */
48-
public int p12;
49-
/** Padding, public to prevent optimizing it away. */
50-
public int p13;
51-
/**
52-
* @warn description missing
53-
* @return prevents optimizing away the fields, most likely.
54-
*/
55-
public int noopt() {
56-
return p1 + p2 + p3 + p4 + p5 + p6 + p7 + p8 + p9 + p10 + p11 + p12 + p13;
57-
}
58-
23+
public final class PaddedAtomicInteger extends PaddedAtomicIntegerBase {
24+
/** */
25+
private static final long serialVersionUID = 8781891581317286855L;
26+
/** Padding. */
27+
public transient long p16, p17, p18, p19, p20, p21, p22; // 56 bytes (the remaining 8 is in the base)
28+
/** Padding. */
29+
public transient long p24, p25, p26, p27, p28, p29, p30, p31; // 64 bytes
5930
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package rx.internal.util;
18+
19+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
20+
21+
/**
22+
* The atomic integer base padded at the front.
23+
* Based on Netty's implementation.
24+
*/
25+
abstract class PaddedAtomicIntegerBase extends FrontPadding {
26+
27+
private static final long serialVersionUID = 6513142711280243198L;
28+
29+
private static final AtomicIntegerFieldUpdater<PaddedAtomicIntegerBase> updater;
30+
31+
static {
32+
updater = AtomicIntegerFieldUpdater.newUpdater(PaddedAtomicIntegerBase.class, "value");
33+
}
34+
35+
private volatile int value; // 8-byte object field (or 4-byte + padding)
36+
37+
public final int get() {
38+
return value;
39+
}
40+
41+
public final void set(int newValue) {
42+
this.value = newValue;
43+
}
44+
45+
public final void lazySet(int newValue) {
46+
updater.lazySet(this, newValue);
47+
}
48+
49+
public final boolean compareAndSet(int expect, int update) {
50+
return updater.compareAndSet(this, expect, update);
51+
}
52+
53+
public final boolean weakCompareAndSet(int expect, int update) {
54+
return updater.weakCompareAndSet(this, expect, update);
55+
}
56+
57+
public final int getAndSet(int newValue) {
58+
return updater.getAndSet(this, value);
59+
}
60+
61+
public final int getAndAdd(int delta) {
62+
return updater.getAndAdd(this, delta);
63+
}
64+
public final int incrementAndGet() {
65+
return updater.incrementAndGet(this);
66+
}
67+
public final int decrementAndGet() {
68+
return updater.decrementAndGet(this);
69+
}
70+
public final int getAndIncrement() {
71+
return updater.getAndIncrement(this);
72+
}
73+
public final int getAndDecrement() {
74+
return updater.getAndDecrement(this);
75+
}
76+
public final int addAndGet(int delta) {
77+
return updater.addAndGet(this, delta);
78+
}
79+
80+
@Override
81+
public String toString() {
82+
return String.valueOf(get());
83+
}
84+
}

0 commit comments

Comments
 (0)