@@ -46,18 +46,18 @@ public OperatorGroupBy(final Func1<? super T, ? extends K> keySelector) {
46
46
}
47
47
48
48
@ Override
49
- public Subscriber <? super T > call (final Subscriber <? super GroupedObservable <K , T >> childObserver ) {
50
- return new GroupBySubscriber <K , T >(keySelector , childObserver );
49
+ public Subscriber <? super T > call (final Subscriber <? super GroupedObservable <K , T >> child ) {
50
+ return new GroupBySubscriber <K , T >(keySelector , child );
51
51
}
52
52
static final class GroupBySubscriber <K , T > extends Subscriber <T > {
53
53
final Func1 <? super T , ? extends K > keySelector ;
54
- final Subscriber <? super GroupedObservable <K , T >> childObserver ;
55
- public GroupBySubscriber (Func1 <? super T , ? extends K > keySelector , Subscriber <? super GroupedObservable <K , T >> childObserver ) {
54
+ final Subscriber <? super GroupedObservable <K , T >> child ;
55
+ public GroupBySubscriber (Func1 <? super T , ? extends K > keySelector , Subscriber <? super GroupedObservable <K , T >> child ) {
56
56
// a new CompositeSubscription to decouple the subscription as the inner subscriptions need a separate lifecycle
57
57
// and will unsubscribe on this parent if they are all unsubscribed
58
58
super (new ChainedSubscription ());
59
59
this .keySelector = keySelector ;
60
- this .childObserver = childObserver ;
60
+ this .child = child ;
61
61
}
62
62
private final Map <K , BufferUntilSubscriber <T >> groups = new HashMap <K , BufferUntilSubscriber <T >>();
63
63
volatile int completionCounter ;
@@ -86,7 +86,7 @@ public void onCompleted() {
86
86
if (completionCounter == 0 ) {
87
87
// we must track 'completionEmitted' seperately from 'completed' since `completeInner` can result in childObserver.onCompleted() being emitted
88
88
if (EMITTED_UPDATER .compareAndSet (this , 0 , 1 )) {
89
- childObserver .onCompleted ();
89
+ child .onCompleted ();
90
90
}
91
91
}
92
92
}
@@ -96,23 +96,23 @@ public void onCompleted() {
96
96
public void onError (Throwable e ) {
97
97
if (TERMINATED_UPDATER .compareAndSet (this , 0 , 1 )) {
98
98
// we immediately tear everything down if we receive an error
99
- childObserver .onError (e );
99
+ child .onError (e );
100
100
}
101
101
}
102
102
103
103
@ Override
104
104
public void onNext (T t ) {
105
105
try {
106
106
final K key = keySelector .call (t );
107
- BufferUntilSubscriber <T > gps = groups .get (key );
108
- if (gps == null ) {
107
+ BufferUntilSubscriber <T > group = groups .get (key );
108
+ if (group == null ) {
109
109
// this group doesn't exist
110
- if (childObserver .isUnsubscribed ()) {
110
+ if (child .isUnsubscribed ()) {
111
111
// we have been unsubscribed on the outer so won't send any more groups
112
112
return ;
113
113
}
114
- gps = BufferUntilSubscriber .create ();
115
- final BufferUntilSubscriber <T > _gps = gps ;
114
+ group = BufferUntilSubscriber .create ();
115
+ final BufferUntilSubscriber <T > _group = group ;
116
116
117
117
GroupedObservable <K , T > go = new GroupedObservable <K , T >(key , new OnSubscribe <T >() {
118
118
@@ -128,7 +128,7 @@ public void call() {
128
128
}
129
129
130
130
}));
131
- _gps .unsafeSubscribe (new Subscriber <T >(o ) {
131
+ _group .unsafeSubscribe (new Subscriber <T >(o ) {
132
132
133
133
@ Override
134
134
public void onCompleted () {
@@ -150,26 +150,26 @@ public void onNext(T t) {
150
150
}
151
151
152
152
});
153
- groups .put (key , gps );
154
- childObserver .onNext (go );
153
+ groups .put (key , group );
154
+ child .onNext (go );
155
155
}
156
156
// we have the correct group so send value to it
157
- gps .onNext (t );
157
+ group .onNext (t );
158
158
} catch (Throwable e ) {
159
159
onError (OnErrorThrowable .addValueAsLastCause (e , t ));
160
160
}
161
161
}
162
162
163
163
private void completeInner () {
164
164
// count can be < 0 because unsubscribe also calls this
165
- if (COUNTER_UPDATER .decrementAndGet (this ) <= 0 && (terminated == 1 || childObserver .isUnsubscribed ())) {
165
+ if (COUNTER_UPDATER .decrementAndGet (this ) <= 0 && (terminated == 1 || child .isUnsubscribed ())) {
166
166
// completionEmitted ensures we only emit onCompleted once
167
167
if (EMITTED_UPDATER .compareAndSet (this , 0 , 1 )) {
168
- if (childObserver .isUnsubscribed ()) {
168
+ if (child .isUnsubscribed ()) {
169
169
// if the entire groupBy has been unsubscribed and children are completed we will propagate the unsubscribe up.
170
170
unsubscribe ();
171
171
}
172
- childObserver .onCompleted ();
172
+ child .onCompleted ();
173
173
}
174
174
}
175
175
}
0 commit comments