
Backpressure #1000

Closed
@benjchristensen

Description

Thus far Rx has left backpressure solutions "an exercise for the user". It can be done via manual feedback loops and operators like throttle, sample, window, etc.

The drawback of this is that it requires understanding the implications of every operator, knowing which ones might be async (buffer/queue), and putting in the effort to create a feedback loop and hook it up correctly. Some use cases will probably always require this type of manual effort, but it's worth exploring whether we can solve the general use cases within Rx.

Most Rx operators are synchronous in nature, meaning the producing thread does not return until the work is completed. The map operator is a good example: it does computation work to transform from T -> R and nothing else.

There are some operators, though, that are async in nature and involve unbounded buffers. These include observeOn, zip, and merge (as of 0.17.1, which means flatMap is affected). It also happens when an Observer is writing the data out to non-blocking IO (such as a Netty channel).

In all of these examples, if the producer is fast (such as an in-memory Iterable, a file being loaded, or a firehose of events over a network), buffer-bloat (and eventually an OutOfMemoryError) can easily occur.
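
For instance (an illustrative sketch only; hugeList, Item, and slowProcess are hypothetical), today's observeOn places an unbounded queue between a fast producer and a slow consumer, so that queue simply grows:

Observable.from(hugeList) // fast in-memory producer
        .observeOn(Schedulers.computation()) // async boundary with an unbounded queue
        .subscribe(new Action1<Item>() {
            @Override
            public void call(Item item) {
                slowProcess(item); // can't keep up, so the internal buffer grows
            }
        });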

After some experimentation and conversations with various individuals, teams, and companies on this topic, a possible solution has emerged: co-routines on the producing Observable that a Subscriber can request data from in batches.

Prototype Code

A prototype of this is being experimented with at https://github.com/benjchristensen/RxBackpressure though it is not yet complete (as of this writing).

The general idea is that an Observable.OnSubscribe implementation could register a producer co-routine with the Subscriber and then only push data down once the Subscriber has said how much it can receive. It becomes a conditional push model. The producer "parks" the co-routine by finishing its work and returning (releasing the thread), and the Subscriber "unparks" or resumes it when it wants more. In this way no threads are blocked and only the amount of data the Subscriber can handle is sent. The same model can work across threads or across a network.

If the Observable chain has no async operators then it will be executed with an "infinite" request and behave exactly as it does today without any parking.

If an async operator is in the chain then it will "request" a batch size equaling its internal buffer (say 128, 512, 1024, etc).

A simple producer of an Iterable would look like this (ignoring debates over naming and API):

public final class OnSubscribeFromIterable<T> implements OnSubscribe<T> {

    final Iterable<? extends T> is;

    public OnSubscribeFromIterable(Iterable<? extends T> iterable) {
        this.is = iterable;
    }

    @Override
    public void call(final Subscriber<? super T> o) {
        // state is outside the co-routine below
        final Iterator<? extends T> iter = is.iterator();
        // define the co-routine
        Action1<Request> func = new Action1<Request>() {
            @Override
            public void call(final Request r) {
                if (!r.countDown()) {
                    return;
                }

                while (iter.hasNext()) {
                    final T value = iter.next();
                    o.onNext(value);

                    if (!r.countDown()) {
                        // we have delivered all that was requested
                        // so "park" by returning and releasing this thread
                        return;
                    }
                }
                o.onCompleted();
            }

        };

        // register co-routine with Subscriber
        o.setProducer(func);
    }
}
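
For reference, a Request compatible with the producer above might look roughly like this (a minimal sketch; the prototype's actual class lives in rx.Subscriber and its details may differ). countDown() answers whether the producer may emit one more item, and a count of -1 means the request is infinite:

public final class Request {

    private long count;

    public Request(long count) {
        this.count = count;
    }

    public boolean countDown() {
        if (count == -1) {
            // infinite request: never park
            return true;
        }
        if (count > 0) {
            count--;
            return true;
        }
        // the requested batch is exhausted: time to "park"
        return false;
    }
}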

The observeOn operator is async and thus will request batches as needed. The relevant code looks like this:

private void pollQueue() {
    do {
        Object v = queue.poll();
        if (v != null) {
            if (v instanceof Sentinel) {
                if (v == NULL_SENTINEL) {
                    observer.onNext(null);
                } else if (v == COMPLETE_SENTINEL) {
                    observer.onCompleted();
                } else if (v instanceof ErrorSentinel) {
                    observer.onError(((ErrorSentinel) v).e);
                }
            } else {
                observer.onNext((T) v);
            }
        }
        requested--;
    } while (counter.decrementAndGet() > 0 && !observer.isUnsubscribed());
    if (requested == 0) {
        requested += SIZE;
        // request more (starting the co-routine up again)
        request(SIZE);
    }
}

The subscribeOn operator decorates the co-routine so the scheduling choice is retained: each time the co-routine is resumed it is started on the desired thread rather than the consuming thread doing the work.

public void setProducer(final Action1<rx.Subscriber.Request> producer) {
    subscriber.setProducer(new Action1<Request>() {

        @Override
        public void call(final rx.Subscriber.Request r) {
            inner.schedule(new Action1<Inner>() {

                @Override
                public void call(Inner inner) {
                    producer.call(r);
                }

            });

        }

    });

}

The take and skip operators compose the batch sizes so that requests adjust according to what is being taken or skipped.
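
To illustrate the idea (a hypothetical helper, not the prototype's actual operator code): take(n) caps every batch it requests upstream at the number of items it will still let through, and skip(n) would similarly inflate the first request by the number of items it discards.

static final class TakeRequestComposer {

    private int remaining; // items take(n) may still deliver downstream

    TakeRequestComposer(int n) {
        this.remaining = n;
    }

    // compose an upstream batch size from a downstream request;
    // an "infinite" (-1) downstream request is capped at what take(n) allows
    int compose(int downstreamRequest) {
        int wanted = (downstreamRequest == -1) ? remaining : Math.min(downstreamRequest, remaining);
        remaining -= wanted;
        return wanted; // 0 means there is no need to restart the co-routine at all
    }
}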

The Subscriber manages the life-cycle of when the co-routine is run: https://github.com/benjchristensen/RxBackpressure/blob/master/rx-bp-prototype/src/main/java/rx/Subscriber.java#L97

For example, if the final Subscriber in the chain is reached and no async operator was involved, it immediately invokes the co-routine with a "-1" request size so it is infinite.
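
A rough sketch of that decision (illustrative names only; the real logic is in the linked Subscriber.java):

private static final long NOT_SET = Long.MIN_VALUE; // hypothetical "no request yet" marker
private long requested = NOT_SET;

public void setProducer(Action1<Request> producer) {
    if (requested == NOT_SET) {
        // no async operator in the chain ever called request(n),
        // so start the co-routine with an infinite ("-1") request
        producer.call(new Request(-1));
    } else {
        // start with the batch size the async operator asked for
        producer.call(new Request(requested));
    }
}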

Example Use Cases

Use cases are being written here: https://github.com/benjchristensen/RxBackpressure/tree/master/rx-bp-examples/src/test/java/rx/examples

Handling Observables Without Co-routines

Implementing an Observable that supports backpressure is more complicated, and not all will support it. In those cases the async operators will onError, stating that the Observable is not respecting backpressure, and suggest resolutions (probably linking to documentation).

The two routes at that time are: 1) fix the Observable to support backpressure, or 2) use one of several backpressure operators that will be added such as:

  • whileParkedDrop
  • whileParkedBuffer
  • whileParkedBufferThenDrop(int maxCount)
  • whileParkedBlock (when blocking is okay, such as on Quasar fibers, IO threads, etc)
  • whileParkedUnsubscribe (such as on a hot stream like mouse events)
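
For example (a hypothetical usage sketch, assuming the proposed names above), a hot mouse-event stream could shed load instead of buffering while the producer is parked:

mouseEvents() // hypothetical hot Observable<MouseEvent>
        .whileParkedDrop() // drop events emitted while parked instead of buffering them
        .observeOn(Schedulers.computation()) // async boundary that requests batches
        .subscribe(new Action1<MouseEvent>() {
            @Override
            public void call(MouseEvent e) {
                render(e); // hypothetical consumer
            }
        });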

Outstanding Work

An earlier prototype (not public) had zip functioning, but the current code does not. We know how it is done and this model works with it; we just have not yet spent the time to re-implement it. We need to.

The biggest outstanding item is making merge work (and thus flatMap). This is the major hurdle for any backpressure solution.

We (myself, @abersnaze, @headinthebox and others) have whiteboarded it and believe we have a solution, but my schedule has not allowed me to code it. I held off on writing this post because I wanted to code it up first, but since I still haven't had time, I wanted to get this information public rather than holding it up.

The planned design for merge is that each Observable being merged would have its own buffer (of, say, 128 items), and the consumer would request n items, which would come in round-robin fashion from all of the merged buffers. Each individual Observable would exert its own backpressure upstream. Slow Observables may never fill their buffer while fast ones will.

We will not attempt to limit the horizontal growth (number of Observables being merged) but will limit the vertical growth (size of buffer for each Observable).

The challenge with this is making sure performance and fairness are balanced.
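
To make the intended accounting concrete, here is a minimal sketch of the round-robin drain (hypothetical names and structure, not the whiteboarded design itself): each merged source fills its own bounded buffer and parks when full, and a request of n items is served one item at a time across the buffers so a fast source cannot starve slow ones.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class RoundRobinMergeSketch<T> {

    static final int BUFFER_SIZE = 128; // the "vertical growth" limit per source

    // one bounded buffer per merged Observable ("horizontal growth" is not limited)
    private final List<Queue<T>> buffers = new ArrayList<Queue<T>>();

    int addSource() {
        buffers.add(new ArrayDeque<T>(BUFFER_SIZE));
        return buffers.size() - 1;
    }

    // called when source `index` emits; returns false when its buffer is full,
    // at which point that source's co-routine parks (its own upstream backpressure)
    boolean offer(int index, T value) {
        Queue<T> q = buffers.get(index);
        if (q.size() >= BUFFER_SIZE) {
            return false;
        }
        q.add(value);
        return true;
    }

    // serve a request of n items round-robin across all buffers; a slow source
    // may simply have nothing to contribute on a given pass while fast ones do
    List<T> drain(int n) {
        List<T> out = new ArrayList<T>(n);
        boolean progressed = true;
        while (out.size() < n && progressed) {
            progressed = false;
            for (Queue<T> q : buffers) {
                T v = q.poll();
                if (v != null) {
                    out.add(v);
                    progressed = true;
                    if (out.size() == n) {
                        break;
                    }
                }
            }
        }
        return out;
    }
}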

Next Steps

  1. Finish prototype implementation of merge and zip
  2. Finish examples across threads and the network.
  3. Bikeshed and debate APIs and naming conventions
  4. Prove or disprove the performance/functionality tradeoff.
  5. Test long enough to convince ourselves we're not going to cause deadlocks/livelocks/hangs somewhere (co-routines not running).

I look forward to your help making this happen.
