Description
Opening this issue to capture, document and discuss how serialization (`serialize()`, `merge()`, `flatMap()`, `mergeMap()`) behaves and how it is implemented.
Prior to 0.17.1, all serialization was done via blocking synchronization. This is how Rx.Net does it, so we adopted the same pattern. However, this breaks the Rx model, which is supposed to be non-blocking and usable in non-blocking environments (think NIO event loops such as Netty, Vert.x, and Node.js-style apps). Blocking a thread while merging can significantly impact throughput in an application based on event loops.
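To make the trade-off concrete, here is a minimal sketch (class and method names are mine, not the actual RxJava code) of the pre-0.17.1 style: serialization via blocking synchronization, where every producer thread must acquire a lock before emitting and concurrent producers block each other.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of blocking serialization, Rx.Net-style.
class BlockingSerializer {
    private final Object lock = new Object();
    private final List<Integer> delivered = new ArrayList<>();

    // onNext blocks the calling thread until it holds the lock,
    // guaranteeing one-at-a-time delivery -- at the cost of blocking.
    public void onNext(int value) {
        synchronized (lock) {
            delivered.add(value);
        }
    }

    public List<Integer> delivered() {
        synchronized (lock) {
            return new ArrayList<>(delivered);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingSerializer s = new BlockingSerializer();
        Thread a = new Thread(() -> { for (int i = 0; i < 1000; i++) s.onNext(i); });
        Thread b = new Thread(() -> { for (int i = 0; i < 1000; i++) s.onNext(i); });
        a.start(); b.start();
        a.join(); b.join();
        // All 2000 events are delivered, but each producer spent time
        // blocked on the lock -- a problem on event-loop threads.
        System.out.println(s.delivered().size()); // prints 2000
    }
}
```

Delivery is correctly serialized, but an event-loop thread that calls `onNext` while another thread holds the lock is parked, which is exactly what an NIO-based application cannot afford.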
The migration to doing "serialization" instead of "synchronization" came with trade-offs.
Back Pressure
To be non-blocking, the implementation must become async and allow threads to deliver their notifications, queue them if necessary, and return. This can result in buffer bloat and the typical back-pressure problems. Solutions are being explored internally and with other teams/companies, and will result in changes in the future.
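The buffer-bloat risk can be shown with a tiny sketch (purely illustrative, names invented): if producers simply enqueue and return, nothing bounds the queue when the consumer cannot keep up.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Illustrative sketch: fully async hand-off with no back-pressure.
class AsyncHandoff {
    final ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();

    // Non-blocking: enqueue and return immediately.
    // Nothing here bounds the queue.
    void onNext(int value) {
        queue.offer(value);
    }

    public static void main(String[] args) {
        AsyncHandoff h = new AsyncHandoff();
        // A fast producer with no consumer draining: the queue just grows,
        // which is the buffer-bloat / back-pressure problem.
        for (int i = 0; i < 100_000; i++) h.onNext(i);
        System.out.println(h.queue.size()); // prints 100000
    }
}
```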
Concurrency
One way of solving the problem without blocking is similar to `observeOn`: everything gets dropped into a queue, and another thread pulls off the queue.
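A hedged sketch of that `observeOn`-style approach (class names are mine, not RxJava's): producers enqueue, and a single dedicated thread pulls from the queue and delivers. Serialization is achieved, but delivery now happens on a thread the caller never asked for.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative sketch: serialize by handing off to one drain thread.
class DrainThreadSerializer {
    final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
    final List<Integer> delivered = new ArrayList<>();

    // Producers never block; they just enqueue.
    void onNext(int value) {
        queue.offer(value);
    }

    // Deliver 'count' events on a separate, dedicated thread.
    // This injects concurrency the caller did not ask for.
    void drainOnOwnThread(int count) {
        Thread drain = new Thread(() -> {
            for (int i = 0; i < count; i++) {
                try {
                    delivered.add(queue.take());
                } catch (InterruptedException e) {
                    return;
                }
            }
        });
        drain.start();
        try {
            drain.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        DrainThreadSerializer s = new DrainThreadSerializer();
        for (int i = 0; i < 10; i++) s.onNext(i);
        s.drainOnOwnThread(10);
        System.out.println(s.delivered); // prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```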
This however means that we are injecting additional concurrency in places it is not expected and generally not desired.
The current implementation does not do this. It uses the threads that are already pushing events through and "steals" a single thread at a time: the stolen thread pushes through whatever is in the queue, including its own event, and then returns to its own work.
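A minimal sketch of the thread-stealing idea (my own simplification, not the actual operator code): each producer enqueues its event, then tries to become the single drainer via an atomic flag. The winner drains one bounded pass and returns; losers return immediately after enqueueing, so nothing blocks.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified sketch of "thread stealing" serialization.
class ThreadStealingSerializer {
    final Queue<Integer> queue = new ConcurrentLinkedQueue<>();
    final List<Integer> delivered = new CopyOnWriteArrayList<>();
    final AtomicBoolean draining = new AtomicBoolean(false);

    void onNext(int value) {
        queue.offer(value);
        // Only one thread "wins" and drains; losers return immediately.
        if (draining.compareAndSet(false, true)) {
            try {
                // One bounded drain pass, so the winning thread is not
                // starved of its own work by a fast-moving stream.
                Integer v;
                while ((v = queue.poll()) != null) {
                    delivered.add(v); // serialized delivery
                }
            } finally {
                draining.set(false);
            }
        }
        // Note the race this simplification has: a loser may enqueue just
        // as the winner finishes polling but before it clears the flag,
        // leaving that event queued until the next onNext arrives -- the
        // "Delayed Delivery" issue described below. Real implementations
        // typically use a work-in-progress counter rather than a boolean.
    }

    public static void main(String[] args) {
        ThreadStealingSerializer s = new ThreadStealingSerializer();
        s.onNext(1);
        s.onNext(2);
        s.onNext(3);
        System.out.println(s.delivered); // prints [1, 2, 3]
    }
}
```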
Thread Starvation
Stealing threads opens up the possibility of thread starvation. If a thread loops continually to drain the queue while other threads keep filling it, it will never be released to do its own work. This would mean that the events it was intended to deliver are never delivered, as it is always busy delivering events on behalf of other threads.
Delayed Delivery
To prevent thread starvation, the current implementation only drains the queue once. This could be increased to multiple iterations, but at some point draining must stop so the thread can return and another thread can "win" and start draining.
In the gap between the draining thread finishing and a new thread taking over, events may sit in the queue, delaying their delivery.
In a fast-moving stream this is not a problem, as another thread immediately takes over. In an intermittent stream, however, this can mean long, non-deterministic delays.
Possible Improvements
There are a few ways to improve this situation without reverting back to blocking.
Metrics could be kept to determine whether a stream is fast-moving (and thus thread starvation is an issue and draining should be handed off to another thread). If starvation is not an issue, the queue could be fully drained before returning. This is still not perfect, and would still risk one of the two problems occurring, but could probably solve most cases. The difficulty is doing so without significantly impacting normal performance.
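One way such a metric could look (purely illustrative; the name and threshold are invented, not a proposed API): track how many events the stealing thread has drained in a row, treat the stream as "fast-moving" once that count passes a threshold, and signal that draining should be handed off so the stealing thread is released.

```java
// Invented sketch of a metrics-based drain policy.
class AdaptiveDrainPolicy {
    static final int FAST_STREAM_THRESHOLD = 128; // invented value
    private int drainedInARow = 0;

    // Called once per event drained; returns true when the current
    // thread should stop draining and hand off to another thread.
    boolean shouldHandOff() {
        drainedInARow++;
        return drainedInARow > FAST_STREAM_THRESHOLD;
    }

    // Called when the queue empties: the stream is slow, reset.
    void reset() {
        drainedInARow = 0;
    }

    public static void main(String[] args) {
        AdaptiveDrainPolicy p = new AdaptiveDrainPolicy();
        boolean handedOff = false;
        for (int i = 0; i < 200; i++) {
            if (p.shouldHandOff()) { handedOff = true; break; }
        }
        // A sustained fast stream eventually triggers the hand-off.
        System.out.println(handedOff); // prints true
    }
}
```

The per-event cost here is a single increment and compare, which hints at how the metric might avoid significantly impacting normal performance; whether that holds in the real operators would need measurement.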
Another option is conditionally scheduling delivery onto another `Scheduler` when starvation is happening. This would allow most cases to be handled by stealing threads, but flip to an `observeOn`-style model if contention and/or starvation occurs.
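A hedged sketch of that hybrid (the executor, counter, and threshold are my assumptions, not RxJava API): steal the calling thread in the common case, but once concurrent contention passes a limit, push the drain work onto a separate executor, `observeOn`-style.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Invented sketch: steal threads normally, fall back to an executor
// when contention suggests starvation.
class ConditionalScheduler {
    final Queue<Integer> queue = new ConcurrentLinkedQueue<>();
    final List<Integer> delivered = new CopyOnWriteArrayList<>();
    final AtomicBoolean draining = new AtomicBoolean(false);
    final AtomicInteger contention = new AtomicInteger();
    final ExecutorService fallback = Executors.newSingleThreadExecutor();
    static final int CONTENTION_LIMIT = 2; // invented threshold

    void onNext(int value) {
        queue.offer(value);
        if (contention.incrementAndGet() <= CONTENTION_LIMIT) {
            drain(); // common case: steal this thread
        } else {
            fallback.execute(this::drain); // contended: hand off
        }
        contention.decrementAndGet();
    }

    private void drain() {
        // Same single-drainer guard as the thread-stealing sketch.
        if (draining.compareAndSet(false, true)) {
            try {
                Integer v;
                while ((v = queue.poll()) != null) {
                    delivered.add(v);
                }
            } finally {
                draining.set(false);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ConditionalScheduler s = new ConditionalScheduler();
        for (int i = 0; i < 5; i++) s.onNext(i); // uncontended: all stolen
        s.fallback.shutdown();
        s.fallback.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println(s.delivered); // prints [0, 1, 2, 3, 4]
    }
}
```

The appeal of this shape is that single-threaded and lightly contended streams never pay for a scheduler hop; only a stream that is actively starving its producers takes the `observeOn`-style path.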
Next steps
If the current functionality breaks your use cases, you may want to stay on 0.17.0 while working with us to improve it. Most use cases have been shown to work fine with the 0.17.1 behavior, and the non-blocking and deadlock-free characteristics are necessary.
I welcome discussion, unit tests, pull requests and assistance on this.
Existing performance tests (that need work):
- https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/perf/java/rx/operators/OperatorSerializePerf.java
- https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/perf/java/rx/archive/operators/OperatorSerializePerformance.java
Existing unit tests: