Description
Thus far Rx has left backpressure solutions "an exercise for the user". It can be done via manual feedback loops and operators like `throttle`, `sample`, `window`, etc.
The drawback of this is that it requires understanding the implications of every operator and when it might be async (buffer/queue), and it requires the effort to create a feedback loop and hook it up correctly. Some use cases will probably always require this type of manual effort, but it's worth exploring whether we can solve the general use cases within Rx.
Most Rx operators are synchronous in nature, meaning the producing thread does not return until the work is completed. The `map` operator is a good example: it does computational work to transform from `T -> R` and nothing else.
There are some operators, though, that are async in nature and involve unbounded buffers. These include `observeOn`, `zip`, and `merge` (as of 0.17.1, which means `flatMap` is affected). It also happens if an `Observer` is writing the data out to non-blocking IO (such as a Netty channel).
In all of these examples, if the producer is fast, such as an in-memory `Iterable`, loading a file, or a firehose of events over a network, buffer bloat (and eventually an `OutOfMemoryError`) can easily occur.
After some experimentation and talking with various individuals, teams, and companies about this topic, a possible solution is the use of co-routines from the producing `Observable` that a `Subscriber` can request data from in batches.
Prototype Code
A prototype of this is being experimented with at https://github.com/benjchristensen/RxBackpressure though it is not yet complete (as of this writing).
The general idea is that an `Observable.OnSubscribe` implementation could register a producer co-routine with the `Subscriber` and then only push data down when the `Subscriber` has said how much it can receive. It becomes a conditional push-model. The co-routine can then be "parked" by the producer by finishing work and returning (releasing the thread) and then "unparked" or resumed by the `Subscriber` when it wants more. In this way no threads are blocked and only the amount of data the `Subscriber` can handle is sent. The same model can work across threads or across the network.
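To make the later snippets concrete, here is a minimal sketch of what the `Request` type they rely on could look like. The name and the `countDown()` call come from the prototype, but the internals here are assumptions, not the actual prototype code; -1 is taken to mean an "infinite" request.

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch only: a credit counter handed to a producer co-routine.
public final class Request {
    private final AtomicLong count;

    public Request(long n) {
        this.count = new AtomicLong(n); // n = batch size, -1 = infinite
    }

    // Returns true if the producer may emit one more item, false once the
    // requested batch is exhausted (the co-routine should then "park" by
    // returning and releasing the thread).
    public boolean countDown() {
        while (true) {
            long c = count.get();
            if (c == -1) {
                return true;  // infinite request: never park
            }
            if (c == 0) {
                return false; // batch exhausted: park
            }
            if (count.compareAndSet(c, c - 1)) {
                return true;  // one credit consumed
            }
        }
    }
}
```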
If the `Observable` chain has no async operators then it will be executed with an "infinite" request and behave exactly as it does today, without any parking.
If an async operator is in the chain then it will "request" a batch size equaling its internal buffer (say 128, 512, 1024, etc.).
A simple producer of an `Iterable` would look like this (ignoring debates over naming and API):
```java
public final class OnSubscribeFromIterable<T> implements OnSubscribe<T> {

    final Iterable<? extends T> is;

    public OnSubscribeFromIterable(Iterable<? extends T> iterable) {
        this.is = iterable;
    }

    @Override
    public void call(final Subscriber<? super T> o) {
        // state is outside the co-routine below
        final Iterator<? extends T> iter = is.iterator();
        // define the co-routine
        Action1<Request> func = new Action1<Request>() {
            @Override
            public void call(final Request r) {
                if (!r.countDown()) {
                    return;
                }
                while (iter.hasNext()) {
                    final T value = iter.next();
                    o.onNext(value);
                    if (!r.countDown()) {
                        // we have delivered all that was requested
                        // so "park" by returning and releasing this thread
                        return;
                    }
                }
                o.onCompleted();
            }
        };
        // register co-routine with Subscriber
        o.setProducer(func);
    }
}
```
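As a hypothetical usage of this producer (the bounded-batch behavior depends on the prototype's `setProducer`/`Request` machinery, not on shipped Rx; `Arrays`, `Action1`, and `Schedulers` are the usual `java.util` and `rx` types):

```java
Observable<Integer> source = Observable.create(
        new OnSubscribeFromIterable<Integer>(Arrays.asList(1, 2, 3, 4, 5)));

// No async operator in the chain: the Subscriber issues an "infinite" (-1)
// request, so this behaves exactly like today's Observable.from(iterable).
source.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer i) {
        System.out.println(i);
    }
});

// With observeOn in the chain, the iterator is instead drained in bounded
// batches (e.g. 128 at a time) as the consumer's queue empties.
source.observeOn(Schedulers.computation()).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer i) {
        System.out.println(i);
    }
});
```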
The `observeOn` operator is async and thus will request batches as needed. The relevant code looks like this:
```java
private void pollQueue() {
    do {
        Object v = queue.poll();
        if (v != null) {
            if (v instanceof Sentinel) {
                if (v == NULL_SENTINEL) {
                    observer.onNext(null);
                } else if (v == COMPLETE_SENTINEL) {
                    observer.onCompleted();
                } else if (v instanceof ErrorSentinel) {
                    observer.onError(((ErrorSentinel) v).e);
                }
            } else {
                observer.onNext((T) v);
            }
        }
        // one unit of the requested batch has been consumed
        requested--;
    } while (counter.decrementAndGet() > 0 && !observer.isUnsubscribed());
    if (requested == 0) {
        requested += SIZE;
        // request more (starting the co-routine up again)
        request(SIZE);
    }
}
```
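For context, the producing side of that bounded buffer can be sketched as well. This is an assumed shape, not the exact prototype code: `queue` is a concurrent queue, `counter` is an atomic count of queued items, and `inner` is the target `Scheduler`'s inner worker, as in the snippet above.

```java
@Override
public void onNext(T t) {
    queue.offer(t != null ? t : NULL_SENTINEL);
    // if the count moved from 0 to 1 no drain is in flight,
    // so schedule pollQueue() on the target thread
    if (counter.getAndIncrement() == 0) {
        inner.schedule(new Action1<Inner>() {
            @Override
            public void call(Inner inner) {
                pollQueue();
            }
        });
    }
}
```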
The `subscribeOn` operator decorates the co-routine so scheduling is retained: the co-routine is started on the desired thread each time, rather than the consuming thread doing the work.
```java
public void setProducer(final Action1<rx.Subscriber.Request> producer) {
    subscriber.setProducer(new Action1<Request>() {
        @Override
        public void call(final rx.Subscriber.Request r) {
            // hop to the desired thread before resuming the co-routine
            inner.schedule(new Action1<Inner>() {
                @Override
                public void call(Inner inner) {
                    producer.call(r);
                }
            });
        }
    });
}
```
The `take` and `skip` operators compose the batch sizes to adjust accordingly to what is being taken or skipped (a sketch of the idea follows the links):
- https://github.com/benjchristensen/RxBackpressure/blob/master/rx-bp-prototype/src/main/java/rx/operators/OperatorTake.java#L93
- https://github.com/benjchristensen/RxBackpressure/blob/master/rx-bp-prototype/src/main/java/rx/operators/OperatorSkip.java#L51
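For illustration only (a hypothetical helper, not the linked prototype code): `take(n)` can cap whatever the downstream requests at the number of items it still needs, so the producer is never asked for more than will be taken.

```java
// Sketch: compose a downstream request through take(n).
// -1 means an "infinite" request, per the prototype's convention.
static long composeTakeRequest(long downstreamRequest, long remaining) {
    if (downstreamRequest == -1) {
        return remaining;                          // infinite, capped at what take still needs
    }
    return Math.min(downstreamRequest, remaining); // batch, capped at what take still needs
}
```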
The `Subscriber` manages the life-cycle of when the co-routine is run: https://github.com/benjchristensen/RxBackpressure/blob/master/rx-bp-prototype/src/main/java/rx/Subscriber.java#L97
For example, if the final `Subscriber` in the chain is hit and no async operator was involved, it immediately invokes the co-routine with a "-1" request size so it is infinite.
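That decision might look roughly like this (assumed names and structure; see the linked `Subscriber` code for the real thing), reusing the `Request` sketch from earlier:

```java
// Sketch: how a terminal Subscriber could start the co-routine.
public void setProducer(Action1<Request> producer) {
    if (batchSize == -1) {
        // no async operator upstream set a batch size:
        // run with an "infinite" request, exactly as Rx behaves today
        producer.call(new Request(-1));
    } else {
        // an async operator will drive the producer in bounded batches
        producer.call(new Request(batchSize));
    }
}
```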
Example Use Cases
Use cases are being written here: https://github.com/benjchristensen/RxBackpressure/tree/master/rx-bp-examples/src/test/java/rx/examples
Handling Observables Without Co-routines
Implementing an `Observable` that supports backpressure is more complicated, and not all will support it. In those cases the async operators will `onError`, stating that an `Observable` is not respecting the backpressure, and suggest resolutions (probably linking to documentation).
The two routes at that time are: 1) fix the `Observable` to support backpressure, or 2) use one of several backpressure operators that will be added, such as the following (a sketch of the simplest of these appears after the list):

- `whileParkedDrop`
- `whileParkedBuffer`
- `whileParkedBufferThenDrop(int maxCount)`
- `whileParkedBlock` (when blocking is okay, such as on Quasar fibers, IO threads, etc.)
- `whileParkedUnsubscribe` (such as on a hot stream like mouse events)
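As an example, `whileParkedDrop` could behave as sketched below; the class shape and field names are assumptions for illustration, not the planned implementation.

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch: while the downstream has no outstanding request (i.e. the
// producer would be "parked"), items are discarded instead of buffered.
final class WhileParkedDrop<T> {
    private final AtomicLong outstanding = new AtomicLong(0);
    private final Observer<? super T> child;

    WhileParkedDrop(Observer<? super T> child) {
        this.child = child;
    }

    void request(long n) {
        outstanding.addAndGet(n); // downstream grants more credit
    }

    void onNext(T t) {
        while (true) {
            long c = outstanding.get();
            if (c == 0) {
                return; // no credit: drop while "parked"
            }
            if (outstanding.compareAndSet(c, c - 1)) {
                child.onNext(t);
                return;
            }
        }
    }
}
```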
Outstanding Work
An earlier prototype (not public) had `zip` functioning, but the current code does not. We know how it is done and this model works with it, but we have not yet spent the time to re-implement it. We need to.
The biggest outstanding item is making `merge` work (and thus `flatMap`). This is the major hurdle for any backpressure solution.
We (myself, @abersnaze, @headinthebox and others) have whiteboarded it and believe we have a solution but my schedule has not allowed me to code it. I have held off on writing this post as I wanted to code it up first, but since I still haven't had time I wanted to get this information public instead of holding it up.
The planned design for `merge` is that each `Observable` being merged would have its own buffer (of, say, 128 items) and then the consumer would request n items, which would come in round-robin fashion from all of the merged buffers. Each individual `Observable` would exert its own backpressure upstream. Slow `Observable`s may never fill their buffer while fast ones will.
We will not attempt to limit the horizontal growth (the number of `Observable`s being merged) but will limit the vertical growth (the size of the buffer for each `Observable`).
The challenge with this is making sure performance and fairness are balanced.
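A rough sketch of that round-robin draining (assumed names and structure, since the real implementation is still unwritten):

```java
import java.util.List;
import java.util.Queue;

// Sketch: drain up to n items round-robin from the merged sources'
// bounded buffers; empty (slow) sources are simply skipped, so fast
// sources cannot starve slow ones of their turn.
final class RoundRobinDrain<T> {
    private final List<Queue<T>> buffers; // one bounded buffer per source
    private int cursor = 0;

    RoundRobinDrain(List<Queue<T>> buffers) {
        this.buffers = buffers;
    }

    void drain(long n, Observer<? super T> child) {
        long delivered = 0;
        int emptyInARow = 0;
        while (delivered < n && emptyInARow < buffers.size()) {
            Queue<T> buffer = buffers.get(cursor);
            cursor = (cursor + 1) % buffers.size();
            T item = buffer.poll();
            if (item == null) {
                emptyInARow++; // this source is currently slow or empty
            } else {
                emptyInARow = 0;
                child.onNext(item);
                delivered++;
                // a real implementation would request(1) upstream here
                // so this source can refill its buffer
            }
        }
    }
}
```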
Next Steps
- Finish prototype implementation of `merge` and `zip`.
- Finish examples across threads and network.
- Bikeshed and debate APIs and naming conventions.
- Prove or disprove the performance/functionality tradeoff.
- Test long enough to convince ourselves we're not going to cause deadlocks/livelocks/hangs somewhere (co-routines not running).
I look forward to your help making this happen.