Skip to content

ReplaySubject/.replay() calls onComplete multiple times #1105

Closed
@pequnio3

Description

@pequnio3

In the following code:

        ConnectableObservable<Integer> intObs = Observable
                .from(Sets.newHashSet(1, 2))
                .replay();
        twoAndFour.connect();

        Observable<Integer> fooObs = intObs.toList()
                                                     .map(new Func1<List<Integer>, Integer>() {
                                                         @Override
                                                         public Integer call(List<Integer> integers) {
                                                             return 1;
                                                         }
                                                     });

The "return 1;" statement gets hit twice. Because the .toList() aggregate function is being called, its expected that the intObs.toList() observer should only emit 1 object. Unfortunately, the ReplaySubject that backs .replay() ends up calling onComplete() multiple times which results in the .toList() operator calling onNext() multiple times.

This can lead to a lot of unexpected results. One of the reasons for this happening is that it seems as though most operators, don't unsubscribe after being completed as well as making sure onComplete and onError only get called once. Is there a reason most operators don't create subscribers that adhere to the rxGuidelines mentioned in the SafeSubscriber class?

http://go.microsoft.com/fwlink/?LinkID=205219

Any help would be much appreciated.

Thanks,
Blake

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions