Skip to content

Commit d1d75d6

Browse files
Merge pull request #1296 from jbripley/schedulers-internal
Move re-used internal Scheduler classes to their own package
2 parents 4d2a960 + 9e66ce4 commit d1d75d6

File tree

8 files changed

+233
-169
lines changed

8 files changed

+233
-169
lines changed
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.internal.schedulers;
17+
18+
import rx.Scheduler;
19+
import rx.Subscription;
20+
import rx.functions.Action0;
21+
import rx.subscriptions.Subscriptions;
22+
23+
import java.util.concurrent.*;
24+
25+
public class NewThreadWorker extends Scheduler.Worker implements Subscription {
26+
private final ScheduledExecutorService executor;
27+
volatile boolean isUnsubscribed;
28+
29+
/* package */
30+
public NewThreadWorker(ThreadFactory threadFactory) {
31+
executor = Executors.newScheduledThreadPool(1, threadFactory);
32+
}
33+
34+
@Override
35+
public Subscription schedule(final Action0 action) {
36+
return schedule(action, 0, null);
37+
}
38+
39+
@Override
40+
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
41+
if (isUnsubscribed) {
42+
return Subscriptions.empty();
43+
}
44+
return scheduleActual(action, delayTime, unit);
45+
}
46+
47+
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
48+
ScheduledAction run = new ScheduledAction(action);
49+
Future<?> f;
50+
if (delayTime <= 0) {
51+
f = executor.submit(run);
52+
} else {
53+
f = executor.schedule(run, delayTime, unit);
54+
}
55+
run.add(Subscriptions.from(f));
56+
57+
return run;
58+
}
59+
60+
@Override
61+
public void unsubscribe() {
62+
isUnsubscribed = true;
63+
executor.shutdownNow();
64+
}
65+
66+
@Override
67+
public boolean isUnsubscribed() {
68+
return isUnsubscribed;
69+
}
70+
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.internal.schedulers;
17+
18+
import rx.Subscription;
19+
import rx.functions.Action0;
20+
import rx.subscriptions.CompositeSubscription;
21+
22+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
23+
24+
/**
25+
* A runnable that executes an Action0 and can be cancelled
26+
* The analogue is the Subscriber in respect of an Observer.
27+
*/
28+
public final class ScheduledAction implements Runnable, Subscription {
29+
final CompositeSubscription cancel;
30+
final Action0 action;
31+
volatile int once;
32+
static final AtomicIntegerFieldUpdater<ScheduledAction> ONCE_UPDATER
33+
= AtomicIntegerFieldUpdater.newUpdater(ScheduledAction.class, "once");
34+
35+
public ScheduledAction(Action0 action) {
36+
this.action = action;
37+
this.cancel = new CompositeSubscription();
38+
}
39+
40+
@Override
41+
public void run() {
42+
try {
43+
action.call();
44+
} finally {
45+
unsubscribe();
46+
}
47+
}
48+
49+
@Override
50+
public boolean isUnsubscribed() {
51+
return cancel.isUnsubscribed();
52+
}
53+
54+
@Override
55+
public void unsubscribe() {
56+
if (ONCE_UPDATER.compareAndSet(this, 0, 1)) {
57+
cancel.unsubscribe();
58+
}
59+
}
60+
61+
public void add(Subscription s) {
62+
cancel.add(s);
63+
}
64+
65+
/**
66+
* Adds a parent to this ScheduledAction so when it is
67+
* cancelled or terminates, it can remove itself from this parent.
68+
* @param parent
69+
*/
70+
public void addParent(CompositeSubscription parent) {
71+
cancel.add(new Remover(this, parent));
72+
}
73+
74+
/** Remove a child subscription from a composite when unsubscribing. */
75+
private static final class Remover implements Subscription {
76+
final Subscription s;
77+
final CompositeSubscription parent;
78+
volatile int once;
79+
static final AtomicIntegerFieldUpdater<Remover> ONCE_UPDATER
80+
= AtomicIntegerFieldUpdater.newUpdater(Remover.class, "once");
81+
82+
public Remover(Subscription s, CompositeSubscription parent) {
83+
this.s = s;
84+
this.parent = parent;
85+
}
86+
87+
@Override
88+
public boolean isUnsubscribed() {
89+
return s.isUnsubscribed();
90+
}
91+
92+
@Override
93+
public void unsubscribe() {
94+
if (ONCE_UPDATER.compareAndSet(this, 0, 1)) {
95+
parent.remove(s);
96+
}
97+
}
98+
99+
}
100+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.internal.util;
17+
18+
import java.util.concurrent.ThreadFactory;
19+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
20+
21+
public final class RxThreadFactory implements ThreadFactory {
22+
final String prefix;
23+
volatile long counter;
24+
static final AtomicLongFieldUpdater<RxThreadFactory> COUNTER_UPDATER
25+
= AtomicLongFieldUpdater.newUpdater(RxThreadFactory.class, "counter");
26+
27+
public RxThreadFactory(String prefix) {
28+
this.prefix = prefix;
29+
}
30+
31+
@Override
32+
public Thread newThread(Runnable r) {
33+
Thread t = new Thread(r, prefix + COUNTER_UPDATER.incrementAndGet(this));
34+
t.setDaemon(true);
35+
return t;
36+
}
37+
}

rxjava-core/src/main/java/rx/schedulers/CachedThreadScheduler.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
import rx.Scheduler;
1919
import rx.Subscription;
2020
import rx.functions.Action0;
21+
import rx.internal.schedulers.NewThreadWorker;
22+
import rx.internal.schedulers.ScheduledAction;
23+
import rx.internal.util.RxThreadFactory;
2124
import rx.subscriptions.CompositeSubscription;
2225
import rx.subscriptions.Subscriptions;
2326

@@ -27,12 +30,12 @@
2730

2831
/* package */final class CachedThreadScheduler extends Scheduler {
2932
private static final String WORKER_THREAD_NAME_PREFIX = "RxCachedThreadScheduler-";
30-
private static final NewThreadScheduler.RxThreadFactory WORKER_THREAD_FACTORY =
31-
new NewThreadScheduler.RxThreadFactory(WORKER_THREAD_NAME_PREFIX);
33+
private static final RxThreadFactory WORKER_THREAD_FACTORY =
34+
new RxThreadFactory(WORKER_THREAD_NAME_PREFIX);
3235

3336
private static final String EVICTOR_THREAD_NAME_PREFIX = "RxCachedWorkerPoolEvictor-";
34-
private static final NewThreadScheduler.RxThreadFactory EVICTOR_THREAD_FACTORY =
35-
new NewThreadScheduler.RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX);
37+
private static final RxThreadFactory EVICTOR_THREAD_FACTORY =
38+
new RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX);
3639

