File tree Expand file tree Collapse file tree 2 files changed +20
-3
lines changed
rxjava-contrib/rxjava-string/src Expand file tree Collapse file tree 2 files changed +20
-3
lines changed Original file line number Diff line number Diff line change @@ -71,11 +71,11 @@ public void call(Subscriber<? super byte[]> o) {
71
71
try {
72
72
if (o .isUnsubscribed ())
73
73
return ;
74
- int n = 0 ;
75
- n = i .read (buffer );
74
+ int n = i .read (buffer );
76
75
while (n != -1 && !o .isUnsubscribed ()) {
77
76
o .onNext (Arrays .copyOf (buffer , n ));
78
- n = i .read (buffer );
77
+ if (!o .isUnsubscribed ())
78
+ n = i .read (buffer );
79
79
}
80
80
} catch (IOException e ) {
81
81
o .onError (e );
Original file line number Diff line number Diff line change 33
33
import java .nio .charset .MalformedInputException ;
34
34
import java .util .Arrays ;
35
35
import java .util .List ;
36
+ import java .util .concurrent .atomic .AtomicInteger ;
36
37
37
38
import org .junit .Test ;
38
39
@@ -246,6 +247,22 @@ public void testFromInputStream() {
246
247
assertArrayEquals (inBytes , outBytes );
247
248
}
248
249
250
+ @ Test
251
+ public void testFromInputStreamWillUnsubscribeBeforeCallingNextRead () {
252
+ final byte [] inBytes = "test" .getBytes ();
253
+ final AtomicInteger numReads = new AtomicInteger (0 );
254
+ ByteArrayInputStream is = new ByteArrayInputStream (inBytes ) {
255
+
256
+ @ Override
257
+ public synchronized int read (byte [] b , int off , int len ) {
258
+ numReads .incrementAndGet ();
259
+ return super .read (b , off , len );
260
+ }
261
+ };
262
+ StringObservable .from (is ).first ().toBlockingObservable ().single ();
263
+ assertEquals (1 , numReads .get ());
264
+ }
265
+
249
266
@ Test
250
267
public void testFromReader () {
251
268
final String inStr = "test" ;
You can’t perform that action at this time.
0 commit comments