diff --git a/spring-kafka-docs/src/main/asciidoc/changes-since-1.0.adoc b/spring-kafka-docs/src/main/asciidoc/changes-since-1.0.adoc index 158c473f78..137772cc2d 100644 --- a/spring-kafka-docs/src/main/asciidoc/changes-since-1.0.adoc +++ b/spring-kafka-docs/src/main/asciidoc/changes-since-1.0.adoc @@ -33,6 +33,12 @@ You can now configure which inbound headers should be mapped. Also available in version 2.8.8 or later. See <> for more information. +[[x29-template-changes]] +==== `KafkaTemplate` Changes + +In 3.0, the futures returned by this class will be `CompletableFuture` s instead of `ListenableFuture` s. +See <> for assistance in transitioning when using this release. + [[x29-rkt-changes]] ==== `ReplyingKafkaTemplate` Changes @@ -40,6 +46,9 @@ The template now provides a method to wait for assignment on the reply container Also available in version 2.8.8 or later. See <>. +In 3.0, the futures returned by this class will be `CompletableFuture` s instead of `ListenableFuture` s. +See <> and <> for assistance in transitioning when using this release. + === What's New in 2.8 Since 2.7 This section covers the changes made from version 2.7 to version 2.8. diff --git a/spring-kafka-docs/src/main/asciidoc/kafka.adoc b/spring-kafka-docs/src/main/asciidoc/kafka.adoc index 77c95dbe47..1cb0c7d5b8 100644 --- a/spring-kafka-docs/src/main/asciidoc/kafka.adoc +++ b/spring-kafka-docs/src/main/asciidoc/kafka.adoc @@ -170,25 +170,25 @@ The following listing shows the relevant methods from `KafkaTemplate`: ==== [source, java] ---- -ListenableFuture> sendDefault(V data); +CompletableFuture> sendDefault(V data); -ListenableFuture> sendDefault(K key, V data); +CompletableFuture> sendDefault(K key, V data); -ListenableFuture> sendDefault(Integer partition, K key, V data); +CompletableFuture> sendDefault(Integer partition, K key, V data); -ListenableFuture> sendDefault(Integer partition, Long timestamp, K key, V data); +CompletableFuture> sendDefault(Integer partition, Long timestamp, K key, V data); -ListenableFuture> send(String topic, V data); +CompletableFuture> send(String topic, V data); -ListenableFuture> send(String topic, K key, V data); +CompletableFuture> send(String topic, K key, V data); -ListenableFuture> send(String topic, Integer partition, K key, V data); +CompletableFuture> send(String topic, Integer partition, K key, V data); -ListenableFuture> send(String topic, Integer partition, Long timestamp, K key, V data); +CompletableFuture> send(String topic, Integer partition, Long timestamp, K key, V data); -ListenableFuture> send(ProducerRecord record); +CompletableFuture> send(ProducerRecord record); -ListenableFuture> send(Message message); +CompletableFuture> send(Message message); Map metrics(); @@ -210,6 +210,9 @@ interface ProducerCallback { See the https://docs.spring.io/spring-kafka/api/org/springframework/kafka/core/KafkaTemplate.html[Javadoc] for more detail. +IMPORTANT: In version 3.0, the methods that previously returned `ListenableFuture` have been changed to return `CompletableFuture`. +To facilitate the migration, the 2.9 version added a method `.usingCompletableFuture()` which provided the same methods with `CompletableFuture` return types; this method is no longer available. + The `sendDefault` API requires that a default topic has been provided to the template. The API takes in a `timestamp` as a parameter and stores this timestamp in the record. @@ -302,26 +305,16 @@ By default, the template is configured with a `LoggingProducerListener`, which l For convenience, default method implementations are provided in case you want to implement only one of the methods. -Notice that the send methods return a `ListenableFuture`. +Notice that the send methods return a `CompletableFuture`. You can register a callback with the listener to receive the result of the send asynchronously. The following example shows how to do so: ==== [source, java] ---- -ListenableFuture> future = template.send("myTopic", "something"); -future.addCallback(new ListenableFutureCallback>() { - - @Override - public void onSuccess(SendResult result) { - ... - } - - @Override - public void onFailure(Throwable ex) { - ... - } - +CompletableFuture> future = template.send("myTopic", "something"); +future.whenComplete((result, ex) -> { + ... }); ---- ==== @@ -329,48 +322,10 @@ future.addCallback(new ListenableFutureCallback>() { `SendResult` has two properties, a `ProducerRecord` and `RecordMetadata`. See the Kafka API documentation for information about those objects. -The `Throwable` in `onFailure` can be cast to a `KafkaProducerException`; its `failedProducerRecord` property contains the failed record. - -Starting with version 2.5, you can use a `KafkaSendCallback` instead of a `ListenableFutureCallback`, making it easier to extract the failed `ProducerRecord`, avoiding the need to cast the `Throwable`: - -==== -[source, java] ----- -ListenableFuture> future = template.send("topic", 1, "thing"); -future.addCallback(new KafkaSendCallback() { - - @Override - public void onSuccess(SendResult result) { - ... - } - - @Override - public void onFailure(KafkaProducerException ex) { - ProducerRecord failed = ex.getFailedProducerRecord(); - ... - } - -}); ----- -==== - -You can also use a pair of lambdas: - -==== -[source, java] ----- -ListenableFuture> future = template.send("topic", 1, "thing"); -future.addCallback(result -> { - ... - }, (KafkaFailureCallback) ex -> { - ProducerRecord failed = ex.getFailedProducerRecord(); - ... - }); ----- -==== +The `Throwable` can be cast to a `KafkaProducerException`; its `failedProducerRecord` property contains the failed record. If you wish to block the sending thread to await the result, you can invoke the future's `get()` method; using the method with a timeout is recommended. -You may wish to invoke `flush()` before waiting or, for convenience, the template has a constructor with an `autoFlush` parameter that causes the template to `flush()` on each send. +If you have set a `linger.ms`, you may wish to invoke `flush()` before waiting or, for convenience, the template has a constructor with an `autoFlush` parameter that causes the template to `flush()` on each send. Flushing is only needed if you have set the `linger.ms` producer property and want to immediately send a partial batch. ====== Examples @@ -384,19 +339,14 @@ This section shows examples of sending messages to Kafka: public void sendToKafka(final MyOutputData data) { final ProducerRecord record = createRecord(data); - ListenableFuture> future = template.send(record); - future.addCallback(new KafkaSendCallback() { - - @Override - public void onSuccess(SendResult result) { + CompletableFuture> future = template.send(record); + future.whenComplete((result, ex) -> { + if (ex == null) { handleSuccess(data); } - - @Override - public void onFailure(KafkaProducerException ex) { + else { handleFailure(data, record, ex); } - }); } ---- @@ -549,10 +499,12 @@ RequestReplyFuture sendAndReceive(ProducerRecord record, (Also see <>). -The result is a `ListenableFuture` that is asynchronously populated with the result (or an exception, for a timeout). +The result is a `CompletableFuture` that is asynchronously populated with the result (or an exception, for a timeout). The result also has a `sendFuture` property, which is the result of calling `KafkaTemplate.send()`. You can use this future to determine the result of the send operation. +IMPORTANT: In version 3.0, the futures returned by these methods (and their `sendFuture` properties) have been changed to `CompletableFuture` s instead of `ListenableFuture` s. + If the first method is used, or the `replyTimeout` argument is `null`, the template's `defaultReplyTimeout` property is used (5 seconds by default). Starting with version 2.8.8, the template has a new method `waitForAssignment`. @@ -791,6 +743,8 @@ RequestReplyMessageFuture sendAndReceive(Message message); These will use the template's default `replyTimeout`, there are also overloaded versions that can take a timeout in the method call. +IMPORTANT: In version 3.0, the futures returned by these methods (and their `sendFuture` properties) have been changed to `CompletableFuture` s instead of `ListenableFuture` s. + Use the first method if the consumer's `Deserializer` or the template's `MessageConverter` can convert the payload without any additional information, either via configuration or type metadata in the reply message. Use the second method if you need to provide type information for the return type, to assist the message converter. @@ -2236,6 +2190,7 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerCon ==== When you use `@SendTo`, you must configure the `ConcurrentKafkaListenerContainerFactory` with a `KafkaTemplate` in its `replyTemplate` property to perform the send. +Spring Boot will automatically wire in its auto configured template (or any if a single instance is present). NOTE: Unless you use <> only the simple `send(topic, value)` method is used, so you may wish to create a subclass to generate the partition or key. The following example shows how to do so: @@ -2248,7 +2203,7 @@ public KafkaTemplate myReplyingTemplate() { return new KafkaTemplate(producerFactory()) { @Override - public ListenableFuture> send(String topic, String data) { + public CompletableFuture> send(String topic, String data) { return super.send(topic, partitionForData(data), keyForData(data), data); } diff --git a/spring-kafka-docs/src/main/asciidoc/whats-new.adoc b/spring-kafka-docs/src/main/asciidoc/whats-new.adoc index 466fdbde3b..0fe7cbe01b 100644 --- a/spring-kafka-docs/src/main/asciidoc/whats-new.adoc +++ b/spring-kafka-docs/src/main/asciidoc/whats-new.adoc @@ -34,3 +34,15 @@ See <> for more information. Events related to consumer authentication and authorization failures are now published by the container. See <> for more information. + +[[x30-template-changes]] +==== `KafkaTemplate` Changes + +The futures returned by this class are now `CompletableFuture` s instead of `ListenableFuture` s. +See <>. + +[[x30-rkt-changes]] +==== `ReplyingKafkaTemplate` Changes + +The futures returned by this class are now `CompletableFuture` s instead of `ListenableFuture` s. +See <> and <>. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java index 88b75abaa1..d37e67d92b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java @@ -20,6 +20,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -36,10 +37,9 @@ import org.springframework.kafka.support.TopicPartitionOffset; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; -import org.springframework.util.concurrent.ListenableFuture; /** - * The basic Kafka operations contract returning {@link ListenableFuture}s. + * The basic Kafka operations contract returning {@link CompletableFuture}s. * * @param the key type. * @param the value type. @@ -68,7 +68,7 @@ public interface KafkaOperations { * @param data The data. * @return a Future for the {@link SendResult}. */ - ListenableFuture> sendDefault(V data); + CompletableFuture> sendDefault(V data); /** * Send the data to the default topic with the provided key and no partition. @@ -76,7 +76,7 @@ public interface KafkaOperations { * @param data The data. * @return a Future for the {@link SendResult}. */ - ListenableFuture> sendDefault(K key, V data); + CompletableFuture> sendDefault(K key, V data); /** * Send the data to the default topic with the provided key and partition. @@ -85,7 +85,7 @@ public interface KafkaOperations { * @param data the data. * @return a Future for the {@link SendResult}. */ - ListenableFuture> sendDefault(Integer partition, K key, V data); + CompletableFuture> sendDefault(Integer partition, K key, V data); /** * Send the data to the default topic with the provided key and partition. @@ -96,7 +96,7 @@ public interface KafkaOperations { * @return a Future for the {@link SendResult}. * @since 1.3 */ - ListenableFuture> sendDefault(Integer partition, Long timestamp, K key, V data); + CompletableFuture> sendDefault(Integer partition, Long timestamp, K key, V data); /** * Send the data to the provided topic with no key or partition. @@ -104,7 +104,7 @@ public interface KafkaOperations { * @param data The data. * @return a Future for the {@link SendResult}. */ - ListenableFuture> send(String topic, V data); + CompletableFuture> send(String topic, V data); /** * Send the data to the provided topic with the provided key and no partition. @@ -113,7 +113,7 @@ public interface KafkaOperations { * @param data The data. * @return a Future for the {@link SendResult}. */ - ListenableFuture> send(String topic, K key, V data); + CompletableFuture> send(String topic, K key, V data); /** * Send the data to the provided topic with the provided key and partition. @@ -123,7 +123,7 @@ public interface KafkaOperations { * @param data the data. * @return a Future for the {@link SendResult}. */ - ListenableFuture> send(String topic, Integer partition, K key, V data); + CompletableFuture> send(String topic, Integer partition, K key, V data); /** * Send the data to the provided topic with the provided key and partition. @@ -135,7 +135,7 @@ public interface KafkaOperations { * @return a Future for the {@link SendResult}. * @since 1.3 */ - ListenableFuture> send(String topic, Integer partition, Long timestamp, K key, V data); + CompletableFuture> send(String topic, Integer partition, Long timestamp, K key, V data); /** * Send the provided {@link ProducerRecord}. @@ -143,7 +143,7 @@ public interface KafkaOperations { * @return a Future for the {@link SendResult}. * @since 1.3 */ - ListenableFuture> send(ProducerRecord record); + CompletableFuture> send(ProducerRecord record); /** * Send a message with routing information in message headers. The message payload @@ -154,7 +154,7 @@ public interface KafkaOperations { * @see org.springframework.kafka.support.KafkaHeaders#PARTITION * @see org.springframework.kafka.support.KafkaHeaders#KEY */ - ListenableFuture> send(Message message); + CompletableFuture> send(Message message); /** * See {@link Producer#partitionsFor(String)}. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations2.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations2.java new file mode 100644 index 0000000000..517c2ac285 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations2.java @@ -0,0 +1,281 @@ +/* + * Copyright 2015-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.core; + +import java.time.Duration; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; + +import org.springframework.kafka.core.KafkaOperations.OperationsCallback; +import org.springframework.kafka.core.KafkaOperations.ProducerCallback; +import org.springframework.kafka.support.SendResult; +import org.springframework.kafka.support.TopicPartitionOffset; +import org.springframework.lang.Nullable; +import org.springframework.messaging.Message; + +/** + * The basic Kafka operations contract returning {@link CompletableFuture}s. + * + * @param the key type. + * @param the value type. + * + * @author Gary Russell + * @since 2.9 + * @deprecated no longer needed; use {@code KafkaOperations}. + */ +@Deprecated +public interface KafkaOperations2 { + + /** + * Default timeout for {@link #receive(String, int, long)}. + */ + Duration DEFAULT_POLL_TIMEOUT = Duration.ofSeconds(5); + + /** + * Send the data to the default topic with no key or partition. + * @param data The data. + * @return a Future for the {@link SendResult}. + */ + CompletableFuture> sendDefault(V data); + + /** + * Send the data to the default topic with the provided key and no partition. + * @param key the key. + * @param data The data. + * @return a Future for the {@link SendResult}. + */ + CompletableFuture> sendDefault(K key, V data); + + /** + * Send the data to the default topic with the provided key and partition. + * @param partition the partition. + * @param key the key. + * @param data the data. + * @return a Future for the {@link SendResult}. + */ + CompletableFuture> sendDefault(Integer partition, K key, V data); + + /** + * Send the data to the default topic with the provided key and partition. + * @param partition the partition. + * @param timestamp the timestamp of the record. + * @param key the key. + * @param data the data. + * @return a Future for the {@link SendResult}. + */ + CompletableFuture> sendDefault(Integer partition, Long timestamp, K key, V data); + + /** + * Send the data to the provided topic with no key or partition. + * @param topic the topic. + * @param data The data. + * @return a Future for the {@link SendResult}. + */ + CompletableFuture> send(String topic, V data); + + /** + * Send the data to the provided topic with the provided key and no partition. + * @param topic the topic. + * @param key the key. + * @param data The data. + * @return a Future for the {@link SendResult}. + */ + CompletableFuture> send(String topic, K key, V data); + + /** + * Send the data to the provided topic with the provided key and partition. + * @param topic the topic. + * @param partition the partition. + * @param key the key. + * @param data the data. + * @return a Future for the {@link SendResult}. + */ + CompletableFuture> send(String topic, Integer partition, K key, V data); + + /** + * Send the data to the provided topic with the provided key and partition. + * @param topic the topic. + * @param partition the partition. + * @param timestamp the timestamp of the record. + * @param key the key. + * @param data the data. + * @return a Future for the {@link SendResult}. + */ + CompletableFuture> send(String topic, Integer partition, Long timestamp, K key, V data); + + /** + * Send the provided {@link ProducerRecord}. + * @param record the record. + * @return a Future for the {@link SendResult}. + */ + CompletableFuture> send(ProducerRecord record); + + /** + * Send a message with routing information in message headers. The message payload + * may be converted before sending. + * @param message the message to send. + * @return a Future for the {@link SendResult}. + * @see org.springframework.kafka.support.KafkaHeaders#TOPIC + * @see org.springframework.kafka.support.KafkaHeaders#PARTITION + * @see org.springframework.kafka.support.KafkaHeaders#KEY + */ + CompletableFuture> send(Message message); + + /** + * See {@link Producer#partitionsFor(String)}. + * @param topic the topic. + * @return the partition info. + */ + List partitionsFor(String topic); + + /** + * See {@link Producer#metrics()}. + * @return the metrics. + */ + Map metrics(); + + /** + * Execute some arbitrary operation(s) on the producer and return the result. + * @param callback the callback. + * @param the result type. + * @return the result. + */ + @Nullable + T execute(ProducerCallback callback); + + /** + * Execute some arbitrary operation(s) on the operations and return the result. + * The operations are invoked within a local transaction and do not participate + * in a global transaction (if present). + * @param callback the callback. + * @param the result type. + * @return the result. + */ + @Nullable + T executeInTransaction(OperationsCallback callback); + + /** + * Flush the producer. + */ + void flush(); + + /** + * When running in a transaction, send the consumer offset(s) to the transaction. It + * is not necessary to call this method if the operations are invoked on a listener + * container thread (and the listener container is configured with a + * {@link org.springframework.kafka.transaction.KafkaAwareTransactionManager}) since + * the container will take care of sending the offsets to the transaction. + * Use with 2.5 brokers or later. + * @param offsets The offsets. + * @param groupMetadata the consumer group metadata. + * @see Producer#sendOffsetsToTransaction(Map, ConsumerGroupMetadata) + */ + default void sendOffsetsToTransaction(Map offsets, + ConsumerGroupMetadata groupMetadata) { + + throw new UnsupportedOperationException(); + } + + /** + * Return true if the implementation supports transactions (has a transaction-capable + * producer factory). + * @return true or false. + */ + boolean isTransactional(); + + /** + * Return true if this template, when transactional, allows non-transactional operations. + * @return true to allow. + */ + default boolean isAllowNonTransactional() { + return false; + } + + /** + * Return true if the template is currently running in a transaction on the calling + * thread. + * @return true if a transaction is running. + */ + default boolean inTransaction() { + return false; + } + + /** + * Return the producer factory used by this template. + * @return the factory. + */ + default ProducerFactory getProducerFactory() { + throw new UnsupportedOperationException("This implementation does not support this operation"); + } + + /** + * Receive a single record with the default poll timeout (5 seconds). + * @param topic the topic. + * @param partition the partition. + * @param offset the offset. + * @return the record or null. + * @see #DEFAULT_POLL_TIMEOUT + */ + @Nullable + default ConsumerRecord receive(String topic, int partition, long offset) { + return receive(topic, partition, offset, DEFAULT_POLL_TIMEOUT); + } + + /** + * Receive a single record. + * @param topic the topic. + * @param partition the partition. + * @param offset the offset. + * @param pollTimeout the timeout. + * @return the record or null. + */ + @Nullable + ConsumerRecord receive(String topic, int partition, long offset, Duration pollTimeout); + + /** + * Receive a multiple records with the default poll timeout (5 seconds). Only + * absolute, positive offsets are supported. + * @param requested a collection of record requests (topic/partition/offset). + * @return the records + * @see #DEFAULT_POLL_TIMEOUT + */ + default ConsumerRecords receive(Collection requested) { + return receive(requested, DEFAULT_POLL_TIMEOUT); + } + + /** + * Receive multiple records. Only absolute, positive offsets are supported. + * @param requested a collection of record requests (topic/partition/offset). + * @param pollTimeout the timeout. + * @return the record or null. + */ + ConsumerRecords receive(Collection requested, Duration pollTimeout); + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index a910714368..8b9a634465 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -67,8 +68,6 @@ import org.springframework.messaging.converter.SmartMessageConverter; import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.util.Assert; -import org.springframework.util.concurrent.ListenableFuture; -import org.springframework.util.concurrent.SettableListenableFuture; /** * A template for executing high-level operations. When used with a @@ -89,6 +88,7 @@ * @author Soby Chacko * @author Gurps Bassi */ +@SuppressWarnings("deprecation") public class KafkaTemplate implements KafkaOperations, ApplicationContextAware, BeanNameAware, ApplicationListener, DisposableBean { @@ -390,45 +390,45 @@ public void onApplicationEvent(ContextStoppedEvent event) { } @Override - public ListenableFuture> sendDefault(@Nullable V data) { + public CompletableFuture> sendDefault(@Nullable V data) { return send(this.defaultTopic, data); } @Override - public ListenableFuture> sendDefault(K key, @Nullable V data) { + public CompletableFuture> sendDefault(K key, @Nullable V data) { return send(this.defaultTopic, key, data); } @Override - public ListenableFuture> sendDefault(Integer partition, K key, @Nullable V data) { + public CompletableFuture> sendDefault(Integer partition, K key, @Nullable V data) { return send(this.defaultTopic, partition, key, data); } @Override - public ListenableFuture> sendDefault(Integer partition, Long timestamp, K key, @Nullable V data) { + public CompletableFuture> sendDefault(Integer partition, Long timestamp, K key, @Nullable V data) { return send(this.defaultTopic, partition, timestamp, key, data); } @Override - public ListenableFuture> send(String topic, @Nullable V data) { + public CompletableFuture> send(String topic, @Nullable V data) { ProducerRecord producerRecord = new ProducerRecord<>(topic, data); return doSend(producerRecord); } @Override - public ListenableFuture> send(String topic, K key, @Nullable V data) { + public CompletableFuture> send(String topic, K key, @Nullable V data) { ProducerRecord producerRecord = new ProducerRecord<>(topic, key, data); return doSend(producerRecord); } @Override - public ListenableFuture> send(String topic, Integer partition, K key, @Nullable V data) { + public CompletableFuture> send(String topic, Integer partition, K key, @Nullable V data) { ProducerRecord producerRecord = new ProducerRecord<>(topic, partition, key, data); return doSend(producerRecord); } @Override - public ListenableFuture> send(String topic, Integer partition, Long timestamp, K key, + public CompletableFuture> send(String topic, Integer partition, Long timestamp, K key, @Nullable V data) { ProducerRecord producerRecord = new ProducerRecord<>(topic, partition, timestamp, key, data); @@ -436,14 +436,14 @@ public ListenableFuture> send(String topic, Integer partition, } @Override - public ListenableFuture> send(ProducerRecord record) { + public CompletableFuture> send(ProducerRecord record) { Assert.notNull(record, "'record' cannot be null"); return doSend(record); } @SuppressWarnings("unchecked") @Override - public ListenableFuture> send(Message message) { + public CompletableFuture> send(Message message) { ProducerRecord producerRecord = this.messageConverter.fromMessage(message, this.defaultTopic); if (!producerRecord.headers().iterator().hasNext()) { // possibly no Jackson byte[] correlationId = message.getHeaders().get(KafkaHeaders.CORRELATION_ID, byte[].class); @@ -627,10 +627,10 @@ protected void closeProducer(Producer producer, boolean inTx) { * @return a Future for the {@link org.apache.kafka.clients.producer.RecordMetadata * RecordMetadata}. */ - protected ListenableFuture> doSend(final ProducerRecord producerRecord) { + protected CompletableFuture> doSend(final ProducerRecord producerRecord) { final Producer producer = getTheProducer(producerRecord.topic()); this.logger.trace(() -> "Sending: " + KafkaUtils.format(producerRecord)); - final SettableListenableFuture> future = new SettableListenableFuture<>(); + final CompletableFuture> future = new CompletableFuture<>(); Object sample = null; if (this.micrometerEnabled && this.micrometerHolder == null) { this.micrometerHolder = obtainMicrometerHolder(); @@ -664,7 +664,7 @@ protected ListenableFuture> doSend(final ProducerRecord p } private Callback buildCallback(final ProducerRecord producerRecord, final Producer producer, - final SettableListenableFuture> future, @Nullable Object sample) { + final CompletableFuture> future, @Nullable Object sample) { return (metadata, exception) -> { try { @@ -680,7 +680,7 @@ private Callback buildCallback(final ProducerRecord producerRecord, final if (sample != null) { this.micrometerHolder.success(sample); } - future.set(new SendResult<>(producerRecord, metadata)); + future.complete(new SendResult<>(producerRecord, metadata)); if (KafkaTemplate.this.producerListener != null) { KafkaTemplate.this.producerListener.onSuccess(producerRecord, metadata); } @@ -691,7 +691,8 @@ private Callback buildCallback(final ProducerRecord producerRecord, final if (sample != null) { this.micrometerHolder.failure(sample, exception.getClass().getSimpleName()); } - future.setException(new KafkaProducerException(producerRecord, "Failed to send", exception)); + future.completeExceptionally( + new KafkaProducerException(producerRecord, "Failed to send", exception)); if (KafkaTemplate.this.producerListener != null) { KafkaTemplate.this.producerListener.onError(producerRecord, metadata, exception); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java index f56935bc5b..f3a399f847 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -54,7 +55,6 @@ import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; -import org.springframework.util.concurrent.ListenableFuture; /** * A {@link ConsumerRecordRecoverer} that publishes a failed record to a dead-letter @@ -625,14 +625,17 @@ protected ProducerRecord createProducerRecord(ConsumerRecord outRecord, KafkaOperations kafkaTemplate, ConsumerRecord inRecord) { - ListenableFuture> sendResult = null; + CompletableFuture> sendResult = null; try { sendResult = kafkaTemplate.send(outRecord); - sendResult.addCallback(result -> { - this.logger.debug(() -> "Successful dead-letter publication: " - + KafkaUtils.format(inRecord) + " to " + result.getRecordMetadata()); - }, ex -> { - this.logger.error(ex, () -> pubFailMessage(outRecord, inRecord)); + sendResult.whenComplete((result, ex) -> { + if (ex == null) { + this.logger.debug(() -> "Successful dead-letter publication: " + + KafkaUtils.format(inRecord) + " to " + result.getRecordMetadata()); + } + else { + this.logger.error(ex, () -> pubFailMessage(outRecord, inRecord)); + } }); } catch (Exception e) { @@ -652,7 +655,7 @@ protected void publish(ProducerRecord outRecord, KafkaOperations */ protected void verifySendResult(KafkaOperations kafkaTemplate, ProducerRecord outRecord, - @Nullable ListenableFuture> sendResult, ConsumerRecord inRecord) { + @Nullable CompletableFuture> sendResult, ConsumerRecord inRecord) { Duration sendTimeout = determineSendTimeout(kafkaTemplate); if (sendResult == null) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/AggregatingReplyingKafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/AggregatingReplyingKafkaTemplate.java index ac5f6850c4..2e937bcd49 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/AggregatingReplyingKafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/AggregatingReplyingKafkaTemplate.java @@ -170,7 +170,7 @@ protected synchronized boolean handleTimeout(CorrelationKey correlationId, .map(RecordHolder::getRecord) .collect(Collectors.toList()); if (this.releaseStrategy.test(list, true)) { - future.set(new ConsumerRecord<>(PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC, 0, 0L, null, list)); + future.complete(new ConsumerRecord<>(PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC, 0, 0L, null, list)); return true; } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java index f305f98a52..8a45fd45cc 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java @@ -367,17 +367,21 @@ public

RequestReplyTypedMessageFuture sendAndReceive(Message mes .fromMessage(message, getDefaultTopic()), replyTimeout); RequestReplyTypedMessageFuture replyFuture = new RequestReplyTypedMessageFuture<>(future.getSendFuture()); - future.addCallback( - result -> { + future.whenComplete((result, ex) -> { + if (ex == null) { try { - replyFuture.set(getMessageConverter() + replyFuture.complete(getMessageConverter() .toMessage(result, null, null, returnType == null ? null : returnType.getType())); } - catch (Exception ex) { // NOSONAR - replyFuture.setException(ex); + catch (Exception ex2) { // NOSONAR + replyFuture.completeExceptionally(ex2); } - }, - ex -> replyFuture.setException(ex)); + } + else { + replyFuture.completeExceptionally(ex); + } + }); + return replyFuture; } @@ -425,7 +429,7 @@ private void scheduleTimeout(ProducerRecord record, CorrelationKey correla this.logger.warn(() -> "Reply timed out for: " + KafkaUtils.format(record) + WITH_CORRELATION_ID + correlationId); if (!handleTimeout(correlationId, removed)) { - removed.setException(new KafkaReplyTimeoutException("Reply timed out")); + removed.completeExceptionally(new KafkaReplyTimeoutException("Reply timed out")); } } }, Instant.now().plus(replyTimeout)); @@ -497,12 +501,12 @@ public void onMessage(List> data) { Exception exception = checkForErrors(record); if (exception != null) { ok = false; - future.setException(exception); + future.completeExceptionally(exception); } if (ok) { this.logger.debug(() -> "Received: " + KafkaUtils.format(record) + WITH_CORRELATION_ID + correlationKey); - future.set(record); + future.complete(record); } } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/RequestReplyFuture.java b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/RequestReplyFuture.java index 9505f398a6..cc97d2afe7 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/RequestReplyFuture.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/RequestReplyFuture.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2021 the original author or authors. + * Copyright 2018-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,14 +16,14 @@ package org.springframework.kafka.requestreply; +import java.util.concurrent.CompletableFuture; + import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.support.SendResult; -import org.springframework.util.concurrent.ListenableFuture; -import org.springframework.util.concurrent.SettableListenableFuture; /** - * A listenable future for requests/replies. + * A {@link CompletableFuture} for requests/replies. * * @param the key type. * @param the outbound data type. @@ -33,15 +33,21 @@ * @since 2.1.3 * */ -public class RequestReplyFuture extends SettableListenableFuture> { +public class RequestReplyFuture extends CompletableFuture> { + + private volatile CompletableFuture> sendFuture; - private volatile ListenableFuture> sendFuture; + private CompletableFuture> completableSendFuture; - protected void setSendFuture(ListenableFuture> sendFuture) { + protected void setSendFuture(CompletableFuture> sendFuture) { this.sendFuture = sendFuture; } - public ListenableFuture> getSendFuture() { + /** + * Return the send future. + * @return the send future. + */ + public CompletableFuture> getSendFuture() { return this.sendFuture; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/RequestReplyMessageFuture.java b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/RequestReplyMessageFuture.java index 936e0af53f..0580d9c518 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/RequestReplyMessageFuture.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/RequestReplyMessageFuture.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 the original author or authors. + * Copyright 2021-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,10 +16,10 @@ package org.springframework.kafka.requestreply; +import java.util.concurrent.CompletableFuture; + import org.springframework.kafka.support.SendResult; import org.springframework.messaging.Message; -import org.springframework.util.concurrent.ListenableFuture; -import org.springframework.util.concurrent.SettableListenableFuture; /** * A listenable future for {@link Message} replies. @@ -31,11 +31,11 @@ * @since 2.7 * */ -public class RequestReplyMessageFuture extends SettableListenableFuture> { +public class RequestReplyMessageFuture extends CompletableFuture> { - private final ListenableFuture> sendFuture; + private final CompletableFuture> sendFuture; // NOSONAR - RequestReplyMessageFuture(ListenableFuture> sendFuture) { + RequestReplyMessageFuture(CompletableFuture> sendFuture) { this.sendFuture = sendFuture; } @@ -43,7 +43,7 @@ public class RequestReplyMessageFuture extends SettableListenableFuture> getSendFuture() { + public CompletableFuture> getSendFuture() { return this.sendFuture; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/RequestReplyTypedMessageFuture.java b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/RequestReplyTypedMessageFuture.java index 4cccb40618..52e2fbb61a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/RequestReplyTypedMessageFuture.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/RequestReplyTypedMessageFuture.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 the original author or authors. + * Copyright 2021-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,13 +16,13 @@ package org.springframework.kafka.requestreply; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.springframework.kafka.support.SendResult; import org.springframework.messaging.Message; -import org.springframework.util.concurrent.ListenableFuture; /** * A listenable future for {@link Message} replies with a specific payload type. @@ -37,7 +37,7 @@ */ public class RequestReplyTypedMessageFuture extends RequestReplyMessageFuture { - RequestReplyTypedMessageFuture(ListenableFuture> sendFuture) { + RequestReplyTypedMessageFuture(CompletableFuture> sendFuture) { super(sendFuture); } @@ -55,5 +55,4 @@ public Message

get(long timeout, TimeUnit unit) return (Message

) super.get(timeout, unit); } - } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/SendResult.java b/spring-kafka/src/main/java/org/springframework/kafka/support/SendResult.java index c85f5c12d8..007c986fdb 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/SendResult.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/SendResult.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,7 +20,7 @@ import org.apache.kafka.clients.producer.RecordMetadata; /** - * Result for a ListenableFuture after a send. + * Result for a {@link java.util.concurrent.CompletableFuture} after a send. * * @param the key type. * @param the value type. diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java index 9da4731c29..6c3ca10fe1 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java @@ -39,6 +39,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -155,7 +156,6 @@ import org.springframework.transaction.annotation.EnableTransactionManagement; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.MimeType; -import org.springframework.util.concurrent.ListenableFuture; import org.springframework.validation.Errors; import org.springframework.validation.Validator; @@ -1442,7 +1442,7 @@ public KafkaTemplate partitionZeroReplyTemplate() { return new KafkaTemplate(producerFactory(), true) { @Override - public ListenableFuture> send(String topic, String data) { + public CompletableFuture> send(String topic, String data) { return super.send(topic, 0, null, data); } @@ -1455,7 +1455,7 @@ public KafkaTemplate partitionZeroReplyJsonTemplate() { return new KafkaTemplate(jsonProducerFactory(), true) { @Override - public ListenableFuture> send(String topic, Object data) { + public CompletableFuture> send(String topic, Object data) { return super.send(topic, 0, null, data); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java index 7f0ff8dda6..ab51365ca2 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -58,7 +59,6 @@ import org.springframework.kafka.transaction.KafkaTransactionManager; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; -import org.springframework.util.concurrent.ListenableFuture; /** * @author Gary Russell @@ -337,7 +337,7 @@ public void testNestedTxProducerIsCached() throws Exception { KafkaTemplate template = new KafkaTemplate<>(pf); DefaultKafkaProducerFactory pfTx = new DefaultKafkaProducerFactory<>(producerProps); pfTx.setTransactionIdPrefix("fooTx."); - KafkaTemplate templateTx = new KafkaTemplate<>(pfTx); + KafkaOperations templateTx = new KafkaTemplate<>(pfTx); Map consumerProps = KafkaTestUtils.consumerProps("txCache1Group", "false", this.embeddedKafka); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); AtomicReference> wrapped = new AtomicReference<>(); @@ -362,7 +362,7 @@ public void testNestedTxProducerIsCached() throws Exception { containerProps); container.start(); try { - ListenableFuture> future = template.send("txCache1", "foo"); + CompletableFuture> future = template.send("txCache1", "foo"); future.get(10, TimeUnit.SECONDS); pf.getCache(); assertThat(KafkaTestUtils.getPropertyValue(pf, "cache", Map.class)).hasSize(0); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java index 4c5bbb0370..30d3db4886 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java @@ -43,6 +43,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -95,8 +96,6 @@ import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; -import org.springframework.util.concurrent.ListenableFuture; -import org.springframework.util.concurrent.ListenableFutureCallback; import org.springframework.util.concurrent.SettableListenableFuture; /** @@ -401,22 +400,15 @@ void testWithCallback() throws Exception { DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); KafkaTemplate template = new KafkaTemplate<>(pf, true); template.setDefaultTopic(INT_KEY_TOPIC); - ListenableFuture> future = template.sendDefault("foo"); + CompletableFuture> future = template.sendDefault("foo"); template.flush(); final CountDownLatch latch = new CountDownLatch(1); final AtomicReference> theResult = new AtomicReference<>(); - future.addCallback(new ListenableFutureCallback<>() { - - @Override - public void onSuccess(SendResult result) { + future.whenComplete((result, ex) -> { + if (ex == null) { theResult.set(result); latch.countDown(); } - - @Override - public void onFailure(Throwable ex) { - } - }); assertThat(KafkaTestUtils.getSingleRecord(consumer, INT_KEY_TOPIC)).has(value("foo")); assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); @@ -435,23 +427,16 @@ void testWithCallbackFailure() throws Exception { ProducerFactory pf = mock(ProducerFactory.class); given(pf.createProducer()).willReturn(producer); KafkaTemplate template = new KafkaTemplate<>(pf); - ListenableFuture> future = template.send("foo", 1, "bar"); + CompletableFuture> future = template.send("foo", 1, "bar"); final CountDownLatch latch = new CountDownLatch(1); final AtomicReference> theResult = new AtomicReference<>(); AtomicReference value = new AtomicReference<>(); - future.addCallback(new KafkaSendCallback<>() { - - @Override - public void onSuccess(SendResult result) { - } - - @Override - public void onFailure(KafkaProducerException ex) { - ProducerRecord failed = ex.getFailedProducerRecord(); + future.whenComplete((result, ex) -> { + if (ex != null) { + ProducerRecord failed = ((KafkaProducerException) ex).getFailedProducerRecord(); value.set(failed.value()); latch.countDown(); } - }); assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(value.get()).isEqualTo("bar"); @@ -469,14 +454,16 @@ void testWithCallbackFailureFunctional() throws Exception { ProducerFactory pf = mock(ProducerFactory.class); given(pf.createProducer()).willReturn(producer); KafkaTemplate template = new KafkaTemplate<>(pf); - ListenableFuture> future = template.send("foo", 1, "bar"); + CompletableFuture> future = template.send("foo", 1, "bar"); final CountDownLatch latch = new CountDownLatch(1); final AtomicReference> theResult = new AtomicReference<>(); AtomicReference value = new AtomicReference<>(); - future.addCallback(result -> { }, (KafkaFailureCallback) ex -> { - ProducerRecord failed = ex.getFailedProducerRecord(); - value.set(failed.value()); - latch.countDown(); + future.whenComplete((record, ex) -> { + if (ex != null) { + ProducerRecord failed = ((KafkaProducerException) ex).getFailedProducerRecord(); + value.set(failed.value()); + latch.countDown(); + } }); assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(value.get()).isEqualTo("bar"); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java index 8e6d96ec23..67cbcdf9b0 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java @@ -44,6 +44,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -71,8 +72,6 @@ import org.springframework.kafka.support.serializer.DeserializationException; import org.springframework.kafka.support.serializer.SerializationUtils; import org.springframework.kafka.test.utils.KafkaTestUtils; -import org.springframework.util.concurrent.ListenableFuture; -import org.springframework.util.concurrent.SettableListenableFuture; /** * @author Gary Russell @@ -80,6 +79,7 @@ * @since 2.4.3 * */ +@SuppressWarnings("deprecation") public class DeadLetterPublishingRecovererTests { @SuppressWarnings({ "unchecked", "rawtypes" }) @@ -89,8 +89,8 @@ void testTxNoTx() { given(template.isTransactional()).willReturn(true); given(template.inTransaction()).willReturn(false); given(template.isAllowNonTransactional()).willReturn(true); - SettableListenableFuture future = new SettableListenableFuture(); - future.set(new Object()); + CompletableFuture future = new CompletableFuture(); + future.complete(new Object()); given(template.send(any(ProducerRecord.class))).willReturn(future); DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template); ConsumerRecord record = new ConsumerRecord<>("foo", 0, 0L, "bar", "baz"); @@ -116,8 +116,8 @@ void testTxExisting() { KafkaOperations template = mock(KafkaOperations.class); given(template.isTransactional()).willReturn(true); given(template.inTransaction()).willReturn(true); - SettableListenableFuture future = new SettableListenableFuture(); - future.set(new Object()); + CompletableFuture future = new CompletableFuture(); + future.complete(new Object()); given(template.send(any(ProducerRecord.class))).willReturn(future); DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template); ConsumerRecord record = new ConsumerRecord<>("foo", 0, 0L, "bar", "baz"); @@ -131,8 +131,8 @@ void testTxExisting() { void testNonTx() { KafkaOperations template = mock(KafkaOperations.class); given(template.isTransactional()).willReturn(false); - SettableListenableFuture future = new SettableListenableFuture(); - future.set(new Object()); + CompletableFuture future = new CompletableFuture(); + future.complete(new Object()); given(template.send(any(ProducerRecord.class))).willReturn(future); DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template); ConsumerRecord record = new ConsumerRecord<>("foo", 0, 0L, "bar", "baz"); @@ -153,8 +153,8 @@ void testTxNewTx() { ((OperationsCallback) inv.getArgument(0)).doInOperations(template); return null; }).given(template).executeInTransaction(any()); - SettableListenableFuture future = new SettableListenableFuture(); - future.set(new Object()); + CompletableFuture future = new CompletableFuture(); + future.complete(new Object()); given(template.send(any(ProducerRecord.class))).willReturn(future); DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template); ConsumerRecord record = new ConsumerRecord<>("foo", 0, 0L, "bar", "baz"); @@ -174,8 +174,8 @@ void valueHeaderStripped() { Headers custom = new RecordHeaders(); custom.add(new RecordHeader("foo", "bar".getBytes())); recoverer.setHeadersFunction((rec, ex) -> custom); - SettableListenableFuture future = new SettableListenableFuture(); - future.set(new Object()); + CompletableFuture future = new CompletableFuture(); + future.complete(new Object()); willReturn(future).given(template).send(any(ProducerRecord.class)); ConsumerRecord record = new ConsumerRecord<>("foo", 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, "bar", "baz", headers, Optional.empty()); @@ -200,8 +200,8 @@ void keyHeaderStripped() { DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template); Headers headers = new RecordHeaders(); headers.add(new RecordHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, header(true))); - SettableListenableFuture future = new SettableListenableFuture(); - future.set(new Object()); + CompletableFuture future = new CompletableFuture(); + future.complete(new Object()); willReturn(future).given(template).send(any(ProducerRecord.class)); ConsumerRecord record = new ConsumerRecord<>("foo", 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, "bar", "baz", headers, Optional.empty()); @@ -221,8 +221,8 @@ void keyDeserOnly() { DeserializationException deserEx = createDeserEx(true); headers.add( new RecordHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, header(true, deserEx))); - SettableListenableFuture future = new SettableListenableFuture(); - future.set(new Object()); + CompletableFuture future = new CompletableFuture(); + future.complete(new Object()); willReturn(future).given(template).send(any(ProducerRecord.class)); ConsumerRecord record = new ConsumerRecord<>("foo", 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, "bar", "baz", headers, Optional.empty()); @@ -244,8 +244,8 @@ void headersNotStripped() { Headers headers = new RecordHeaders(); headers.add(new RecordHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, header(false))); headers.add(new RecordHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, header(true))); - SettableListenableFuture future = new SettableListenableFuture(); - future.set(new Object()); + CompletableFuture future = new CompletableFuture(); + future.complete(new Object()); willReturn(future).given(template).send(any(ProducerRecord.class)); ConsumerRecord record = new ConsumerRecord<>("foo", 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, "bar", "baz", headers, Optional.empty()); @@ -263,8 +263,8 @@ void headersNotStripped() { @Test void tombstoneWithMultiTemplates() { KafkaOperations template1 = mock(KafkaOperations.class); - SettableListenableFuture future = new SettableListenableFuture(); - future.set(new Object()); + CompletableFuture future = new CompletableFuture(); + future.complete(new Object()); given(template1.send(any(ProducerRecord.class))).willReturn(future); KafkaOperations template2 = mock(KafkaOperations.class); Map, KafkaOperations> templates = new LinkedHashMap<>(); @@ -281,8 +281,8 @@ void tombstoneWithMultiTemplates() { void tombstoneWithMultiTemplatesExplicit() { KafkaOperations template1 = mock(KafkaOperations.class); KafkaOperations template2 = mock(KafkaOperations.class); - SettableListenableFuture future = new SettableListenableFuture(); - future.set(new Object()); + CompletableFuture future = new CompletableFuture(); + future.complete(new Object()); given(template2.send(any(ProducerRecord.class))).willReturn(future); Map, KafkaOperations> templates = new LinkedHashMap<>(); templates.put(String.class, template1); @@ -318,7 +318,7 @@ private byte[] header(boolean isKey, DeserializationException deserEx) { @Test void allOriginalHeaders() { KafkaOperations template = mock(KafkaOperations.class); - ListenableFuture future = mock(ListenableFuture.class); + CompletableFuture future = mock(CompletableFuture.class); given(template.send(any(ProducerRecord.class))).willReturn(future); ConsumerRecord record = new ConsumerRecord<>("foo", 0, 0L, "bar", null); DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template); @@ -338,7 +338,7 @@ void allOriginalHeaders() { @Test void dontAppendOriginalHeaders() { KafkaOperations template = mock(KafkaOperations.class); - ListenableFuture future = mock(ListenableFuture.class); + CompletableFuture future = mock(CompletableFuture.class); given(template.send(any(ProducerRecord.class))).willReturn(future); ConsumerRecord record = new ConsumerRecord<>("foo", 0, 0L, 1234L, TimestampType.CREATE_TIME, 123, 123, "bar", null, new RecordHeaders(), Optional.empty()); @@ -389,7 +389,7 @@ void dontAppendOriginalHeaders() { @Test void appendOriginalHeaders() { KafkaOperations template = mock(KafkaOperations.class); - ListenableFuture future = mock(ListenableFuture.class); + CompletableFuture future = mock(CompletableFuture.class); given(template.send(any(ProducerRecord.class))).willReturn(future); ConsumerRecord record = new ConsumerRecord<>("foo", 0, 0L, 1234L, TimestampType.CREATE_TIME, 123, 123, "bar", null, new RecordHeaders(), Optional.empty()); @@ -450,7 +450,7 @@ void failIfSendResultIsError() throws Exception { props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 10L); given(pf.getConfigurationProperties()).willReturn(props); given(template.getProducerFactory()).willReturn(pf); - ListenableFuture future = mock(ListenableFuture.class); + CompletableFuture future = mock(CompletableFuture.class); ArgumentCaptor timeoutCaptor = ArgumentCaptor.forClass(Long.class); given(template.send(any(ProducerRecord.class))).willReturn(future); given(future.get(timeoutCaptor.capture(), eq(TimeUnit.MILLISECONDS))).willThrow(new TimeoutException()); @@ -473,11 +473,11 @@ void sendTimeoutDefault() throws Exception { Map props = new HashMap<>(); given(pf.getConfigurationProperties()).willReturn(props); given(template.getProducerFactory()).willReturn(pf); - SettableListenableFuture future = spy(new SettableListenableFuture<>()); + CompletableFuture future = spy(new CompletableFuture<>()); ArgumentCaptor timeoutCaptor = ArgumentCaptor.forClass(Long.class); given(template.send(any(ProducerRecord.class))).willReturn(future); willAnswer(inv -> { - future.set(new SendResult(null, null)); + future.complete(new SendResult(null, null)); return null; }).given(future).get(timeoutCaptor.capture(), eq(TimeUnit.MILLISECONDS)); ConsumerRecord record = new ConsumerRecord<>("foo", 0, 0L, "bar", null); @@ -498,11 +498,11 @@ void sendTimeoutConfig() throws Exception { props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 30_000L); given(pf.getConfigurationProperties()).willReturn(props); given(template.getProducerFactory()).willReturn(pf); - SettableListenableFuture future = spy(new SettableListenableFuture<>()); + CompletableFuture future = spy(new CompletableFuture<>()); ArgumentCaptor timeoutCaptor = ArgumentCaptor.forClass(Long.class); given(template.send(any(ProducerRecord.class))).willReturn(future); willAnswer(inv -> { - future.set(new SendResult(null, null)); + future.complete(new SendResult(null, null)); return null; }).given(future).get(timeoutCaptor.capture(), eq(TimeUnit.MILLISECONDS)); ConsumerRecord record = new ConsumerRecord<>("foo", 0, 0L, "bar", null); @@ -518,7 +518,7 @@ void sendTimeoutConfig() throws Exception { @Test void notFailIfSendResultIsError() throws Exception { KafkaOperations template = mock(KafkaOperations.class); - ListenableFuture future = mock(ListenableFuture.class); + CompletableFuture future = mock(CompletableFuture.class); given(template.send(any(ProducerRecord.class))).willReturn(future); given(future.get(anyLong(), eq(TimeUnit.MILLISECONDS))).willThrow(new TimeoutException()); ConsumerRecord record = new ConsumerRecord<>("foo", 0, 0L, "bar", null); @@ -531,7 +531,7 @@ void notFailIfSendResultIsError() throws Exception { @Test void throwIfNoDestinationReturned() { KafkaOperations template = mock(KafkaOperations.class); - ListenableFuture future = mock(ListenableFuture.class); + CompletableFuture future = mock(CompletableFuture.class); given(template.send(any(ProducerRecord.class))).willReturn(future); ConsumerRecord record = new ConsumerRecord<>("foo", 0, 0L, "bar", null); DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template, (cr, e) -> null); @@ -544,7 +544,7 @@ void throwIfNoDestinationReturned() { @Test void notThrowIfNoDestinationReturnedByDefault() { KafkaOperations template = mock(KafkaOperations.class); - ListenableFuture future = mock(ListenableFuture.class); + CompletableFuture future = mock(CompletableFuture.class); given(template.send(any(ProducerRecord.class))).willReturn(future); ConsumerRecord record = new ConsumerRecord<>("foo", 0, 0L, "bar", null); DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template, (cr, e) -> null); @@ -555,7 +555,7 @@ void notThrowIfNoDestinationReturnedByDefault() { @Test void noCircularRoutingIfFatal() { KafkaOperations template = mock(KafkaOperations.class); - ListenableFuture future = mock(ListenableFuture.class); + CompletableFuture future = mock(CompletableFuture.class); given(template.send(any(ProducerRecord.class))).willReturn(future); ConsumerRecord record = new ConsumerRecord<>("foo", 0, 0L, "bar", null); DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template, @@ -575,7 +575,7 @@ void noCircularRoutingIfFatal() { @Test void doNotSkipCircularFatalIfSet() { KafkaOperations template = mock(KafkaOperations.class); - ListenableFuture future = mock(ListenableFuture.class); + CompletableFuture future = mock(CompletableFuture.class); given(template.send(any(ProducerRecord.class))).willReturn(future); ConsumerRecord record = new ConsumerRecord<>("foo", 0, 0L, "bar", null); DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template, @@ -596,7 +596,7 @@ void doNotSkipCircularFatalIfSet() { @Test void headerBitsTurnedOffOneByOne() { KafkaOperations template = mock(KafkaOperations.class); - ListenableFuture future = mock(ListenableFuture.class); + CompletableFuture future = mock(CompletableFuture.class); given(template.send(any(ProducerRecord.class))).willReturn(future); ConsumerRecord record = new ConsumerRecord<>("foo", 0, 0L, "bar", null); DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template); @@ -793,7 +793,7 @@ void headerBitsTurnedOffOneByOne() { @Test void headerCreator() { KafkaOperations template = mock(KafkaOperations.class); - ListenableFuture future = mock(ListenableFuture.class); + CompletableFuture future = mock(CompletableFuture.class); given(template.send(any(ProducerRecord.class))).willReturn(future); ConsumerRecord record = new ConsumerRecord<>("foo", 0, 0L, "bar", null); DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template); @@ -819,7 +819,7 @@ void headerCreator() { @Test void addHeaderFunctionsProcessedInOrder() { KafkaOperations template = mock(KafkaOperations.class); - ListenableFuture future = mock(ListenableFuture.class); + CompletableFuture future = mock(CompletableFuture.class); given(template.send(any(ProducerRecord.class))).willReturn(future); ConsumerRecord record = new ConsumerRecord<>("foo", 0, 0L, "bar", null); DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template); @@ -856,7 +856,7 @@ void addHeaderFunctionsProcessedInOrder() { @Test void immutableHeaders() { KafkaOperations template = mock(KafkaOperations.class); - ListenableFuture future = mock(ListenableFuture.class); + CompletableFuture future = mock(CompletableFuture.class); given(template.send(any(ProducerRecord.class))).willReturn(future); ConsumerRecord record = new ConsumerRecord<>("foo", 0, 0L, "bar", null); DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerBatchIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerBatchIntegrationTests.java index 950874508d..398f90f6fa 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerBatchIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerBatchIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 the original author or authors. + * Copyright 2020-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,9 +21,11 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -37,6 +39,7 @@ import org.springframework.kafka.core.KafkaOperations; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.event.ConsumerStoppedEvent; +import org.springframework.kafka.support.SendResult; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.condition.EmbeddedKafkaCondition; import org.springframework.kafka.test.context.EmbeddedKafka; @@ -73,7 +76,7 @@ public static void setup() { } @Test - public void recoveryAndDlt() throws InterruptedException { + public void recoveryAndDlt() throws Exception { Map props = KafkaTestUtils.consumerProps("recoverBatch", "false", embeddedKafka); props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1000); props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); @@ -117,7 +120,11 @@ public void recoveryAndDlt() throws InterruptedException { template.send(topic1, 0, 0, "baz"); template.send(topic1, 0, 0, "qux"); template.send(topic1, 0, 0, "fiz"); - template.send(topic1, 0, 0, "buz"); + AtomicReference> sendResult = new AtomicReference<>(); + CompletableFuture> future = template.send(topic1, 0, 0, "buz") + .whenComplete((sr, thrown) -> sendResult.set(sr)); + future.get(10, TimeUnit.SECONDS); + assertThat(sendResult.get()).isNotNull(); assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue(); assertThat(data).hasSize(13); assertThat(data) @@ -140,7 +147,7 @@ public void recoveryAndDlt() throws InterruptedException { } @Test - public void recoveryFails() throws InterruptedException { + public void recoveryFails() throws Exception { Map props = KafkaTestUtils.consumerProps("recoverBatch2", "false", embeddedKafka); props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1000); props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); @@ -196,7 +203,8 @@ public void accept(ConsumerRecord record, Consumer consumer, Excepti template.send(topic2, 0, 0, "baz"); template.send(topic2, 0, 0, "qux"); template.send(topic2, 0, 0, "fiz"); - template.send(topic2, 0, 0, "buz"); + CompletableFuture> future = template.send(topic2, 0, 0, "buz"); + future.get(10, TimeUnit.SECONDS); assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue(); assertThat(data).hasSize(17); assertThat(data) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java b/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java index 01dc8022ca..0aaec40f7e 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java @@ -34,6 +34,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -219,9 +220,10 @@ public void testGoodWithMessage() throws Exception { try { template.setMessageConverter(new StringJsonMessageConverter()); template.setDefaultReplyTimeout(Duration.ofSeconds(30)); - RequestReplyMessageFuture fut = template.sendAndReceive(MessageBuilder.withPayload("foo") - .setHeader(KafkaHeaders.TOPIC, A_REQUEST) - .build()); + RequestReplyMessageFuture fut = template + .sendAndReceive(MessageBuilder.withPayload("foo") + .setHeader(KafkaHeaders.TOPIC, A_REQUEST) + .build()); fut.getSendFuture().get(10, TimeUnit.SECONDS); // send ok Message reply = fut.get(30, TimeUnit.SECONDS); assertThat(reply.getPayload()).isEqualTo("FOO"); @@ -711,7 +713,7 @@ void nullDuration() throws Exception { @SuppressWarnings({ "rawtypes", "unchecked" }) @Test - void requestTimeoutWithMessage() { + void requestTimeoutWithMessage() throws Exception { ProducerFactory pf = mock(ProducerFactory.class); Producer producer = mock(Producer.class); willAnswer(invocation -> { @@ -727,7 +729,7 @@ void requestTimeoutWithMessage() { .setHeader(KafkaHeaders.TOPIC, "foo") .build(); long t1 = System.currentTimeMillis(); - RequestReplyTypedMessageFuture future = template.sendAndReceive(msg, Duration.ofMillis(10), + CompletableFuture future = template.sendAndReceive(msg, Duration.ofMillis(10), new ParameterizedTypeReference() { }); try { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactoryTests.java index d88d4e9748..0e20a76ce0 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactoryTests.java @@ -37,6 +37,7 @@ import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -62,14 +63,13 @@ import org.springframework.kafka.listener.TimestampedException; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.test.condition.LogLevels; -import org.springframework.util.concurrent.ListenableFuture; /** * @author Tomaz Fernandes * @since 2.7 */ @ExtendWith(MockitoExtension.class) -@SuppressWarnings({"unchecked", "rawtypes"}) +@SuppressWarnings({"unchecked", "rawtypes", "deprecation"}) class DeadLetterPublishingRecovererFactoryTests { private final Clock clock = TestClockUtils.CLOCK; @@ -94,7 +94,7 @@ class DeadLetterPublishingRecovererFactoryTests { private KafkaOperations kafkaOperations; @Mock - private ListenableFuture listenableFuture; + private CompletableFuture completableFuture; @Captor private ArgumentCaptor producerRecordCaptor; @@ -119,7 +119,7 @@ void shouldSendMessage() { given(destinationTopicResolver.getDestinationTopicByName(testRetryTopic)).willReturn(destinationTopic); given(destinationTopic.getDestinationDelay()).willReturn(1000L); willReturn(this.kafkaOperations).given(destinationTopic).getKafkaOperations(); - given(kafkaOperations.send(any(ProducerRecord.class))).willReturn(listenableFuture); + given(kafkaOperations.send(any(ProducerRecord.class))).willReturn(completableFuture); this.consumerRecord.headers().add(RetryTopicHeaders.DEFAULT_HEADER_ORIGINAL_TIMESTAMP, originalTimestampBytes); DeadLetterPublishingRecovererFactory factory = new DeadLetterPublishingRecovererFactory(this.destinationTopicResolver); @@ -160,7 +160,7 @@ void shouldIncreaseAttemptsInLegacyHeader() { given(destinationTopic.getDestinationName()).willReturn(testRetryTopic); given(destinationTopicResolver.getDestinationTopicByName(testRetryTopic)).willReturn(destinationTopic); willReturn(kafkaOperations).given(destinationTopic).getKafkaOperations(); - given(kafkaOperations.send(any(ProducerRecord.class))).willReturn(listenableFuture); + given(kafkaOperations.send(any(ProducerRecord.class))).willReturn(completableFuture); DeadLetterPublishingRecovererFactory factory = new DeadLetterPublishingRecovererFactory(this.destinationTopicResolver); @@ -193,7 +193,7 @@ void shouldIncreaseAttemptsInNewHeader() { given(destinationTopic.getDestinationName()).willReturn(testRetryTopic); given(destinationTopicResolver.getDestinationTopicByName(testRetryTopic)).willReturn(destinationTopic); willReturn(kafkaOperations).given(destinationTopic).getKafkaOperations(); - given(kafkaOperations.send(any(ProducerRecord.class))).willReturn(listenableFuture); + given(kafkaOperations.send(any(ProducerRecord.class))).willReturn(completableFuture); DeadLetterPublishingRecovererFactory factory = new DeadLetterPublishingRecovererFactory(this.destinationTopicResolver); @@ -224,7 +224,7 @@ void shouldAddOriginalTimestampHeaderAndCustom() { given(destinationTopic.getDestinationName()).willReturn(testRetryTopic); given(destinationTopicResolver.getDestinationTopicByName(testRetryTopic)).willReturn(destinationTopic); willReturn(this.kafkaOperations).given(destinationTopic).getKafkaOperations(); - given(kafkaOperations.send(any(ProducerRecord.class))).willReturn(listenableFuture); + given(kafkaOperations.send(any(ProducerRecord.class))).willReturn(completableFuture); DeadLetterPublishingRecovererFactory factory = new DeadLetterPublishingRecovererFactory( this.destinationTopicResolver); @@ -259,7 +259,7 @@ void shouldNotReplaceOriginalTimestampHeader() { given(destinationTopic.getDestinationName()).willReturn(testRetryTopic); given(destinationTopicResolver.getDestinationTopicByName(testRetryTopic)).willReturn(destinationTopic); willReturn(this.kafkaOperations).given(destinationTopic).getKafkaOperations(); - given(kafkaOperations.send(any(ProducerRecord.class))).willReturn(listenableFuture); + given(kafkaOperations.send(any(ProducerRecord.class))).willReturn(completableFuture); DeadLetterPublishingRecovererFactory factory = new DeadLetterPublishingRecovererFactory(this.destinationTopicResolver);