Skip to content

Commit 8d41cc5

Browse files
authored
3.x: [Java 8] Implement mapOptional, collector, first/last/single stage (#6775)
* 3.x: [Java 8] Implement mapOptional, collector, first/last/single stage * Upgrade the Checkstyle plugin
1 parent 2f2cef5 commit 8d41cc5

17 files changed

+2967
-2
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ buildscript {
1616
ext.bintrayVersion = "1.8.4"
1717
ext.jfrogExtractorVersion = "4.11.0"
1818
ext.bndVersion = "4.3.1"
19-
ext.checkstyleVersion = "6.19"
19+
ext.checkstyleVersion = "8.26"
2020

2121
// --------------------------------------
2222

src/main/java/io/reactivex/rxjava3/core/Flowable.java

Lines changed: 259 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6930,6 +6930,7 @@ public final <U> Flowable<U> cast(final Class<U> clazz) {
69306930
* @return a Single that emits the result of collecting the values emitted by the source Publisher
69316931
* into a single mutable data structure
69326932
* @see <a href="http://reactivex.io/documentation/operators/reduce.html">ReactiveX operators documentation: Reduce</a>
6933+
* @see #collect(Collector)
69336934
*/
69346935
@CheckReturnValue
69356936
@NonNull
@@ -11256,12 +11257,13 @@ public final <R> Flowable<R> lift(FlowableOperator<? extends R, ? super T> lifte
1125611257
* @return a Flowable that emits the items from the source Publisher, transformed by the specified
1125711258
* function
1125811259
* @see <a href="http://reactivex.io/documentation/operators/map.html">ReactiveX operators documentation: Map</a>
11260+
* @see #mapOptional(Function)
1125911261
*/
1126011262
@CheckReturnValue
1126111263
@NonNull
1126211264
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
1126311265
@SchedulerSupport(SchedulerSupport.NONE)
11264-
public final <R> Flowable<R> map(Function<? super T, ? extends R> mapper) {
11266+
public final <@NonNull R> Flowable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
1126511267
Objects.requireNonNull(mapper, "mapper is null");
1126611268
return RxJavaPlugins.onAssembly(new FlowableMap<T, R>(this, mapper));
1126711269
}
@@ -18739,4 +18741,260 @@ public final TestSubscriber<T> test(long initialRequest, boolean cancel) { // No
1873918741
Objects.requireNonNull(stream, "stream is null");
1874018742
return RxJavaPlugins.onAssembly(new FlowableFromStream<>(stream));
1874118743
}
18744+
18745+
/**
18746+
* Maps each upstream value into an {@link Optional} and emits the contained item if not empty.
18747+
* <p>
18748+
* <img width="640" height="308" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mapOptional.f.png" alt="">
18749+
*
18750+
* <dl>
18751+
* <dt><b>Backpressure:</b></dt>
18752+
* <dd>The operator is a pass-through for downstream requests but issues {@code request(1)} whenever the
18753+
* mapped {@code Optional} is empty.</dd>
18754+
* <dt><b>Scheduler:</b></dt>
18755+
* <dd>{@code mapOptional} does not operate by default on a particular {@link Scheduler}.</dd>
18756+
* </dl>
18757+
* @param <R> the non-null output type
18758+
* @param mapper the function that receives the upstream item and should return a <em>non-empty</em> {@code Optional}
18759+
* to emit as the output or an <em>empty</em> {@code Optional} to skip to the next upstream value
18760+
* @return the new Flowable instance
18761+
* @since 3.0.0
18762+
* @see #map(Function)
18763+
* @see #filter(Predicate)
18764+
*/
18765+
@CheckReturnValue
18766+
@BackpressureSupport(BackpressureKind.FULL)
18767+
@SchedulerSupport(SchedulerSupport.NONE)
18768+
@NonNull
18769+
public final <@NonNull R> Flowable<R> mapOptional(@NonNull Function<? super T, @NonNull Optional<? extends R>> mapper) {
18770+
Objects.requireNonNull(mapper, "mapper is null");
18771+
return RxJavaPlugins.onAssembly(new FlowableMapOptional<>(this, mapper));
18772+
}
18773+
18774+
/**
18775+
* Collects the finite upstream's values into a container via a Stream {@link Collector} callback set and emits
18776+
* it as the success result.
18777+
* <p>
18778+
* <img width="640" height="360" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/collector.f.png" alt="">
18779+
*
18780+
* <dl>
18781+
* <dt><b>Backpressure:</b></dt>
18782+
* <dd>The operator consumes the upstream in an unbounded manner.</dd>
18783+
* <dt><b>Scheduler:</b></dt>
18784+
* <dd>{@code collect} does not operate by default on a particular {@link Scheduler}.</dd>
18785+
* </dl>
18786+
* @param <R> the non-null result type
18787+
* @param <A> the intermediate container type used for the accumulation
18788+
* @param collector the interface defining the container supplier, accumulator and finisher functions;
18789+
* see {@link Collectors} for some standard implementations
18790+
* @return the new Single instance
18791+
* @since 3.0.0
18792+
* @see Collectors
18793+
* @see #collect(Supplier, BiConsumer)
18794+
*/
18795+
@CheckReturnValue
18796+
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
18797+
@SchedulerSupport(SchedulerSupport.NONE)
18798+
@NonNull
18799+
public final <@NonNull R, A> Single<R> collect(@NonNull Collector<T, A, R> collector) {
18800+
Objects.requireNonNull(collector, "collector is null");
18801+
return RxJavaPlugins.onAssembly(new FlowableCollectWithCollectorSingle<>(this, collector));
18802+
}
18803+
18804+
/**
18805+
* Signals the first upstream item (or the default item if the upstream is empty) via
18806+
* a {@link CompletionStage}.
18807+
* <p>
18808+
* <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/firstStage.f.png" alt="">
18809+
* <p>
18810+
* The upstream can be canceled by converting the resulting {@code CompletionStage} into
18811+
* {@link CompletableFuture} via {@link CompletionStage#toCompletableFuture()} and
18812+
* calling {@link CompletableFuture#cancel(boolean)} on it.
18813+
* The upstream will be also cancelled if the resulting {@code CompletionStage} is converted to and
18814+
* completed manually by {@link CompletableFuture#complete(Object)} or {@link CompletableFuture#completeExceptionally(Throwable)}.
18815+
* <p>
18816+
* {@code CompletionStage}s don't have a notion of emptyness and allow {@code null}s, therefore, one can either use
18817+
* a {@code defaultItem} of {@code null} or turn the flow into a sequence of {@link Optional}s and default to {@link Optional#empty()}:
18818+
* <pre><code>
18819+
* CompletionStage&lt;Optional&lt;T&gt;&gt; stage = source.map(Optional::of).firstStage(Optional.empty());
18820+
* </code></pre>
18821+
* <dl>
18822+
* <dt><b>Backpressure:</b></dt>
18823+
* <dd>The operator requests one item from upstream and then when received, cancels the upstream.</dd>
18824+
* <dt><b>Scheduler:</b></dt>
18825+
* <dd>{@code firstStage} does not operate by default on a particular {@link Scheduler}.</dd>
18826+
* </dl>
18827+
* @param defaultItem the item to signal if the upstream is empty
18828+
* @return the new CompletionStage instance
18829+
* @since 3.0.0
18830+
* @see #firstOrErrorStage()
18831+
*/
18832+
@CheckReturnValue
18833+
@BackpressureSupport(BackpressureKind.FULL)
18834+
@SchedulerSupport(SchedulerSupport.NONE)
18835+
@NonNull
18836+
public final CompletionStage<T> firstStage(@Nullable T defaultItem) {
18837+
return subscribeWith(new FlowableFirstStageSubscriber<>(true, defaultItem));
18838+
}
18839+
18840+
/**
18841+
* Signals the only expected upstream item (or the default item if the upstream is empty)
18842+
* or signals {@link IllegalArgumentException} if the upstream has more than one item
18843+
* via a {@link CompletionStage}.
18844+
* <p>
18845+
* <img width="640" height="229" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/singleStage.f.png" alt="">
18846+
* <p>
18847+
* The upstream can be canceled by converting the resulting {@code CompletionStage} into
18848+
* {@link CompletableFuture} via {@link CompletionStage#toCompletableFuture()} and
18849+
* calling {@link CompletableFuture#cancel(boolean)} on it.
18850+
* The upstream will be also cancelled if the resulting {@code CompletionStage} is converted to and
18851+
* completed manually by {@link CompletableFuture#complete(Object)} or {@link CompletableFuture#completeExceptionally(Throwable)}.
18852+
* <p>
18853+
* {@code CompletionStage}s don't have a notion of emptyness and allow {@code null}s, therefore, one can either use
18854+
* a {@code defaultItem} of {@code null} or turn the flow into a sequence of {@link Optional}s and default to {@link Optional#empty()}:
18855+
* <pre><code>
18856+
* CompletionStage&lt;Optional&lt;T&gt;&gt; stage = source.map(Optional::of).singleStage(Optional.empty());
18857+
* </code></pre>
18858+
* <dl>
18859+
* <dt><b>Backpressure:</b></dt>
18860+
* <dd>The operator requests two items from upstream and then when more than one item is received, cancels the upstream.</dd>
18861+
* <dt><b>Scheduler:</b></dt>
18862+
* <dd>{@code singleStage} does not operate by default on a particular {@link Scheduler}.</dd>
18863+
* </dl>
18864+
* @param defaultItem the item to signal if the upstream is empty
18865+
* @return the new CompletionStage instance
18866+
* @since 3.0.0
18867+
* @see #singleOrErrorStage()
18868+
*/
18869+
@CheckReturnValue
18870+
@BackpressureSupport(BackpressureKind.FULL)
18871+
@SchedulerSupport(SchedulerSupport.NONE)
18872+
@NonNull
18873+
public final CompletionStage<T> singleStage(@Nullable T defaultItem) {
18874+
return subscribeWith(new FlowableSingleStageSubscriber<>(true, defaultItem));
18875+
}
18876+
18877+
/**
18878+
* Signals the last upstream item (or the default item if the upstream is empty) via
18879+
* a {@link CompletionStage}.
18880+
* <p>
18881+
* <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/lastStage.f.png" alt="">
18882+
* <p>
18883+
* The upstream can be canceled by converting the resulting {@code CompletionStage} into
18884+
* {@link CompletableFuture} via {@link CompletionStage#toCompletableFuture()} and
18885+
* calling {@link CompletableFuture#cancel(boolean)} on it.
18886+
* The upstream will be also cancelled if the resulting {@code CompletionStage} is converted to and
18887+
* completed manually by {@link CompletableFuture#complete(Object)} or {@link CompletableFuture#completeExceptionally(Throwable)}.
18888+
* <p>
18889+
* {@code CompletionStage}s don't have a notion of emptyness and allow {@code null}s, therefore, one can either use
18890+
* a {@code defaultItem} of {@code null} or turn the flow into a sequence of {@link Optional}s and default to {@link Optional#empty()}:
18891+
* <pre><code>
18892+
* CompletionStage&lt;Optional&lt;T&gt;&gt; stage = source.map(Optional::of).lastStage(Optional.empty());
18893+
* </code></pre>
18894+
* <dl>
18895+
* <dt><b>Backpressure:</b></dt>
18896+
* <dd>The operator requests an unbounded number of items from the upstream.</dd>
18897+
* <dt><b>Scheduler:</b></dt>
18898+
* <dd>{@code lastStage} does not operate by default on a particular {@link Scheduler}.</dd>
18899+
* </dl>
18900+
* @param defaultItem the item to signal if the upstream is empty
18901+
* @return the new CompletionStage instance
18902+
* @since 3.0.0
18903+
* @see #lastOrErrorStage()
18904+
*/
18905+
@CheckReturnValue
18906+
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
18907+
@SchedulerSupport(SchedulerSupport.NONE)
18908+
@NonNull
18909+
public final CompletionStage<T> lastStage(@Nullable T defaultItem) {
18910+
return subscribeWith(new FlowableLastStageSubscriber<>(true, defaultItem));
18911+
}
18912+
18913+
/**
18914+
* Signals the first upstream item or a {@link NoSuchElementException} if the upstream is empty via
18915+
* a {@link CompletionStage}.
18916+
* <p>
18917+
* <img width="640" height="338" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/firstOrErrorStage.f.png" alt="">
18918+
* <p>
18919+
* The upstream can be canceled by converting the resulting {@code CompletionStage} into
18920+
* {@link CompletableFuture} via {@link CompletionStage#toCompletableFuture()} and
18921+
* calling {@link CompletableFuture#cancel(boolean)} on it.
18922+
* The upstream will be also cancelled if the resulting {@code CompletionStage} is converted to and
18923+
* completed manually by {@link CompletableFuture#complete(Object)} or {@link CompletableFuture#completeExceptionally(Throwable)}.
18924+
* <dl>
18925+
* <dt><b>Backpressure:</b></dt>
18926+
* <dd>The operator requests one item from upstream and then when received, cancels the upstream.</dd>
18927+
* <dt><b>Scheduler:</b></dt>
18928+
* <dd>{@code firstOrErrorStage} does not operate by default on a particular {@link Scheduler}.</dd>
18929+
* </dl>
18930+
* @return the new CompletionStage instance
18931+
* @since 3.0.0
18932+
* @see #firstStage(Object)
18933+
*/
18934+
@CheckReturnValue
18935+
@BackpressureSupport(BackpressureKind.FULL)
18936+
@SchedulerSupport(SchedulerSupport.NONE)
18937+
@NonNull
18938+
public final CompletionStage<T> firstOrErrorStage() {
18939+
return subscribeWith(new FlowableFirstStageSubscriber<>(false, null));
18940+
}
18941+
18942+
/**
18943+
* Signals the only expected upstream item, a {@link NoSuchElementException} if the upstream is empty
18944+
* or signals {@link IllegalArgumentException} if the upstream has more than one item
18945+
* via a {@link CompletionStage}.
18946+
* <p>
18947+
* <img width="640" height="229" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/singleOrErrorStage.f.png" alt="">
18948+
* <p>
18949+
* The upstream can be canceled by converting the resulting {@code CompletionStage} into
18950+
* {@link CompletableFuture} via {@link CompletionStage#toCompletableFuture()} and
18951+
* calling {@link CompletableFuture#cancel(boolean)} on it.
18952+
* The upstream will be also cancelled if the resulting {@code CompletionStage} is converted to and
18953+
* completed manually by {@link CompletableFuture#complete(Object)} or {@link CompletableFuture#completeExceptionally(Throwable)}.
18954+
* <dl>
18955+
* <dt><b>Backpressure:</b></dt>
18956+
* <dd>The operator requests two items from upstream and then when more than one item is received, cancels the upstream.</dd>
18957+
* <dt><b>Scheduler:</b></dt>
18958+
* <dd>{@code singleOrErrorStage} does not operate by default on a particular {@link Scheduler}.</dd>
18959+
* </dl>
18960+
* @return the new CompletionStage instance
18961+
* @since 3.0.0
18962+
* @see #singleStage(Object)
18963+
*/
18964+
@CheckReturnValue
18965+
@BackpressureSupport(BackpressureKind.FULL)
18966+
@SchedulerSupport(SchedulerSupport.NONE)
18967+
@NonNull
18968+
public final CompletionStage<T> singleOrErrorStage() {
18969+
return subscribeWith(new FlowableSingleStageSubscriber<>(false, null));
18970+
}
18971+
18972+
/**
18973+
* Signals the last upstream item or a {@link NoSuchElementException} if the upstream is empty via
18974+
* a {@link CompletionStage}.
18975+
* <p>
18976+
* <img width="640" height="346" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/lastOrErrorStage.f.png" alt="">
18977+
* <p>
18978+
* The upstream can be canceled by converting the resulting {@code CompletionStage} into
18979+
* {@link CompletableFuture} via {@link CompletionStage#toCompletableFuture()} and
18980+
* calling {@link CompletableFuture#cancel(boolean)} on it.
18981+
* The upstream will be also cancelled if the resulting {@code CompletionStage} is converted to and
18982+
* completed manually by {@link CompletableFuture#complete(Object)} or {@link CompletableFuture#completeExceptionally(Throwable)}.
18983+
* <dl>
18984+
* <dt><b>Backpressure:</b></dt>
18985+
* <dd>The operator requests an unbounded number of items from the upstream.</dd>
18986+
* <dt><b>Scheduler:</b></dt>
18987+
* <dd>{@code lastOrErrorStage} does not operate by default on a particular {@link Scheduler}.</dd>
18988+
* </dl>
18989+
* @return the new CompletionStage instance
18990+
* @since 3.0.0
18991+
* @see #lastStage(Object)
18992+
*/
18993+
@CheckReturnValue
18994+
@BackpressureSupport(BackpressureKind.FULL)
18995+
@SchedulerSupport(SchedulerSupport.NONE)
18996+
@NonNull
18997+
public final CompletionStage<T> lastOrErrorStage() {
18998+
return subscribeWith(new FlowableLastStageSubscriber<>(false, null));
18999+
}
1874219000
}

0 commit comments

Comments
 (0)