Skip to content

Commit 97a508e

Browse files
garyrussellartembilan
authored andcommitted
GH-1480: Switch to CompletableFuture in s-r-stream
Resolves #1480 Also reinstate deprecated `AsyncAmqpTemplate2`.
1 parent b03a0bc commit 97a508e

File tree

9 files changed

+138
-20
lines changed

9 files changed

+138
-20
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.core;
18+
19+
import java.util.concurrent.CompletableFuture;
20+
21+
/**
22+
* This interface was added in 2.4.7 to aid migration from methods returning
23+
* {@code ListenableFuture}s to {@link CompletableFuture}s.
24+
*
25+
* @author Gary Russell
26+
* @since 2.4.7
27+
* @deprecated in favor of {@link AsyncAmqpTemplate}.
28+
*/
29+
@Deprecated
30+
public interface AsyncAmqpTemplate2 extends AsyncAmqpTemplate {
31+
32+
}

spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamOperations.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,14 @@
1616

1717
package org.springframework.rabbit.stream.producer;
1818

19+
import java.util.concurrent.CompletableFuture;
20+
1921
import org.springframework.amqp.AmqpException;
2022
import org.springframework.amqp.core.Message;
2123
import org.springframework.amqp.core.MessagePostProcessor;
2224
import org.springframework.amqp.support.converter.MessageConverter;
2325
import org.springframework.lang.Nullable;
2426
import org.springframework.rabbit.stream.support.converter.StreamMessageConverter;
25-
import org.springframework.util.concurrent.ListenableFuture;
2627

2728
import com.rabbitmq.stream.MessageBuilder;
2829

@@ -40,14 +41,14 @@ public interface RabbitStreamOperations extends AutoCloseable {
4041
* @param message the message.
4142
* @return a future to indicate success/failure.
4243
*/
43-
ListenableFuture<Boolean> send(Message message);
44+
CompletableFuture<Boolean> send(Message message);
4445

4546
/**
4647
* Convert to and send a Spring AMQP message.
4748
* @param message the payload.
4849
* @return a future to indicate success/failure.
4950
*/
50-
ListenableFuture<Boolean> convertAndSend(Object message);
51+
CompletableFuture<Boolean> convertAndSend(Object message);
5152

5253
/**
5354
* Convert to and send a Spring AMQP message. If a {@link MessagePostProcessor} is
@@ -57,15 +58,15 @@ public interface RabbitStreamOperations extends AutoCloseable {
5758
* @param mpp a message post processor.
5859
* @return a future to indicate success/failure.
5960
*/
60-
ListenableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp);
61+
CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp);
6162

6263
/**
6364
* Send a native stream message.
6465
* @param message the message.
6566
* @return a future to indicate success/failure.
6667
* @see #messageBuilder()
6768
*/
68-
ListenableFuture<Boolean> send(com.rabbitmq.stream.Message message);
69+
CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message);
6970

7071
/**
7172
* Return the producer's {@link MessageBuilder} to create native stream messages.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.rabbit.stream.producer;
18+
19+
import java.util.concurrent.CompletableFuture;
20+
21+
/**
22+
* Provides methods for sending messages using a RabbitMQ Stream producer,
23+
* returning {@link CompletableFuture}.
24+
* This interface was added in 2.4.7 to aid migration from methods returning
25+
* {@code ListenableFuture}s to {@link CompletableFuture}s.
26+
*
27+
* @author Gary Russell
28+
* @since 2.4.7
29+
* @deprecated in favor of {@link RabbitStreamOperations}.
30+
*/
31+
@Deprecated
32+
public interface RabbitStreamOperations2 extends RabbitStreamOperations {
33+
34+
}

spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.rabbit.stream.producer;
1818

19+
import java.util.concurrent.CompletableFuture;
20+
1921
import org.springframework.amqp.core.Message;
2022
import org.springframework.amqp.core.MessagePostProcessor;
2123
import org.springframework.amqp.support.converter.MessageConverter;
@@ -27,8 +29,6 @@
2729
import org.springframework.rabbit.stream.support.converter.DefaultStreamMessageConverter;
2830
import org.springframework.rabbit.stream.support.converter.StreamMessageConverter;
2931
import org.springframework.util.Assert;
30-
import org.springframework.util.concurrent.ListenableFuture;
31-
import org.springframework.util.concurrent.SettableListenableFuture;
3232

