Skip to content

Commit 99c6cdf

Browse files
committed
Move RxThreadFactory out from NewThreadScheduler
1 parent 4d2a960 commit 99c6cdf

File tree

6 files changed

+58
-39
lines changed

6 files changed

+58
-39
lines changed
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.internal.util;
17+
18+
import java.util.concurrent.ThreadFactory;
19+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
20+
21+
public final class RxThreadFactory implements ThreadFactory {
22+
final String prefix;
23+
volatile long counter;
24+
static final AtomicLongFieldUpdater<RxThreadFactory> COUNTER_UPDATER
25+
= AtomicLongFieldUpdater.newUpdater(RxThreadFactory.class, "counter");
26+
27+
public RxThreadFactory(String prefix) {
28+
this.prefix = prefix;
29+
}
30+
31+
@Override
32+
public Thread newThread(Runnable r) {
33+
Thread t = new Thread(r, prefix + COUNTER_UPDATER.incrementAndGet(this));
34+
t.setDaemon(true);
35+
return t;
36+
}
37+
}

rxjava-core/src/main/java/rx/schedulers/CachedThreadScheduler.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import rx.Scheduler;
1919
import rx.Subscription;
2020
import rx.functions.Action0;
21+
import rx.internal.util.RxThreadFactory;
2122
import rx.subscriptions.CompositeSubscription;
2223
import rx.subscriptions.Subscriptions;
2324

@@ -27,12 +28,12 @@
2728

2829
/* package */final class CachedThreadScheduler extends Scheduler {
2930
private static final String WORKER_THREAD_NAME_PREFIX = "RxCachedThreadScheduler-";
30-
private static final NewThreadScheduler.RxThreadFactory WORKER_THREAD_FACTORY =
31-
new NewThreadScheduler.RxThreadFactory(WORKER_THREAD_NAME_PREFIX);
31+
private static final RxThreadFactory WORKER_THREAD_FACTORY =
32+
new RxThreadFactory(WORKER_THREAD_NAME_PREFIX);
3233

3334
private static final String EVICTOR_THREAD_NAME_PREFIX = "RxCachedWorkerPoolEvictor-";
34-
private static final NewThreadScheduler.RxThreadFactory EVICTOR_THREAD_FACTORY =
35-
new NewThreadScheduler.RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX);
35+
private static final RxThreadFactory EVICTOR_THREAD_FACTORY =
36+
new RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX);
3637

3738
private static final class CachedWorkerPool {
3839
private final long keepAliveTime;

rxjava-core/src/main/java/rx/schedulers/EventLoopsScheduler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,17 @@
1515
*/
1616
package rx.schedulers;
1717

18-
import java.util.concurrent.ThreadFactory;
19-
import java.util.concurrent.TimeUnit;
20-
2118
import rx.Scheduler;
2219
import rx.Subscription;
2320
import rx.functions.Action0;
21+
import rx.internal.util.RxThreadFactory;
2422
import rx.schedulers.NewThreadScheduler.NewThreadWorker.ScheduledAction;
25-
import rx.schedulers.NewThreadScheduler.RxThreadFactory;
2623
import rx.subscriptions.CompositeSubscription;
2724
import rx.subscriptions.Subscriptions;
2825

26+
import java.util.concurrent.ThreadFactory;
27+
import java.util.concurrent.TimeUnit;
28+
2929
/* package */class EventLoopsScheduler extends Scheduler {
3030
/** Manages a fixed number of workers. */
3131
private static final String THREAD_NAME_PREFIX = "RxComputationThreadPool-";

rxjava-core/src/main/java/rx/schedulers/GenericScheduledExecutorService.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,14 @@
1515
*/
1616
package rx.schedulers;
1717

18+
import rx.Scheduler;
19+
import rx.internal.util.RxThreadFactory;
20+
1821
import java.util.concurrent.Executor;
1922
import java.util.concurrent.ExecutorService;
2023
import java.util.concurrent.Executors;
2124
import java.util.concurrent.ScheduledExecutorService;
2225

23-
import rx.Scheduler;
24-
2526
/**
2627
* A default {@link ScheduledExecutorService} that can be used for scheduling actions when a {@link Scheduler} implementation doesn't have that ability.
2728
* <p>
@@ -34,7 +35,7 @@
3435
/* package */final class GenericScheduledExecutorService {
3536

3637
private static final String THREAD_NAME_PREFIX = "RxScheduledExecutorPool-";
37-
private static final NewThreadScheduler.RxThreadFactory THREAD_FACTORY = new NewThreadScheduler.RxThreadFactory(THREAD_NAME_PREFIX);
38+
private static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX);
3839

3940
private final static GenericScheduledExecutorService INSTANCE = new GenericScheduledExecutorService();
4041
private final ScheduledExecutorService executor;

rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java

Lines changed: 4 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,16 @@
1515
*/
1616
package rx.schedulers;
1717

18-
import java.util.concurrent.Executors;
19-
import java.util.concurrent.Future;
20-
import java.util.concurrent.ScheduledExecutorService;
21-
import java.util.concurrent.ThreadFactory;
22-
import java.util.concurrent.TimeUnit;
23-
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
24-
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
25-
2618
import rx.Scheduler;
2719
import rx.Subscription;
2820
import rx.functions.Action0;
21+
import rx.internal.util.RxThreadFactory;
2922
import rx.subscriptions.CompositeSubscription;
3023
import rx.subscriptions.Subscriptions;
3124

25+
import java.util.concurrent.*;
26+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
27+
3228
/**
3329
* Schedules work on a new thread.
3430
*/
@@ -38,24 +34,6 @@ public class NewThreadScheduler extends Scheduler {
3834
private static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX);
3935
private static final NewThreadScheduler INSTANCE = new NewThreadScheduler();
4036

41-
static final class RxThreadFactory implements ThreadFactory {
42-
final String prefix;
43-
volatile long counter;
44-
static final AtomicLongFieldUpdater<RxThreadFactory> COUNTER_UPDATER
45-
= AtomicLongFieldUpdater.newUpdater(RxThreadFactory.class, "counter");
46-
47-
public RxThreadFactory(String prefix) {
48-
this.prefix = prefix;
49-
}
50-
51-
@Override
52-
public Thread newThread(Runnable r) {
53-
Thread t = new Thread(r, prefix + COUNTER_UPDATER.incrementAndGet(this));
54-
t.setDaemon(true);
55-
return t;
56-
}
57-
}
58-
5937
/* package */static NewThreadScheduler instance() {
6038
return INSTANCE;
6139
}

rxjava-core/src/test/java/rx/schedulers/ExecutorSchedulerTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,16 @@
1515
*/
1616
package rx.schedulers;
1717

18-
import java.util.concurrent.Executors;
1918
import rx.Scheduler;
19+
import rx.internal.util.RxThreadFactory;
20+
21+
import java.util.concurrent.Executors;
2022

2123
public class ExecutorSchedulerTest extends AbstractSchedulerConcurrencyTests {
2224

2325
@Override
2426
protected Scheduler getScheduler() {
25-
return Schedulers.from(Executors.newFixedThreadPool(2, new NewThreadScheduler.RxThreadFactory("TestCustomPool-")));
27+
return Schedulers.from(Executors.newFixedThreadPool(2, new RxThreadFactory("TestCustomPool-")));
2628
}
2729

2830
}

0 commit comments

Comments
 (0)