diff --git a/rxjava-contrib/rxjava-string/src/main/java/rx/observables/StringObservable.java b/rxjava-contrib/rxjava-string/src/main/java/rx/observables/StringObservable.java index ce154043c5..4ed99fbc22 100644 --- a/rxjava-contrib/rxjava-string/src/main/java/rx/observables/StringObservable.java +++ b/rxjava-contrib/rxjava-string/src/main/java/rx/observables/StringObservable.java @@ -71,11 +71,11 @@ public void call(Subscriber o) { try { if (o.isUnsubscribed()) return; - int n = 0; - n = i.read(buffer); + int n = i.read(buffer); while (n != -1 && !o.isUnsubscribed()) { o.onNext(Arrays.copyOf(buffer, n)); - n = i.read(buffer); + if (!o.isUnsubscribed()) + n = i.read(buffer); } } catch (IOException e) { o.onError(e); diff --git a/rxjava-contrib/rxjava-string/src/test/java/rx/observables/StringObservableTest.java b/rxjava-contrib/rxjava-string/src/test/java/rx/observables/StringObservableTest.java index aced4c12b8..6500ff7ba5 100644 --- a/rxjava-contrib/rxjava-string/src/test/java/rx/observables/StringObservableTest.java +++ b/rxjava-contrib/rxjava-string/src/test/java/rx/observables/StringObservableTest.java @@ -33,6 +33,7 @@ import java.nio.charset.MalformedInputException; import java.util.Arrays; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; @@ -246,6 +247,22 @@ public void testFromInputStream() { assertArrayEquals(inBytes, outBytes); } + @Test + public void testFromInputStreamWillUnsubscribeBeforeCallingNextRead() { + final byte[] inBytes = "test".getBytes(); + final AtomicInteger numReads = new AtomicInteger(0); + ByteArrayInputStream is = new ByteArrayInputStream(inBytes) { + + @Override + public synchronized int read(byte[] b, int off, int len) { + numReads.incrementAndGet(); + return super.read(b, off, len); + } + }; + StringObservable.from(is).first().toBlockingObservable().single(); + assertEquals(1, numReads.get()); + } + @Test public void testFromReader() { final String inStr = "test";