3333
import com.rabbitmq.stream.ConfirmationHandler;
3434
import com.rabbitmq.stream.Constants;
@@ -138,27 +138,27 @@ public StreamMessageConverter streamMessageConverter() {
138138

139139

140140
@Override
141-
public ListenableFuture<Boolean> send(Message message) {
142-
SettableListenableFuture<Boolean> future = new SettableListenableFuture<>();
141+
public CompletableFuture<Boolean> send(Message message) {
142+
CompletableFuture<Boolean> future = new CompletableFuture<>();
143143
createOrGetProducer().send(this.streamConverter.fromMessage(message), handleConfirm(future));
144144
return future;
145145
}
146146

147147
@Override
148-
public ListenableFuture<Boolean> convertAndSend(Object message) {
148+
public CompletableFuture<Boolean> convertAndSend(Object message) {
149149
return convertAndSend(message, null);
150150
}
151151

152152
@Override
153-
public ListenableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp) {
153+
public CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp) {
154154
Message message2 = this.messageConverter.toMessage(message, new StreamMessageProperties());
155155
Assert.notNull(message2, "The message converter returned null");
156156
if (mpp != null) {
157157
message2 = mpp.postProcessMessage(message2);
158158
if (message2 == null) {
159159
this.logger.debug("Message Post Processor returned null, message not sent");
160-
SettableListenableFuture<Boolean> future = new SettableListenableFuture<>();
161-
future.set(false);
160+
CompletableFuture<Boolean> future = new CompletableFuture<>();
161+
future.complete(false);
162162
return future;
163163
}
164164
}
@@ -167,8 +167,8 @@ public ListenableFuture<Boolean> convertAndSend(Object message, @Nullable Messag
167167

168168

169169
@Override
170-
public ListenableFuture<Boolean> send(com.rabbitmq.stream.Message message) {
171-
SettableListenableFuture<Boolean> future = new SettableListenableFuture<>();
170+
public CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message) {
171+
CompletableFuture<Boolean> future = new CompletableFuture<>();
172172
createOrGetProducer().send(message, handleConfirm(future));
173173
return future;
174174
}
@@ -178,10 +178,10 @@ public MessageBuilder messageBuilder() {
178178
return createOrGetProducer().messageBuilder();
179179
}
180180

181-
private ConfirmationHandler handleConfirm(SettableListenableFuture<Boolean> future) {
181+
private ConfirmationHandler handleConfirm(CompletableFuture<Boolean> future) {
182182
return confStatus -> {
183183
if (confStatus.isConfirmed()) {
184-
future.set(true);
184+
future.complete(true);
185185
}
186186
else {
187187
int code = confStatus.getCode();
@@ -203,7 +203,7 @@ private ConfirmationHandler handleConfirm(SettableListenableFuture<Boolean> futu
203203
errorMessage = "Unknown code: " + code;
204204
break;
205205
}
206-
future.setException(new StreamSendException(errorMessage, code));
206+
future.completeExceptionally(new StreamSendException(errorMessage, code));
207207
}
208208
};
209209
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.rabbit.stream.producer;
18+
19+
import java.util.concurrent.CompletableFuture;
20+
21+
import com.rabbitmq.stream.Environment;
22+
23+
/**
24+
* This interface was added in 2.4.7 to aid migration from methods returning
25+
* {@code ListenableFuture}s to {@link CompletableFuture}s.
26+
*
27+
* @author Gary Russell
28+
* @since 2.8
29+
* @deprecated in favor of {@link RabbitStreamTemplate}.
30+
*/
31+
@Deprecated
32+
public class RabbitStreamTemplate2 extends RabbitStreamTemplate implements RabbitStreamOperations2 {
33+
34+
public RabbitStreamTemplate2(Environment environment, String streamName) {
35+
super(environment, streamName);
36+
}
37+
38+
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/AsyncRabbitTemplate2.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.util.concurrent.CompletableFuture;
2020

21+
import org.springframework.amqp.core.AsyncAmqpTemplate2;
2122
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
2223
import org.springframework.amqp.rabbit.core.RabbitTemplate;
2324
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
@@ -32,7 +33,7 @@
3233
*
3334
*/
3435
@Deprecated
35-
public class AsyncRabbitTemplate2 extends AsyncRabbitTemplate {
36+
public class AsyncRabbitTemplate2 extends AsyncRabbitTemplate implements AsyncAmqpTemplate2 {
3637

3738
public AsyncRabbitTemplate2(ConnectionFactory connectionFactory, String exchange, String routingKey,
3839
String replyQueue, String replyAddress) {

src/reference/asciidoc/appendix.adoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,11 @@ See <<async-template>> for more information.
4444
The `Jackson2JsonMessageConverter` can now determine the charset from the `contentEncoding` header.
4545
See <<json-message-converter>> for more information.
4646

47+
==== Stream Support Changes
48+
49+
`RabbitStreamOperations` and `RabbitStreamTemplate` have been deprecated in favor of `RabbitStreamOperations2` and `RabbitStreamTemplate2` respectively; they return `CompletableFuture` instead of `ListenableFuture`.
50+
See <<stream-support>> for more information.
51+
4752
==== Changes in 2.3 Since 2.2
4853

4954
This section describes the changes between version 2.2 and version 2.3.

src/reference/asciidoc/stream.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ The `ProducerCustomizer` provides a mechanism to customize the producer before i
6767

6868
Refer to the https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/[Java Client Documentation] about customizing the `Environment` and `Producer`.
6969

70+
IMPORTANT: In version 2.4.7 `RabbitStreamOperations2` and `RabbitStreamTemplate2` were added to assist migration to this version; `RabbitStreamOperations2` and `RabbitStreamTemplate2` are now deprecated in favor of `RabbitStreamOperations` and `RabbitStreamTemplate` respectively.
71+
7072
==== Receiving Messages
7173

7274
Asynchronous message reception is provided by the `StreamListenerContainer` (and the `StreamRabbitListenerContainerFactory` when using `@RabbitListener`).

src/reference/asciidoc/whats-new.adoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,8 @@ The remoting feature (using RMI) is no longer supported.
1616
The `AsyncRabbitTemplate2`, which was added in 2.4.7 to aid migration to this release, is deprecated in favor of `AsyncRabbitTemplate`.
1717
The `AsyncRabbitTemplate` now returns `CompletableFuture` s instead of `ListenableFuture` s.
1818
See <<async-template>> for more information.
19+
20+
==== Stream Support Changes
21+
22+
`RabbitStreamOperations2` and `RabbitStreamTemplate2` have been deprecated in favor of `RabbitStreamOperations` and `RabbitStreamTemplate` respectively.
23+
See <<stream-support>> for more information.

0 commit comments

Comments
 (0)