Description
Folks, this is a pretty bad memory leak. The schedulePeriodically
methods as currently defined are leaking.
For example, let's start with the current code in rx.Scheduler, line 93.
The method in question is this:
public <T> Subscription schedulePeriodically(T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long initialDelay, long period, TimeUnit unit) {
final long periodInNanos = unit.toNanos(period);
final AtomicBoolean complete = new AtomicBoolean();
final Func2<Scheduler, T, Subscription> recursiveAction = new Func2<Scheduler, T, Subscription>() {
@Override
public Subscription call(Scheduler scheduler, T state0) {
if (!complete.get()) {
long startedAt = now();
final Subscription sub1 = action.call(scheduler, state0);
long timeTakenByActionInNanos = TimeUnit.MILLISECONDS.toNanos(now() - startedAt);
final Subscription sub2 = schedule(state0, this, periodInNanos - timeTakenByActionInNanos, TimeUnit.NANOSECONDS);
return Subscriptions.create(new Action0() {
@Override
public void call() {
sub1.unsubscribe();
sub2.unsubscribe();
}
});
}
return Subscriptions.empty();
}
};
final Subscription sub = schedule(state, recursiveAction, initialDelay, unit);
return Subscriptions.create(new Action0() {
@Override
public void call() {
complete.set(true);
sub.unsubscribe();
}
});
}
The problem is that each and every one periodic execution of the given action is creating a new Subscription reference that is capturing the reference of the previous subscription. And no subscription reference will ever be garbage collected, unless unsubscribe() happens.
It's easy to reproduce the effect (i.e. OutOfMemoryError) with a simple test that will end up with an error in a matter of seconds:
val sch = rx.schedulers.Schedulers.executor(Executors.newFixedThreadPool(2))
val counter = new AtomicInteger(0)
val action = new Action0 {
def call(): Unit = {
counter.increment()
}
}
val sub = sch.schedulePeriodically(action, 1, 1, TimeUnit.NANOSECONDS)
Here's how that looks like in a profiler:
The above, because it doesn't use a ScheduledExecutorService
, is using the default Scheduler.schedulePeriodically
that I pasted above, which is pretty heavy. The problem is that even with a ScheduledExecutorService
, it still leaks, because the ExecutorScheduler is still using a CompositeSubscription
and when scheduling an Action0
, this basically pushes Subscriptions.empty
over and over again into an ArrayList
. Thus, the code is slower in leaking memory, but it still leaks.
From what I can see, this affects all schedulePeriodically
implementations and as a consequence, things like Observable.interval
are anything but infinite.
UPDATE: I now see that a pull request addressing some issues was made in #712 - in particular, it changed the behavior of ExecutorScheduler. I haven't tested this version, I will do so today hopefully.
However the default Scheduler.schedulePeriodically
is still there and something to consider - if something that does periodic scheduling leaks, it probably shouldn't be there.