Skip to content

Commit d96aa71

Browse files
garyrussellartembilan
authored andcommitted
GH-1489: Batch RabbitListener Improvements
Resolves #1489 - allow listeners to consume `Collection<?>` as well as `List<?>` - detect a non-batch listener method in the batch adapter - coerce `batchListener` to `true` when `consumerBatchEnabled` is true
1 parent 515eb9a commit d96aa71

File tree

6 files changed

+115
-12
lines changed

6 files changed

+115
-12
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/SimpleRabbitListenerContainerFactory.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2020 the original author or authors.
2+
* Copyright 2014-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -123,13 +123,18 @@ public void setReceiveTimeout(Long receiveTimeout) {
123123

124124
/**
125125
* Set to true to present a list of messages based on the {@link #setBatchSize(Integer)},
126-
* if the listener supports it.
126+
* if the listener supports it. Starting with version 3.0, setting this to true will
127+
* also {@link #setBatchListener(boolean)} to true.
127128
* @param consumerBatchEnabled true to create message batches in the container.
128129
* @since 2.2
129130
* @see #setBatchSize(Integer)
131+
* @see #setBatchListener(boolean)
130132
*/
131133
public void setConsumerBatchEnabled(boolean consumerBatchEnabled) {
132134
this.consumerBatchEnabled = consumerBatchEnabled;
135+
if (consumerBatchEnabled) {
136+
setBatchListener(true);
137+
}
133138
}
134139

135140
@Override

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.lang.reflect.ParameterizedType;
2121
import java.lang.reflect.Type;
2222
import java.lang.reflect.WildcardType;
23+
import java.util.Collection;
2324
import java.util.List;
2425

2526
import org.springframework.amqp.core.MessageProperties;
@@ -328,6 +329,8 @@ protected final class MessagingMessageConverterAdapter extends MessagingMessageC
328329

329330
private boolean isAmqpMessageList;
330331

332+
private boolean isCollection;
333+
331334
MessagingMessageConverterAdapter(Object bean, Method method, boolean batch) {
332335
this.bean = bean;
333336
this.method = method;
@@ -392,6 +395,12 @@ private Type determineInferredType() { // NOSONAR - complexity
392395

393396
if (genericParameterType == null) {
394397
genericParameterType = extractGenericParameterTypFromMethodParameter(methodParameter);
398+
if (this.isBatch && !this.isCollection) {
399+
throw new IllegalStateException(
400+
"Mis-configuration; a batch listener must consume a List<?> or "
401+
+ "Collection<?> for method: " + this.method);
402+
}
403+
395404
}
396405
else {
397406
if (MessagingMessageListenerAdapter.this.logger.isDebugEnabled()) {
@@ -435,9 +444,11 @@ private Type extractGenericParameterTypFromMethodParameter(MethodParameter metho
435444
genericParameterType = ((ParameterizedType) genericParameterType).getActualTypeArguments()[0];
436445
}
437446
else if (this.isBatch
438-
&& parameterizedType.getRawType().equals(List.class)
439-
&& parameterizedType.getActualTypeArguments().length == 1) {
447+
&& ((parameterizedType.getRawType().equals(List.class)
448+
|| parameterizedType.getRawType().equals(Collection.class))
449+
&& parameterizedType.getActualTypeArguments().length == 1)) {
440450

451+
this.isCollection = true;
441452
Type paramType = parameterizedType.getActualTypeArguments()[0];
442453
boolean messageHasGeneric = paramType instanceof ParameterizedType
443454
&& ((ParameterizedType) paramType).getRawType().equals(Message.class);

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitBatchIntegrationTests.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import static org.assertj.core.api.Assertions.assertThat;
2020

2121
import java.io.Serializable;
22+
import java.util.ArrayList;
23+
import java.util.Collection;
2224
import java.util.List;
2325
import java.util.concurrent.CountDownLatch;
2426
import java.util.concurrent.TimeUnit;
@@ -51,7 +53,7 @@
5153
*/
5254
@SpringJUnitConfig
5355
@DirtiesContext
54-
@RabbitAvailable(queues = { "batch.1", "batch.2", "batch.3", "batch.4" })
56+
@RabbitAvailable(queues = { "batch.1", "batch.2", "batch.3", "batch.4", "batch.5" })
5557
public class EnableRabbitBatchIntegrationTests {
5658

5759
@Autowired
@@ -114,6 +116,16 @@ public void nativeMessageList() throws InterruptedException {
114116
.isEqualTo(2);
115117
}
116118

119+
@Test
120+
public void collectionWithStringInfer() throws InterruptedException {
121+
this.template.convertAndSend("batch.5", new Foo("foo"));
122+
this.template.convertAndSend("batch.5", new Foo("bar"));
123+
assertThat(this.listener.fivesLatch.await(10, TimeUnit.SECONDS)).isTrue();
124+
assertThat(this.listener.fives).hasSize(2);
125+
assertThat(this.listener.fives.get(0).getBar()).isEqualTo("foo");
126+
assertThat(this.listener.fives.get(1).getBar()).isEqualTo("bar");
127+
}
128+
117129
@Configuration
118130
@EnableRabbit
119131
public static class Config {
@@ -186,6 +198,10 @@ public static class Listener {
186198

187199
CountDownLatch fooConsumerBatchTooLatch = new CountDownLatch(1);
188200

201+
List<Foo> fives = new ArrayList<>();
202+
203+
CountDownLatch fivesLatch = new CountDownLatch(1);
204+
189205
private List<org.springframework.amqp.core.Message> nativeMessages;
190206

191207
private final CountDownLatch nativeMessagesLatch = new CountDownLatch(1);
@@ -214,6 +230,12 @@ public void listen4(List<org.springframework.amqp.core.Message> in) {
214230
this.nativeMessagesLatch.countDown();
215231
}
216232

233+
@RabbitListener(queues = "batch.5")
234+
public void listen5(Collection<Foo> in) {
235+
this.fives.addAll(in);
236+
this.fivesLatch.countDown();
237+
}
238+
217239
}
218240

219241
@SuppressWarnings("serial")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit.listener.adapter;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
21+
22+
import java.lang.reflect.Method;
23+
import java.util.List;
24+
25+
import org.junit.jupiter.api.Test;
26+
27+
import org.springframework.amqp.utils.test.TestUtils;
28+
29+
/**
30+
* @author Gary Russell
31+
* @since 3.0
32+
*
33+
*/
34+
public class BatchMessagingMessageListenerAdapterTests {
35+
36+
@Test
37+
void compatibleMethod() throws Exception {
38+
Method method = getClass().getDeclaredMethod("listen", List.class);
39+
BatchMessagingMessageListenerAdapter adapter = new BatchMessagingMessageListenerAdapter(this, method, false,
40+
null, null);
41+
assertThat(TestUtils.getPropertyValue(adapter, "messagingMessageConverter.inferredArgumentType"))
42+
.isEqualTo(String.class);
43+
Method badMethod = getClass().getDeclaredMethod("listen", String.class);
44+
assertThatIllegalStateException().isThrownBy(() ->
45+
new BatchMessagingMessageListenerAdapter(this, badMethod, false, null, null)
46+
).withMessageStartingWith("Mis-configuration");
47+
}
48+
49+
public void listen(String in) {
50+
}
51+
52+
public void listen(List<String> in) {
53+
}
54+
55+
}

src/reference/asciidoc/amqp.adoc

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3422,7 +3422,7 @@ Adding a `group` attribute causes a bean of type `Collection<MessageListenerCont
34223422
===== @RabbitListener with Batching
34233423

34243424
When receiving a <<template-batching, a batch>> of messages, the de-batching is normally performed by the container and the listener is invoked with one message at at time.
3425-
Starting with version 2.2, you can configure the listener container factory and listener to receive the entire batch in one call, simply set the factory's `batchListener` property, and make the method payload parameter a `List`:
3425+
Starting with version 2.2, you can configure the listener container factory and listener to receive the entire batch in one call, simply set the factory's `batchListener` property, and make the method payload parameter a `List` or `Collection`:
34263426

34273427
====
34283428
[source, java]
@@ -3486,20 +3486,17 @@ When using `consumerBatchEnabled` with `@RabbitListener`:
34863486
----
34873487
@RabbitListener(queues = "batch.1", containerFactory = "consumerBatchContainerFactory")
34883488
public void consumerBatch1(List<Message> amqpMessages) {
3489-
this.amqpMessagesReceived = amqpMessages;
3490-
this.batch1Latch.countDown();
3489+
...
34913490
}
34923491
34933492
@RabbitListener(queues = "batch.2", containerFactory = "consumerBatchContainerFactory")
34943493
public void consumerBatch2(List<org.springframework.messaging.Message<Invoice>> messages) {
3495-
this.messagingMessagesReceived = messages;
3496-
this.batch2Latch.countDown();
3494+
...
34973495
}
34983496
34993497
@RabbitListener(queues = "batch.3", containerFactory = "consumerBatchContainerFactory")
35003498
public void consumerBatch3(List<Invoice> strings) {
3501-
this.batch3Strings = strings;
3502-
this.batch3Latch.countDown();
3499+
...
35033500
}
35043501
----
35053502
====
@@ -3511,6 +3508,12 @@ public void consumerBatch3(List<Invoice> strings) {
35113508
You can also add a `Channel` parameter, often used when using `MANUAL` ack mode.
35123509
This is not very useful with the third example because you don't have access to the `delivery_tag` property.
35133510

3511+
Spring Boot provides a configuration property for `consumerBatchEnabled` and `batchSize`, but not for `batchListener`.
3512+
Starting with version 3.0, setting `consumerBatchEnabled` to `true` on the container factory also sets `batchListener` to `true`.
3513+
When `consumerBatchEnabled` is `true`, the listener **must** be a batch listener.
3514+
3515+
Starting with version 3.0, listener methods can consume `Collection<?>` or `List<?>`.
3516+
35143517
[[using-container-factories]]
35153518
===== Using Container Factories
35163519

src/reference/asciidoc/whats-new.adoc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,10 @@ See <<async-template>> for more information.
2121

2222
`RabbitStreamOperations2` and `RabbitStreamTemplate2` have been deprecated in favor of `RabbitStreamOperations` and `RabbitStreamTemplate` respectively.
2323
See <<stream-support>> for more information.
24+
25+
==== `@RabbitListener` Changes
26+
27+
Batch listeners can now consume `Collection<?>` as well as `List<?>`.
28+
The batch messaging adapter now ensures that the method is suitable for consuming batches.
29+
When setting the container factory `consumerBatchEnabled` to `true`, the `batchListener` property is also set to `true`.
30+
See <<receiving-batch>> for more infoprmation.

0 commit comments

Comments
 (0)