Description
In the following code:
ConnectableObservable<Integer> intObs = Observable
.from(Sets.newHashSet(1, 2))
.replay();
twoAndFour.connect();
Observable<Integer> fooObs = intObs.toList()
.map(new Func1<List<Integer>, Integer>() {
@Override
public Integer call(List<Integer> integers) {
return 1;
}
});
The "return 1;" statement gets hit twice. Because the .toList() aggregate function is being called, its expected that the intObs.toList() observer should only emit 1 object. Unfortunately, the ReplaySubject that backs .replay() ends up calling onComplete() multiple times which results in the .toList() operator calling onNext() multiple times.
This can lead to a lot of unexpected results. One of the reasons for this happening is that it seems as though most operators, don't unsubscribe after being completed as well as making sure onComplete and onError only get called once. Is there a reason most operators don't create subscribers that adhere to the rxGuidelines mentioned in the SafeSubscriber class?
http://go.microsoft.com/fwlink/?LinkID=205219
Any help would be much appreciated.
Thanks,
Blake