Skip to content

Commit 9e66ce4

Browse files
committed
Move NewThreadWorker out from NewThreadScheduler
- Break out ScheduledAction from NewThreadWorker
1 parent 99c6cdf commit 9e66ce4

File tree

5 files changed

+178
-133
lines changed

5 files changed

+178
-133
lines changed
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.internal.schedulers;
17+
18+
import rx.Scheduler;
19+
import rx.Subscription;
20+
import rx.functions.Action0;
21+
import rx.subscriptions.Subscriptions;
22+
23+
import java.util.concurrent.*;
24+
25+
public class NewThreadWorker extends Scheduler.Worker implements Subscription {
26+
private final ScheduledExecutorService executor;
27+
volatile boolean isUnsubscribed;
28+
29+
/* package */
30+
public NewThreadWorker(ThreadFactory threadFactory) {
31+
executor = Executors.newScheduledThreadPool(1, threadFactory);
32+
}
33+
34+
@Override
35+
public Subscription schedule(final Action0 action) {
36+
return schedule(action, 0, null);
37+
}
38+
39+
@Override
40+
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
41+
if (isUnsubscribed) {
42+
return Subscriptions.empty();
43+
}
44+
return scheduleActual(action, delayTime, unit);
45+
}
46+
47+
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
48+
ScheduledAction run = new ScheduledAction(action);
49+
Future<?> f;
50+
if (delayTime <= 0) {
51+
f = executor.submit(run);
52+
} else {
53+
f = executor.schedule(run, delayTime, unit);
54+
}
55+
run.add(Subscriptions.from(f));
56+
57+
return run;
58+
}
59+
60+
@Override
61+
public void unsubscribe() {
62+
isUnsubscribed = true;
63+
executor.shutdownNow();
64+
}
65+
66+
@Override
67+
public boolean isUnsubscribed() {
68+
return isUnsubscribed;
69+
}
70+
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.internal.schedulers;
17+
18+
import rx.Subscription;
19+
import rx.functions.Action0;
20+
import rx.subscriptions.CompositeSubscription;
21+
22+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
23+
24+
/**
25+
* A runnable that executes an Action0 and can be cancelled
26+
* The analogue is the Subscriber in respect of an Observer.
27+
*/
28+
public final class ScheduledAction implements Runnable, Subscription {
29+
final CompositeSubscription cancel;
30+
final Action0 action;
31+
volatile int once;
32+
static final AtomicIntegerFieldUpdater<ScheduledAction> ONCE_UPDATER
33+
= AtomicIntegerFieldUpdater.newUpdater(ScheduledAction.class, "once");
34+
35+
public ScheduledAction(Action0 action) {
36+
this.action = action;
37+
this.cancel = new CompositeSubscription();
38+
}
39+
40+
@Override
41+
public void run() {
42+
try {
43+
action.call();
44+
} finally {
45+
unsubscribe();
46+
}
47+
}
48+
49+
@Override
50+
public boolean isUnsubscribed() {
51+
return cancel.isUnsubscribed();
52+
}
53+
54+
@Override
55+
public void unsubscribe() {
56+
if (ONCE_UPDATER.compareAndSet(this, 0, 1)) {
57+
cancel.unsubscribe();
58+
}
59+
}
60+
61+
public void add(Subscription s) {
62+
cancel.add(s);
63+
}
64+
65+
/**
66+
* Adds a parent to this ScheduledAction so when it is
67+
* cancelled or terminates, it can remove itself from this parent.
68+
* @param parent
69+
*/
70+
public void addParent(CompositeSubscription parent) {
71+
cancel.add(new Remover(this, parent));
72+
}
73+
74+
/** Remove a child subscription from a composite when unsubscribing. */
75+
private static final class Remover implements Subscription {
76+
final Subscription s;
77+
final CompositeSubscription parent;
78+
volatile int once;
79+
static final AtomicIntegerFieldUpdater<Remover> ONCE_UPDATER
80+
= AtomicIntegerFieldUpdater.newUpdater(Remover.class, "once");
81+
82+
public Remover(Subscription s, CompositeSubscription parent) {
83+
this.s = s;
84+
this.parent = parent;
85+
}
86+
87+
@Override
88+
public boolean isUnsubscribed() {
89+
return s.isUnsubscribed();
90+
}
91+
92+
@Override
93+
public void unsubscribe() {
94+
if (ONCE_UPDATER.compareAndSet(this, 0, 1)) {
95+
parent.remove(s);
96+
}
97+
}
98+
99+
}
100+
}

rxjava-core/src/main/java/rx/schedulers/CachedThreadScheduler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import rx.Scheduler;
1919
import rx.Subscription;
2020
import rx.functions.Action0;
21+
import rx.internal.schedulers.NewThreadWorker;
22+
import rx.internal.schedulers.ScheduledAction;
2123
import rx.internal.util.RxThreadFactory;
2224
import rx.subscriptions.CompositeSubscription;
2325
import rx.subscriptions.Subscriptions;
@@ -144,14 +146,14 @@ public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
144146
return Subscriptions.empty();
145147
}
146148

147-
NewThreadScheduler.NewThreadWorker.ScheduledAction s = threadWorker.scheduleActual(action, delayTime, unit);
149+
ScheduledAction s = threadWorker.scheduleActual(action, delayTime, unit);
148150
innerSubscription.add(s);
149151
s.addParent(innerSubscription);
150152
return s;
151153
}
152154
}
153155

154-
private static final class ThreadWorker extends NewThreadScheduler.NewThreadWorker {
156+
private static final class ThreadWorker extends NewThreadWorker {
155157
private long expirationTime;
156158

157159
ThreadWorker(ThreadFactory threadFactory) {

rxjava-core/src/main/java/rx/schedulers/EventLoopsScheduler.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@
1818
import rx.Scheduler;
1919
import rx.Subscription;
2020
import rx.functions.Action0;
21+
import rx.internal.schedulers.NewThreadWorker;
22+
import rx.internal.schedulers.ScheduledAction;
2123
import rx.internal.util.RxThreadFactory;
22-
import rx.schedulers.NewThreadScheduler.NewThreadWorker.ScheduledAction;
2324
import rx.subscriptions.CompositeSubscription;
2425
import rx.subscriptions.Subscriptions;
2526

@@ -104,7 +105,7 @@ public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
104105
}
105106
}
106107

107-
private static final class PoolWorker extends NewThreadScheduler.NewThreadWorker {
108+
private static final class PoolWorker extends NewThreadWorker {
108109
PoolWorker(ThreadFactory threadFactory) {
109110
super(threadFactory);
110111
}

rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java

Lines changed: 1 addition & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,8 @@
1616
package rx.schedulers;
1717

1818
import rx.Scheduler;
19-
import rx.Subscription;
20-
import rx.functions.Action0;
19+
import rx.internal.schedulers.NewThreadWorker;
2120
import rx.internal.util.RxThreadFactory;
22-
import rx.subscriptions.CompositeSubscription;
23-
import rx.subscriptions.Subscriptions;
24-
25-
import java.util.concurrent.*;
26-
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
2721

2822
/**
2923
* Schedules work on a new thread.
@@ -46,126 +40,4 @@ private NewThreadScheduler() {
4640
public Worker createWorker() {
4741
return new NewThreadWorker(THREAD_FACTORY);
4842
}
49-
50-
/* package */static class NewThreadWorker extends Scheduler.Worker implements Subscription {
51-
private final ScheduledExecutorService executor;
52-
volatile boolean isUnsubscribed;
53-
54-
/* package */NewThreadWorker(ThreadFactory threadFactory) {
55-
executor = Executors.newScheduledThreadPool(1, threadFactory);
56-
}
57-
58-
@Override
59-
public Subscription schedule(final Action0 action) {
60-
return schedule(action, 0, null);
61-
}
62-
63-
@Override
64-
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
65-
if (isUnsubscribed) {
66-
return Subscriptions.empty();
67-
}
68-
return scheduleActual(action, delayTime, unit);
69-
}
70-
71-
/* package */ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
72-
ScheduledAction run = new ScheduledAction(action);
73-
Future<?> f;
74-
if (delayTime <= 0) {
75-
f = executor.submit(run);
76-
} else {
77-
f = executor.schedule(run, delayTime, unit);
78-
}
79-
run.add(Subscriptions.from(f));
80-
81-
return run;
82-
}
83-
84-
/** Remove a child subscription from a composite when unsubscribing. */
85-
private static final class Remover implements Subscription {
86-
final Subscription s;
87-
final CompositeSubscription parent;
88-
volatile int once;
89-
static final AtomicIntegerFieldUpdater<Remover> ONCE_UPDATER
90-
= AtomicIntegerFieldUpdater.newUpdater(Remover.class, "once");
91-
92-
public Remover(Subscription s, CompositeSubscription parent) {
93-
this.s = s;
94-
this.parent = parent;
95-
}
96-
97-
@Override
98-
public boolean isUnsubscribed() {
99-
return s.isUnsubscribed();
100-
}
101-
102-
@Override
103-
public void unsubscribe() {
104-
if (ONCE_UPDATER.compareAndSet(this, 0, 1)) {
105-
parent.remove(s);
106-
}
107-
}
108-
109-
}
110-
/**
111-
* A runnable that executes an Action0 and can be cancelled
112-
* The analogue is the Subscriber in respect of an Observer.
113-
*/
114-
public static final class ScheduledAction implements Runnable, Subscription {
115-
final CompositeSubscription cancel;
116-
final Action0 action;
117-
volatile int once;
118-
static final AtomicIntegerFieldUpdater<ScheduledAction> ONCE_UPDATER
119-
= AtomicIntegerFieldUpdater.newUpdater(ScheduledAction.class, "once");
120-
121-
public ScheduledAction(Action0 action) {
122-
this.action = action;
123-
this.cancel = new CompositeSubscription();
124-
}
125-
126-
@Override
127-
public void run() {
128-
try {
129-
action.call();
130-
} finally {
131-
unsubscribe();
132-
}
133-
}
134-
135-
@Override
136-
public boolean isUnsubscribed() {
137-
return cancel.isUnsubscribed();
138-
}
139-
140-
@Override
141-
public void unsubscribe() {
142-
if (ONCE_UPDATER.compareAndSet(this, 0, 1)) {
143-
cancel.unsubscribe();
144-
}
145-
}
146-
public void add(Subscription s) {
147-
cancel.add(s);
148-
}
149-
/**
150-
* Adds a parent to this ScheduledAction so when it is
151-
* cancelled or terminates, it can remove itself from this parent.
152-
* @param parent
153-
*/
154-
public void addParent(CompositeSubscription parent) {
155-
cancel.add(new Remover(this, parent));
156-
}
157-
}
158-
159-
@Override
160-
public void unsubscribe() {
161-
isUnsubscribed = true;
162-
executor.shutdownNow();
163-
}
164-
165-
@Override
166-
public boolean isUnsubscribed() {
167-
return isUnsubscribed;
168-
}
169-
170-
}
17143
}

0 commit comments

Comments
 (0)