diff --git a/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperationAverage.java b/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperationAverage.java
deleted file mode 100644
index 25d6bc7376..0000000000
--- a/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperationAverage.java
+++ /dev/null
@@ -1,348 +0,0 @@
-/**
- * Copyright 2014 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package rx.math.operators;
-
-import rx.Observable;
-import rx.Observable.OnSubscribeFunc;
-import rx.Observer;
-import rx.Subscription;
-import rx.functions.Func1;
-import rx.functions.Func2;
-
-/**
- * A few operators for implementing the averaging operation.
- *
- * @see MSDN: Observable.Average
- */
-public final class OperationAverage {
- private static final class Tuple2 {
- private final T current;
- private final Integer count;
-
- private Tuple2(T v1, Integer v2) {
- current = v1;
- count = v2;
- }
- }
-
- public static Observable average(Observable source) {
- return source.reduce(new Tuple2(0, 0), new Func2, Integer, Tuple2>() {
- @Override
- public Tuple2 call(Tuple2 accu, Integer next) {
- return new Tuple2(accu.current + next, accu.count + 1);
- }
- }).map(new Func1, Integer>() {
- @Override
- public Integer call(Tuple2 result) {
- if (result.count == 0) {
- throw new IllegalArgumentException("Sequence contains no elements");
- }
- return result.current / result.count;
- }
- });
- }
-
- public static Observable averageLongs(Observable source) {
- return source.reduce(new Tuple2(0L, 0), new Func2, Long, Tuple2>() {
- @Override
- public Tuple2 call(Tuple2 accu, Long next) {
- return new Tuple2(accu.current + next, accu.count + 1);
- }
- }).map(new Func1, Long>() {
- @Override
- public Long call(Tuple2 result) {
- if (result.count == 0) {
- throw new IllegalArgumentException("Sequence contains no elements");
- }
- return result.current / result.count;
- }
- });
- }
-
- public static Observable averageFloats(Observable source) {
- return source.reduce(new Tuple2(0.0f, 0), new Func2, Float, Tuple2>() {
- @Override
- public Tuple2 call(Tuple2 accu, Float next) {
- return new Tuple2(accu.current + next, accu.count + 1);
- }
- }).map(new Func1, Float>() {
- @Override
- public Float call(Tuple2 result) {
- if (result.count == 0) {
- throw new IllegalArgumentException("Sequence contains no elements");
- }
- return result.current / result.count;
- }
- });
- }
-
- public static Observable averageDoubles(Observable source) {
- return source.reduce(new Tuple2(0.0d, 0), new Func2, Double, Tuple2>() {
- @Override
- public Tuple2 call(Tuple2 accu, Double next) {
- return new Tuple2(accu.current + next, accu.count + 1);
- }
- }).map(new Func1, Double>() {
- @Override
- public Double call(Tuple2 result) {
- if (result.count == 0) {
- throw new IllegalArgumentException("Sequence contains no elements");
- }
- return result.current / result.count;
- }
- });
- }
-
- /**
- * Compute the average by extracting integer values from the source via an
- * extractor function.
- *
- * @param
- * the source value type
- */
- public static final class AverageIntegerExtractor implements OnSubscribeFunc {
- final Observable extends T> source;
- final Func1 super T, Integer> valueExtractor;
-
- public AverageIntegerExtractor(Observable extends T> source, Func1 super T, Integer> valueExtractor) {
- this.source = source;
- this.valueExtractor = valueExtractor;
- }
-
- @Override
- public Subscription onSubscribe(Observer super Integer> t1) {
- return source.subscribe(new AverageObserver(t1));
- }
-
- /** Computes the average. */
- private final class AverageObserver implements Observer {
- final Observer super Integer> observer;
- int sum;
- int count;
-
- public AverageObserver(Observer super Integer> observer) {
- this.observer = observer;
- }
-
- @Override
- public void onNext(T args) {
- sum += valueExtractor.call(args);
- count++;
- }
-
- @Override
- public void onError(Throwable e) {
- observer.onError(e);
- }
-
- @Override
- public void onCompleted() {
- if (count > 0) {
- try {
- observer.onNext(sum / count);
- } catch (Throwable t) {
- observer.onError(t);
- return;
- }
- observer.onCompleted();
- } else {
- observer.onError(new IllegalArgumentException("Sequence contains no elements"));
- }
- }
-
- }
- }
-
- /**
- * Compute the average by extracting long values from the source via an
- * extractor function.
- *
- * @param
- * the source value type
- */
- public static final class AverageLongExtractor implements OnSubscribeFunc {
- final Observable extends T> source;
- final Func1 super T, Long> valueExtractor;
-
- public AverageLongExtractor(Observable extends T> source, Func1 super T, Long> valueExtractor) {
- this.source = source;
- this.valueExtractor = valueExtractor;
- }
-
- @Override
- public Subscription onSubscribe(Observer super Long> t1) {
- return source.subscribe(new AverageObserver(t1));
- }
-
- /** Computes the average. */
- private final class AverageObserver implements Observer {
- final Observer super Long> observer;
- long sum;
- int count;
-
- public AverageObserver(Observer super Long> observer) {
- this.observer = observer;
- }
-
- @Override
- public void onNext(T args) {
- sum += valueExtractor.call(args);
- count++;
- }
-
- @Override
- public void onError(Throwable e) {
- observer.onError(e);
- }
-
- @Override
- public void onCompleted() {
- if (count > 0) {
- try {
- observer.onNext(sum / count);
- } catch (Throwable t) {
- observer.onError(t);
- return;
- }
- observer.onCompleted();
- } else {
- observer.onError(new IllegalArgumentException("Sequence contains no elements"));
- }
- }
-
- }
- }
-
- /**
- * Compute the average by extracting float values from the source via an
- * extractor function.
- *
- * @param
- * the source value type
- */
- public static final class AverageFloatExtractor implements OnSubscribeFunc {
- final Observable extends T> source;
- final Func1 super T, Float> valueExtractor;
-
- public AverageFloatExtractor(Observable extends T> source, Func1 super T, Float> valueExtractor) {
- this.source = source;
- this.valueExtractor = valueExtractor;
- }
-
- @Override
- public Subscription onSubscribe(Observer super Float> t1) {
- return source.subscribe(new AverageObserver(t1));
- }
-
- /** Computes the average. */
- private final class AverageObserver implements Observer {
- final Observer super Float> observer;
- float sum;
- int count;
-
- public AverageObserver(Observer super Float> observer) {
- this.observer = observer;
- }
-
- @Override
- public void onNext(T args) {
- sum += valueExtractor.call(args);
- count++;
- }
-
- @Override
- public void onError(Throwable e) {
- observer.onError(e);
- }
-
- @Override
- public void onCompleted() {
- if (count > 0) {
- try {
- observer.onNext(sum / count);
- } catch (Throwable t) {
- observer.onError(t);
- return;
- }
- observer.onCompleted();
- } else {
- observer.onError(new IllegalArgumentException("Sequence contains no elements"));
- }
- }
-
- }
- }
-
- /**
- * Compute the average by extracting double values from the source via an
- * extractor function.
- *
- * @param
- * the source value type
- */
- public static final class AverageDoubleExtractor implements OnSubscribeFunc {
- final Observable extends T> source;
- final Func1 super T, Double> valueExtractor;
-
- public AverageDoubleExtractor(Observable extends T> source, Func1 super T, Double> valueExtractor) {
- this.source = source;
- this.valueExtractor = valueExtractor;
- }
-
- @Override
- public Subscription onSubscribe(Observer super Double> t1) {
- return source.subscribe(new AverageObserver(t1));
- }
-
- /** Computes the average. */
- private final class AverageObserver implements Observer {
- final Observer super Double> observer;
- double sum;
- int count;
-
- public AverageObserver(Observer super Double> observer) {
- this.observer = observer;
- }
-
- @Override
- public void onNext(T args) {
- sum += valueExtractor.call(args);
- count++;
- }
-
- @Override
- public void onError(Throwable e) {
- observer.onError(e);
- }
-
- @Override
- public void onCompleted() {
- if (count > 0) {
- try {
- observer.onNext(sum / count);
- } catch (Throwable t) {
- observer.onError(t);
- return;
- }
- observer.onCompleted();
- } else {
- observer.onError(new IllegalArgumentException("Sequence contains no elements"));
- }
- }
-
- }
- }
-}
diff --git a/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperatorAverageDouble.java b/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperatorAverageDouble.java
new file mode 100644
index 0000000000..baad9ff8e6
--- /dev/null
+++ b/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperatorAverageDouble.java
@@ -0,0 +1,79 @@
+/**
+ * Copyright 2014 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.math.operators;
+
+import rx.Observable.Operator;
+import rx.Subscriber;
+import rx.functions.Func1;
+
+/**
+ * Compute the average by extracting double values from the source via an
+ * extractor function.
+ *
+ * @param
+ * the source value type
+ */
+public final class OperatorAverageDouble implements Operator {
+ final Func1 super T, Double> valueExtractor;
+
+ public OperatorAverageDouble(Func1 super T, Double> valueExtractor) {
+ this.valueExtractor = valueExtractor;
+ }
+
+ @Override
+ public Subscriber super T> call(Subscriber super Double> child) {
+ return new AverageObserver(child);
+ }
+
+ /** Computes the average. */
+ private final class AverageObserver extends Subscriber {
+ final Subscriber super Double> child;
+ double sum;
+ int count;
+
+ public AverageObserver(Subscriber super Double> subscriber) {
+ super(subscriber);
+ this.child = subscriber;
+ }
+
+ @Override
+ public void onNext(T args) {
+ sum += valueExtractor.call(args);
+ count++;
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ child.onError(e);
+ }
+
+ @Override
+ public void onCompleted() {
+ if (count > 0) {
+ try {
+ child.onNext(sum / count);
+ } catch (Throwable t) {
+ child.onError(t);
+ return;
+ }
+ child.onCompleted();
+ } else {
+ child.onError(new IllegalArgumentException("Sequence contains no elements"));
+ }
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperatorAverageFloat.java b/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperatorAverageFloat.java
new file mode 100644
index 0000000000..148d4f3d84
--- /dev/null
+++ b/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperatorAverageFloat.java
@@ -0,0 +1,79 @@
+/**
+ * Copyright 2014 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.math.operators;
+
+import rx.Observable.Operator;
+import rx.Subscriber;
+import rx.functions.Func1;
+
+/**
+ * Compute the average by extracting float values from the source via an
+ * extractor function.
+ *
+ * @param
+ * the source value type
+ */
+public final class OperatorAverageFloat implements Operator {
+ final Func1 super T, Float> valueExtractor;
+
+ public OperatorAverageFloat(Func1 super T, Float> valueExtractor) {
+ this.valueExtractor = valueExtractor;
+ }
+
+ @Override
+ public Subscriber super T> call(Subscriber super Float> child) {
+ return new AverageObserver(child);
+ }
+
+ /** Computes the average. */
+ private final class AverageObserver extends Subscriber {
+ final Subscriber super Float> child;
+ float sum;
+ int count;
+
+ public AverageObserver(Subscriber super Float> subscriber) {
+ super(subscriber);
+ this.child = subscriber;
+ }
+
+ @Override
+ public void onNext(T args) {
+ sum += valueExtractor.call(args);
+ count++;
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ child.onError(e);
+ }
+
+ @Override
+ public void onCompleted() {
+ if (count > 0) {
+ try {
+ child.onNext(sum / count);
+ } catch (Throwable t) {
+ child.onError(t);
+ return;
+ }
+ child.onCompleted();
+ } else {
+ child.onError(new IllegalArgumentException("Sequence contains no elements"));
+ }
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperatorAverageInteger.java b/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperatorAverageInteger.java
new file mode 100644
index 0000000000..ef037101bd
--- /dev/null
+++ b/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperatorAverageInteger.java
@@ -0,0 +1,79 @@
+/**
+ * Copyright 2014 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.math.operators;
+
+import rx.Observable.Operator;
+import rx.Subscriber;
+import rx.functions.Func1;
+
+/**
+ * Compute the average by extracting integer values from the source via an
+ * extractor function.
+ *
+ * @param
+ * the source value type
+ */
+public final class OperatorAverageInteger implements Operator {
+ final Func1 super T, Integer> valueExtractor;
+
+ public OperatorAverageInteger(Func1 super T, Integer> valueExtractor) {
+ this.valueExtractor = valueExtractor;
+ }
+
+ @Override
+ public Subscriber super T> call(Subscriber super Integer> t1) {
+ return new AverageObserver(t1);
+ }
+
+ /** Computes the average. */
+ private final class AverageObserver extends Subscriber {
+ final Subscriber super Integer> child;
+ int sum;
+ int count;
+
+ public AverageObserver(Subscriber super Integer> subscriber) {
+ super(subscriber);
+ this.child = subscriber;
+ }
+
+ @Override
+ public void onNext(T args) {
+ sum += valueExtractor.call(args);
+ count++;
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ child.onError(e);
+ }
+
+ @Override
+ public void onCompleted() {
+ if (count > 0) {
+ try {
+ child.onNext(sum / count);
+ } catch (Throwable t) {
+ child.onError(t);
+ return;
+ }
+ child.onCompleted();
+ } else {
+ child.onError(new IllegalArgumentException("Sequence contains no elements"));
+ }
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperatorAverageLong.java b/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperatorAverageLong.java
new file mode 100644
index 0000000000..5f879a0528
--- /dev/null
+++ b/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperatorAverageLong.java
@@ -0,0 +1,79 @@
+/**
+ * Copyright 2014 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.math.operators;
+
+import rx.Observable.Operator;
+import rx.Subscriber;
+import rx.functions.Func1;
+
+/**
+ * Compute the average by extracting long values from the source via an
+ * extractor function.
+ *
+ * @param
+ * the source value type
+ */
+public final class OperatorAverageLong implements Operator {
+ final Func1 super T, Long> valueExtractor;
+
+ public OperatorAverageLong(Func1 super T, Long> valueExtractor) {
+ this.valueExtractor = valueExtractor;
+ }
+
+ @Override
+ public Subscriber super T> call(Subscriber super Long> child) {
+ return new AverageObserver(child);
+ }
+
+ /** Computes the average. */
+ private final class AverageObserver extends Subscriber {
+ final Subscriber super Long> child;
+ long sum;
+ int count;
+
+ public AverageObserver(Subscriber super Long> subscriber) {
+ super(subscriber);
+ this.child = subscriber;
+ }
+
+ @Override
+ public void onNext(T args) {
+ sum += valueExtractor.call(args);
+ count++;
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ child.onError(e);
+ }
+
+ @Override
+ public void onCompleted() {
+ if (count > 0) {
+ try {
+ child.onNext(sum / count);
+ } catch (Throwable t) {
+ child.onError(t);
+ return;
+ }
+ child.onCompleted();
+ } else {
+ child.onError(new IllegalArgumentException("Sequence contains no elements"));
+ }
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperationMinMax.java b/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperatorMinMax.java
similarity index 97%
rename from rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperationMinMax.java
rename to rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperatorMinMax.java
index cfa92d4fce..d3a42fc2f2 100644
--- a/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperationMinMax.java
+++ b/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperatorMinMax.java
@@ -26,7 +26,8 @@
/**
* Returns the minimum element in an observable sequence.
*/
-public class OperationMinMax {
+public final class OperatorMinMax {
+ private OperatorMinMax() { throw new IllegalStateException("No instances!"); }
public static > Observable min(
Observable source) {
diff --git a/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperationSum.java b/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperatorSum.java
similarity index 96%
rename from rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperationSum.java
rename to rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperatorSum.java
index 3d519e4220..19bd0e9fa3 100644
--- a/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperationSum.java
+++ b/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperatorSum.java
@@ -25,7 +25,8 @@
* href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.sum%28v=vs.103%29.aspx">MSDN:
* Observable.Sum
*/
-public final class OperationSum {
+public final class OperatorSum {
+ private OperatorSum() { throw new IllegalStateException("No instances!"); }
public static Observable sumIntegers(Observable source) {
return source.reduce(0, ACCUM_INT);
diff --git a/rxjava-contrib/rxjava-math/src/main/java/rx/observables/MathObservable.java b/rxjava-contrib/rxjava-math/src/main/java/rx/observables/MathObservable.java
index 2614983bf8..1514f1c9a4 100644
--- a/rxjava-contrib/rxjava-math/src/main/java/rx/observables/MathObservable.java
+++ b/rxjava-contrib/rxjava-math/src/main/java/rx/observables/MathObservable.java
@@ -19,9 +19,13 @@
import rx.Observable;
import rx.functions.Func1;
-import rx.math.operators.OperationAverage;
-import rx.math.operators.OperationMinMax;
-import rx.math.operators.OperationSum;
+import rx.functions.Functions;
+import rx.math.operators.OperatorMinMax;
+import rx.math.operators.OperatorSum;
+import rx.math.operators.OperatorAverageDouble;
+import rx.math.operators.OperatorAverageFloat;
+import rx.math.operators.OperatorAverageInteger;
+import rx.math.operators.OperatorAverageLong;
public class MathObservable {
@@ -48,7 +52,7 @@ public static MathObservable from(Observable o) {
* @see MSDN: Observable.Average
*/
public final static Observable averageDouble(Observable source) {
- return OperationAverage.averageDoubles(source);
+ return source.lift(new OperatorAverageDouble(Functions.identity()));
}
/**
@@ -64,7 +68,7 @@ public final static Observable averageDouble(Observable source)
* @see MSDN: Observable.Average
*/
public final static Observable averageFloat(Observable source) {
- return OperationAverage.averageFloats(source);
+ return source.lift(new OperatorAverageFloat(Functions.identity()));
}
/**
@@ -82,7 +86,7 @@ public final static Observable averageFloat(Observable source) {
* @see MSDN: Observable.Average
*/
public final static Observable averageInteger(Observable source) {
- return OperationAverage.average(source);
+ return source.lift(new OperatorAverageInteger(Functions.identity()));
}
/**
@@ -98,7 +102,7 @@ public final static Observable averageInteger(Observable sourc
* @see MSDN: Observable.Average
*/
public final static Observable averageLong(Observable source) {
- return OperationAverage.averageLongs(source);
+ return source.lift(new OperatorAverageLong(Functions.identity()));
}
/**
@@ -117,7 +121,7 @@ public final static Observable averageLong(Observable source) {
* @see MSDN: Observable.Max
*/
public final static > Observable max(Observable source) {
- return OperationMinMax.max(source);
+ return OperatorMinMax.max(source);
}
/**
@@ -134,7 +138,7 @@ public final static > Observable max(Observab
* @see MSDN: Observable.Min
*/
public final static > Observable min(Observable source) {
- return OperationMinMax.min(source);
+ return OperatorMinMax.min(source);
}
/**
@@ -150,7 +154,7 @@ public final static > Observable min(Observab
* @see MSDN: Observable.Sum
*/
public final static Observable sumDouble(Observable source) {
- return OperationSum.sumDoubles(source);
+ return OperatorSum.sumDoubles(source);
}
/**
@@ -166,7 +170,7 @@ public final static Observable sumDouble(Observable source) {
* @see MSDN: Observable.Sum
*/
public final static Observable sumFloat(Observable source) {
- return OperationSum.sumFloats(source);
+ return OperatorSum.sumFloats(source);
}
/**
@@ -182,7 +186,7 @@ public final static Observable sumFloat(Observable source) {
* @see MSDN: Observable.Sum
*/
public final static Observable sumInteger(Observable source) {
- return OperationSum.sumIntegers(source);
+ return OperatorSum.sumIntegers(source);
}
/**
@@ -198,7 +202,7 @@ public final static Observable sumInteger(Observable source) {
* @see MSDN: Observable.Sum
*/
public final static Observable sumLong(Observable source) {
- return OperationSum.sumLongs(source);
+ return OperatorSum.sumLongs(source);
}
/**
@@ -215,7 +219,7 @@ public final static Observable sumLong(Observable source) {
* @see MSDN: Observable.Average
*/
public final Observable averageDouble(Func1 super T, Double> valueExtractor) {
- return Observable.create(new OperationAverage.AverageDoubleExtractor(o, valueExtractor));
+ return o.lift(new OperatorAverageDouble(valueExtractor));
}
/**
@@ -232,7 +236,7 @@ public final Observable averageDouble(Func1 super T, Double> valueExtr
* @see MSDN: Observable.Average
*/
public final Observable averageFloat(Func1 super T, Float> valueExtractor) {
- return Observable.create(new OperationAverage.AverageFloatExtractor(o, valueExtractor));
+ return o.lift(new OperatorAverageFloat(valueExtractor));
}
/**
@@ -249,7 +253,7 @@ public final Observable averageFloat(Func1 super T, Float> valueExtract
* @see MSDN: Observable.Average
*/
public final Observable averageInteger(Func1 super T, Integer> valueExtractor) {
- return Observable.create(new OperationAverage.AverageIntegerExtractor(o, valueExtractor));
+ return o.lift(new OperatorAverageInteger(valueExtractor));
}
/**
@@ -266,7 +270,7 @@ public final Observable averageInteger(Func1 super T, Integer> valueE
* @see MSDN: Observable.Average
*/
public final Observable averageLong(Func1 super T, Long> valueExtractor) {
- return Observable.create(new OperationAverage.AverageLongExtractor(o, valueExtractor));
+ return o.lift(new OperatorAverageLong(valueExtractor));
}
/**
@@ -286,7 +290,7 @@ public final Observable averageLong(Func1 super T, Long> valueExtractor)
* @see MSDN: Observable.Max
*/
public final Observable max(Comparator super T> comparator) {
- return OperationMinMax.max(o, comparator);
+ return OperatorMinMax.max(o, comparator);
}
/**
@@ -305,7 +309,7 @@ public final Observable max(Comparator super T> comparator) {
* @see MSDN: Observable.Min
*/
public final Observable min(Comparator super T> comparator) {
- return OperationMinMax.min(o, comparator);
+ return OperatorMinMax.min(o, comparator);
}
/**
@@ -322,7 +326,7 @@ public final Observable min(Comparator super T> comparator) {
* @see MSDN: Observable.Sum
*/
public final Observable sumDouble(Func1 super T, Double> valueExtractor) {
- return OperationSum.sumAtLeastOneDoubles(o.map(valueExtractor));
+ return OperatorSum.sumAtLeastOneDoubles(o.map(valueExtractor));
}
/**
@@ -339,7 +343,7 @@ public final Observable sumDouble(Func1 super T, Double> valueExtracto
* @see MSDN: Observable.Sum
*/
public final Observable sumFloat(Func1 super T, Float> valueExtractor) {
- return OperationSum.sumAtLeastOneFloats(o.map(valueExtractor));
+ return OperatorSum.sumAtLeastOneFloats(o.map(valueExtractor));
}
/**
@@ -356,7 +360,7 @@ public final Observable sumFloat(Func1 super T, Float> valueExtractor)
* @see MSDN: Observable.Sum
*/
public final Observable sumInteger(Func1 super T, Integer> valueExtractor) {
- return OperationSum.sumAtLeastOneIntegers(o.map(valueExtractor));
+ return OperatorSum.sumAtLeastOneIntegers(o.map(valueExtractor));
}
/**
@@ -373,6 +377,6 @@ public final Observable sumInteger(Func1 super T, Integer> valueExtra
* @see MSDN: Observable.Sum
*/
public final Observable sumLong(Func1 super T, Long> valueExtractor) {
- return OperationSum.sumAtLeastOneLongs(o.map(valueExtractor));
+ return OperatorSum.sumAtLeastOneLongs(o.map(valueExtractor));
}
}
diff --git a/rxjava-contrib/rxjava-math/src/test/java/rx/math/operators/OperationAverageTest.java b/rxjava-contrib/rxjava-math/src/test/java/rx/math/operators/OperatorAverageTest.java
similarity index 88%
rename from rxjava-contrib/rxjava-math/src/test/java/rx/math/operators/OperationAverageTest.java
rename to rxjava-contrib/rxjava-math/src/test/java/rx/math/operators/OperatorAverageTest.java
index a77868dc7a..b53d19d874 100644
--- a/rxjava-contrib/rxjava-math/src/test/java/rx/math/operators/OperationAverageTest.java
+++ b/rxjava-contrib/rxjava-math/src/test/java/rx/math/operators/OperatorAverageTest.java
@@ -15,9 +15,16 @@
*/
package rx.math.operators;
-import static org.mockito.Matchers.*;
-import static org.mockito.Mockito.*;
-import static rx.math.operators.OperationAverage.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyDouble;
+import static org.mockito.Matchers.anyFloat;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import org.junit.Test;
@@ -25,8 +32,9 @@
import rx.Observer;
import rx.functions.Func1;
import rx.observables.MathObservable;
+import static rx.observables.MathObservable.*;
-public class OperationAverageTest {
+public class OperatorAverageTest {
@SuppressWarnings("unchecked")
Observer w = mock(Observer.class);
@@ -40,7 +48,7 @@ public class OperationAverageTest {
@Test
public void testAverageOfAFewInts() throws Throwable {
Observable src = Observable.from(1, 2, 3, 4, 6);
- average(src).subscribe(w);
+ averageInteger(src).subscribe(w);
verify(w, times(1)).onNext(anyInt());
verify(w).onNext(3);
@@ -51,7 +59,7 @@ public void testAverageOfAFewInts() throws Throwable {
@Test
public void testEmptyAverage() throws Throwable {
Observable src = Observable.empty();
- average(src).subscribe(w);
+ averageInteger(src).subscribe(w);
verify(w, never()).onNext(anyInt());
verify(w, times(1)).onError(isA(IllegalArgumentException.class));
@@ -61,7 +69,7 @@ public void testEmptyAverage() throws Throwable {
@Test
public void testAverageOfAFewLongs() throws Throwable {
Observable src = Observable.from(1L, 2L, 3L, 4L, 6L);
- averageLongs(src).subscribe(wl);
+ averageLong(src).subscribe(wl);
verify(wl, times(1)).onNext(anyLong());
verify(wl).onNext(3L);
@@ -72,7 +80,7 @@ public void testAverageOfAFewLongs() throws Throwable {
@Test
public void testEmptyAverageLongs() throws Throwable {
Observable src = Observable.empty();
- averageLongs(src).subscribe(wl);
+ averageLong(src).subscribe(wl);
verify(wl, never()).onNext(anyLong());
verify(wl, times(1)).onError(isA(IllegalArgumentException.class));
@@ -82,7 +90,7 @@ public void testEmptyAverageLongs() throws Throwable {
@Test
public void testAverageOfAFewFloats() throws Throwable {
Observable src = Observable.from(1.0f, 2.0f);
- averageFloats(src).subscribe(wf);
+ averageFloat(src).subscribe(wf);
verify(wf, times(1)).onNext(anyFloat());
verify(wf).onNext(1.5f);
@@ -93,7 +101,7 @@ public void testAverageOfAFewFloats() throws Throwable {
@Test
public void testEmptyAverageFloats() throws Throwable {
Observable src = Observable.empty();
- averageFloats(src).subscribe(wf);
+ averageFloat(src).subscribe(wf);
verify(wf, never()).onNext(anyFloat());
verify(wf, times(1)).onError(isA(IllegalArgumentException.class));
@@ -103,7 +111,7 @@ public void testEmptyAverageFloats() throws Throwable {
@Test
public void testAverageOfAFewDoubles() throws Throwable {
Observable src = Observable.from(1.0d, 2.0d);
- averageDoubles(src).subscribe(wd);
+ averageDouble(src).subscribe(wd);
verify(wd, times(1)).onNext(anyDouble());
verify(wd).onNext(1.5d);
@@ -114,7 +122,7 @@ public void testAverageOfAFewDoubles() throws Throwable {
@Test
public void testEmptyAverageDoubles() throws Throwable {
Observable src = Observable.empty();
- averageDoubles(src).subscribe(wd);
+ averageDouble(src).subscribe(wd);
verify(wd, never()).onNext(anyDouble());
verify(wd, times(1)).onError(isA(IllegalArgumentException.class));
@@ -144,6 +152,7 @@ public Integer call(String t1) {
};
Observable result = MathObservable.from(source).averageInteger(length);
+ @SuppressWarnings("unchecked")
Observer