Scheduler.schedulePeriodically - memory leak #782

Closed
@alexandru


Folks, this is a pretty bad memory leak. The schedulePeriodically methods as currently defined are leaking.

For example, let's start with the current code in rx.Scheduler, line 93.

The method in question is this:

    public <T> Subscription schedulePeriodically(T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long initialDelay, long period, TimeUnit unit) {
        final long periodInNanos = unit.toNanos(period);
        final AtomicBoolean complete = new AtomicBoolean();

        final Func2<Scheduler, T, Subscription> recursiveAction = new Func2<Scheduler, T, Subscription>() {
            @Override
            public Subscription call(Scheduler scheduler, T state0) {
                if (!complete.get()) {
                    long startedAt = now();
                    final Subscription sub1 = action.call(scheduler, state0);
                    long timeTakenByActionInNanos = TimeUnit.MILLISECONDS.toNanos(now() - startedAt);
                    final Subscription sub2 = schedule(state0, this, periodInNanos - timeTakenByActionInNanos, TimeUnit.NANOSECONDS);
                    return Subscriptions.create(new Action0() {
                        @Override
                        public void call() {
                            sub1.unsubscribe();
                            sub2.unsubscribe();
                        }
                    });
                }
                return Subscriptions.empty();
            }
        };
        final Subscription sub = schedule(state, recursiveAction, initialDelay, unit);
        return Subscriptions.create(new Action0() {
            @Override
            public void call() {
                complete.set(true);
                sub.unsubscribe();
            }
        });
    }

The problem is that every periodic execution of the given action creates a new Subscription that captures a reference to the previous subscription. None of these subscriptions can be garbage collected until unsubscribe() is called.
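To make the growth concrete, here is a rough, single-threaded model of that chain. It is not RxJava code: `Slot` and `Wrapper` are hypothetical names, and it assumes the handle returned by schedule(...) keeps a reference to whatever the scheduled call returns. Under that assumption, the handle held by the caller keeps one wrapper reachable per tick.

```java
public class SubscriptionChainSketch {

    /** Hypothetical stand-in for rx.Subscription. */
    interface Subscription {
        void unsubscribe();
    }

    /** Models the handle returned by schedule(...); assumed to hold whatever the scheduled call returns. */
    static final class Slot implements Subscription {
        Subscription inner; // filled in once the scheduled call has run

        @Override
        public void unsubscribe() {
            if (inner != null) {
                inner.unsubscribe();
            }
        }
    }

    /** Models the anonymous wrapper built in recursiveAction: it captures sub1 and sub2. */
    static final class Wrapper implements Subscription {
        final Subscription sub1;
        final Slot sub2;

        Wrapper(Subscription sub1, Slot sub2) {
            this.sub1 = sub1;
            this.sub2 = sub2;
        }

        @Override
        public void unsubscribe() {
            sub1.unsubscribe();
            sub2.unsubscribe();
        }
    }

    /** Simulates `ticks` periodic executions and returns the handle the caller keeps. */
    static Slot run(int ticks) {
        Slot head = new Slot();            // sub = schedule(state, recursiveAction, ...)
        Slot current = head;
        for (int i = 0; i < ticks; i++) {
            Subscription sub1 = () -> { }; // result of action.call(...)
            Slot sub2 = new Slot();        // schedule(state0, this, ...)
            current.inner = new Wrapper(sub1, sub2);
            current = sub2;
        }
        return head;
    }

    /** Counts the wrappers still reachable from the caller's handle. */
    static int retainedWrappers(Slot head) {
        int count = 0;
        Slot slot = head;
        while (slot.inner instanceof Wrapper) {
            count++;
            slot = ((Wrapper) slot.inner).sub2;
        }
        return count;
    }

    public static void main(String[] args) {
        for (int ticks : new int[] {10, 1_000, 100_000}) {
            System.out.println("ticks=" + ticks + " retained=" + retainedWrappers(run(ticks)));
        }
    }
}
```

In this model the retained count equals the number of ticks, so memory use grows linearly for as long as the caller holds on to the handle without unsubscribing.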

It's easy to reproduce the effect (i.e. OutOfMemoryError) with a simple test that will end up with an error in a matter of seconds:

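  import java.util.concurrent.{Executors, TimeUnit}
  import java.util.concurrent.atomic.AtomicInteger
  // Action0 must also be imported from RxJava; its package depends on the RxJava version

  // Plain (non-scheduled) thread pool, so the default Scheduler.schedulePeriodically is used
  val sch = rx.schedulers.Schedulers.executor(Executors.newFixedThreadPool(2))
  val counter = new AtomicInteger(0)

  val action = new Action0 {
    def call(): Unit = {
      counter.incrementAndGet()
    }
  }

  // Re-run the action every nanosecond so the leak shows up quickly
  val sub = sch.schedulePeriodically(action, 1, 1, TimeUnit.NANOSECONDS)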

Here's how that looks in a profiler:
[image: profiler screenshot]

The test above doesn't use a ScheduledExecutorService, so it goes through the default Scheduler.schedulePeriodically that I pasted above, which is pretty heavy. But even with a ScheduledExecutorService it still leaks: the ExecutorScheduler still uses a CompositeSubscription, and when scheduling an Action0 it basically pushes Subscriptions.empty into an ArrayList over and over again. The code leaks memory more slowly, but it still leaks.
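The slower leak can be sketched the same way. The `Composite` class below is a hypothetical stand-in that only mirrors the behavior described above (a list-backed composite that has an empty subscription added on every tick); it is not the actual CompositeSubscription source.

```java
import java.util.ArrayList;
import java.util.List;

public class CompositeGrowthSketch {

    /** Hypothetical stand-in for rx.Subscription. */
    interface Subscription {
        void unsubscribe();
    }

    /** List-backed composite, assumed to keep every child until it is unsubscribed. */
    static final class Composite implements Subscription {
        private final List<Subscription> children = new ArrayList<>();

        synchronized void add(Subscription s) {
            children.add(s);
        }

        synchronized int size() {
            return children.size();
        }

        @Override
        public synchronized void unsubscribe() {
            for (Subscription s : children) {
                s.unsubscribe();
            }
            children.clear();
        }
    }

    /** Stand-in for Subscriptions.empty(): a subscription that does nothing. */
    static Subscription empty() {
        return () -> { };
    }

    /** Simulates `ticks` periodic executions of an Action0 against one composite. */
    static Composite run(int ticks) {
        Composite composite = new Composite();
        for (int i = 0; i < ticks; i++) {
            // Each tick has nothing to cancel, yet still appends an entry.
            composite.add(empty());
        }
        return composite;
    }

    public static void main(String[] args) {
        Composite composite = run(100_000);
        System.out.println("entries before unsubscribe: " + composite.size());
        composite.unsubscribe();
        System.out.println("entries after unsubscribe: " + composite.size());
    }
}
```

Each entry is small, which is consistent with the leak being slower, but the list is only emptied by unsubscribe(), so a periodic task that never ends keeps growing it.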

From what I can see, this affects all schedulePeriodically implementations and as a consequence, things like Observable.interval are anything but infinite.

UPDATE: I now see that pull request #712 addressed some of these issues; in particular, it changed the behavior of ExecutorScheduler. I haven't tested that version yet, but I hope to do so today.

However, the default Scheduler.schedulePeriodically is still there and is something to reconsider: if something that does periodic scheduling leaks, it probably shouldn't be there.