3740
private static final class CachedWorkerPool {
3841
private final long keepAliveTime;
@@ -143,14 +146,14 @@ public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
143146
return Subscriptions.empty();
144147
}
145148

146-
NewThreadScheduler.NewThreadWorker.ScheduledAction s = threadWorker.scheduleActual(action, delayTime, unit);
149+
ScheduledAction s = threadWorker.scheduleActual(action, delayTime, unit);
147150
innerSubscription.add(s);
148151
s.addParent(innerSubscription);
149152
return s;
150153
}
151154
}
152155

153-
private static final class ThreadWorker extends NewThreadScheduler.NewThreadWorker {
156+
private static final class ThreadWorker extends NewThreadWorker {
154157
private long expirationTime;
155158

156159
ThreadWorker(ThreadFactory threadFactory) {

rxjava-core/src/main/java/rx/schedulers/EventLoopsScheduler.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,18 @@
1515
*/
1616
package rx.schedulers;
1717

18-
import java.util.concurrent.ThreadFactory;
19-
import java.util.concurrent.TimeUnit;
20-
2118
import rx.Scheduler;
2219
import rx.Subscription;
2320
import rx.functions.Action0;
24-
import rx.schedulers.NewThreadScheduler.NewThreadWorker.ScheduledAction;
25-
import rx.schedulers.NewThreadScheduler.RxThreadFactory;
21+
import rx.internal.schedulers.NewThreadWorker;
22+
import rx.internal.schedulers.ScheduledAction;
23+
import rx.internal.util.RxThreadFactory;
2624
import rx.subscriptions.CompositeSubscription;
2725
import rx.subscriptions.Subscriptions;
2826

27+
import java.util.concurrent.ThreadFactory;
28+
import java.util.concurrent.TimeUnit;
29+
2930
/* package */class EventLoopsScheduler extends Scheduler {
3031
/** Manages a fixed number of workers. */
3132
private static final String THREAD_NAME_PREFIX = "RxComputationThreadPool-";
@@ -104,7 +105,7 @@ public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
104105
}
105106
}
106107

107-
private static final class PoolWorker extends NewThreadScheduler.NewThreadWorker {
108+
private static final class PoolWorker extends NewThreadWorker {
108109
PoolWorker(ThreadFactory threadFactory) {
109110
super(threadFactory);
110111
}

rxjava-core/src/main/java/rx/schedulers/GenericScheduledExecutorService.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,14 @@
1515
*/
1616
package rx.schedulers;
1717

18+
import rx.Scheduler;
19+
import rx.internal.util.RxThreadFactory;
20+
1821
import java.util.concurrent.Executor;
1922
import java.util.concurrent.ExecutorService;
2023
import java.util.concurrent.Executors;
2124
import java.util.concurrent.ScheduledExecutorService;
2225

23-
import rx.Scheduler;
24-
2526
/**
2627
* A default {@link ScheduledExecutorService} that can be used for scheduling actions when a {@link Scheduler} implementation doesn't have that ability.
2728
* <p>
@@ -34,7 +35,7 @@
3435
/* package */final class GenericScheduledExecutorService {
3536

3637
private static final String THREAD_NAME_PREFIX = "RxScheduledExecutorPool-";
37-
private static final NewThreadScheduler.RxThreadFactory THREAD_FACTORY = new NewThreadScheduler.RxThreadFactory(THREAD_NAME_PREFIX);
38+
private static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX);
3839

3940
private final static GenericScheduledExecutorService INSTANCE = new GenericScheduledExecutorService();
4041
private final ScheduledExecutorService executor;

0 commit comments

Comments
 (0)