18
18
import static org .junit .Assert .*;
19
19
import static org .mockito .Matchers .*;
20
20
import static org .mockito .Mockito .*;
21
- import static rx .operators .OperatorRetry .*;
22
21
23
22
import java .util .concurrent .atomic .AtomicInteger ;
24
23
25
24
import org .junit .Test ;
26
25
import org .mockito .InOrder ;
27
26
28
27
import rx .Observable ;
28
+ import rx .Observable .OnSubscribeFunc ;
29
29
import rx .Observer ;
30
30
import rx .Subscription ;
31
31
import rx .functions .Action1 ;
@@ -130,7 +130,7 @@ public Subscription onSubscribe(Observer<? super String> o) {
130
130
return Subscriptions .empty ();
131
131
}
132
132
}
133
-
133
+
134
134
@ Test
135
135
public void testUnsubscribeFromRetry () {
136
136
PublishSubject <Integer > subject = PublishSubject .create ();
@@ -139,10 +139,44 @@ public void testUnsubscribeFromRetry() {
139
139
@ Override
140
140
public void call (Integer n ) {
141
141
count .incrementAndGet ();
142
- }});
142
+ }
143
+ });
143
144
subject .onNext (1 );
144
145
sub .unsubscribe ();
145
146
subject .onNext (2 );
146
- assertEquals (1 ,count .get ());
147
+ assertEquals (1 , count .get ());
148
+ }
149
+
150
+ @ Test
151
+ public void testRetryAllowsSubscriptionAfterAllSubscriptionsUnsubsribed () throws InterruptedException {
152
+ final AtomicInteger subsCount = new AtomicInteger (0 );
153
+ OnSubscribeFunc <String > onSubscribe = new OnSubscribeFunc <String >() {
154
+ @ Override
155
+ public Subscription onSubscribe (Observer <? super String > observer ) {
156
+ subsCount .incrementAndGet ();
157
+ return new Subscription () {
158
+ boolean unsubscribed = false ;
159
+
160
+ @ Override
161
+ public void unsubscribe () {
162
+ subsCount .decrementAndGet ();
163
+ unsubscribed = true ;
164
+ }
165
+
166
+ @ Override
167
+ public boolean isUnsubscribed () {
168
+ return unsubscribed ;
169
+ }
170
+ };
171
+ }
172
+ };
173
+ Observable <String > stream = Observable .create (onSubscribe );
174
+ Observable <String > streamWithRetry = stream .retry ();
175
+ Subscription sub = streamWithRetry .subscribe ();
176
+ assertEquals (1 , subsCount .get ());
177
+ sub .unsubscribe ();
178
+ assertEquals (0 , subsCount .get ());
179
+ streamWithRetry .subscribe ();
180
+ assertEquals (1 , subsCount .get ());
147
181
}
148
182
}
0 commit comments