Skip to content

Commit 4ae5333

Browse files
Merge pull request #1236 from akarnokd/CompositeSubscriptionMemory521
CompositeSubscription with atomic field updater
2 parents 62e68d5 + 7a1ad18 commit 4ae5333

File tree

1 file changed

+23
-21
lines changed

1 file changed

+23
-21
lines changed

rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.util.ArrayList;
1919
import java.util.List;
2020
import java.util.concurrent.atomic.AtomicReference;
21+
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
2122

2223
import rx.Subscription;
2324
import rx.exceptions.CompositeException;
@@ -29,8 +30,11 @@
2930
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable(v=vs.103).aspx">Rx.Net equivalent CompositeDisposable</a>
3031
*/
3132
public final class CompositeSubscription implements Subscription {
32-
33-
private final AtomicReference<State> state = new AtomicReference<State>();
33+
/** The atomic state updater. */
34+
static final AtomicReferenceFieldUpdater<CompositeSubscription, State> STATE_UPDATER
35+
= AtomicReferenceFieldUpdater.newUpdater(CompositeSubscription.class, State.class, "state");
36+
/** The subscription state. */
37+
volatile State state;
3438

3539
/** Empty initial state. */
3640
private static final State CLEAR_STATE;
@@ -97,43 +101,45 @@ State clear() {
97101
}
98102

99103
public CompositeSubscription() {
100-
state.set(CLEAR_STATE);
104+
// this creates only a store-store barrier which is generally faster when
105+
// CompositeSubscriptions are created in a tight loop.
106+
STATE_UPDATER.lazySet(this, CLEAR_STATE);
101107
}
102108

103109
public CompositeSubscription(final Subscription... subscriptions) {
104-
state.set(new State(false, subscriptions));
110+
STATE_UPDATER.lazySet(this, new State(false, subscriptions));
105111
}
106112

107113
@Override
108114
public boolean isUnsubscribed() {
109-
return state.get().isUnsubscribed;
115+
return state.isUnsubscribed;
110116
}
111117

112118
public void add(final Subscription s) {
113119
State oldState;
114120
State newState;
115121
do {
116-
oldState = state.get();
122+
oldState = state;
117123
if (oldState.isUnsubscribed) {
118124
s.unsubscribe();
119125
return;
120126
} else {
121127
newState = oldState.add(s);
122128
}
123-
} while (!state.compareAndSet(oldState, newState));
129+
} while (!STATE_UPDATER.compareAndSet(this, oldState, newState));
124130
}
125131

126132
public void remove(final Subscription s) {
127133
State oldState;
128134
State newState;
129135
do {
130-
oldState = state.get();
136+
oldState = state;
131137
if (oldState.isUnsubscribed) {
132138
return;
133139
} else {
134140
newState = oldState.remove(s);
135141
}
136-
} while (!state.compareAndSet(oldState, newState));
142+
} while (!STATE_UPDATER.compareAndSet(this, oldState, newState));
137143
// if we removed successfully we then need to call unsubscribe on it
138144
s.unsubscribe();
139145
}
@@ -142,29 +148,25 @@ public void clear() {
142148
State oldState;
143149
State newState;
144150
do {
145-
oldState = state.get();
151+
oldState = state;
146152
if (oldState.isUnsubscribed) {
147153
return;
148154
} else {
149155
newState = oldState.clear();
150156
}
151-
} while (!state.compareAndSet(oldState, newState));
157+
} while (!STATE_UPDATER.compareAndSet(this, oldState, newState));
152158
// if we cleared successfully we then need to call unsubscribe on all previous
153159
unsubscribeFromAll(oldState.subscriptions);
154160
}
155161

156162
@Override
157163
public void unsubscribe() {
158-
State oldState;
159-
State newState;
160-
do {
161-
oldState = state.get();
162-
if (oldState.isUnsubscribed) {
163-
return;
164-
} else {
165-
newState = oldState.unsubscribe();
166-
}
167-
} while (!state.compareAndSet(oldState, newState));
164+
State oldState = state;
165+
if (oldState.isUnsubscribed) {
166+
return;
167+
}
168+
// intrinsics may make this a single instruction and may prevent concurrent add/remove faster
169+
oldState = STATE_UPDATER.getAndSet(this, oldState.unsubscribe());
168170
unsubscribeFromAll(oldState.subscriptions);
169171
}
170172

0 commit comments

Comments
 (0)