Skip to content

Commit d143d4b

Browse files
authored
GH-1465: Part 1: Provision Super Streams over AMQP
See #1465 Inspired by https://github.com/rabbitmq/rabbitmq-stream-java-client/blob/29b1a3eca72c8e3f719db4fd9fafdf503f37ea20/src/test/java/com/rabbitmq/stream/impl/TestUtils.java#L244-L276 * Docs and Polishing. - address PR review - move abstract test class to `support`
1 parent 6dae0fc commit d143d4b

File tree

5 files changed

+170
-4
lines changed

5 files changed

+170
-4
lines changed
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.rabbit.stream.config;
18+
19+
import java.util.ArrayList;
20+
import java.util.Collection;
21+
import java.util.List;
22+
import java.util.Map;
23+
import java.util.stream.IntStream;
24+
25+
import org.springframework.amqp.core.Binding;
26+
import org.springframework.amqp.core.Binding.DestinationType;
27+
import org.springframework.amqp.core.Declarable;
28+
import org.springframework.amqp.core.Declarables;
29+
import org.springframework.amqp.core.DirectExchange;
30+
import org.springframework.amqp.core.Queue;
31+
32+
/**
33+
* Create Super Stream Topology {@link Declarable}s.
34+
*
35+
* @author Gary Russell
36+
* @since 3.0
37+
*
38+
*/
39+
public class SuperStream extends Declarables {
40+
41+
/**
42+
* Create a Super Stream with the provided parameters.
43+
* @param name the stream name.
44+
* @param partitions the number of partitions.
45+
*/
46+
public SuperStream(String name, int partitions) {
47+
super(declarables(name, partitions));
48+
}
49+
50+
private static Collection<Declarable> declarables(String name, int partitions) {
51+
List<Declarable> declarables = new ArrayList<>();
52+
String[] rks = IntStream.range(0, partitions).mapToObj(String::valueOf).toArray(String[]::new);
53+
declarables.add(new DirectExchange(name, true, false, Map.of("x-super-stream", true)));
54+
for (int i = 0; i < partitions; i++) {
55+
String rk = rks[i];
56+
Queue q = new Queue(name + "-" + rk, true, false, false, Map.of("x-queue-type", "stream"));
57+
declarables.add(q);
58+
declarables.add(new Binding(q.getName(), DestinationType.QUEUE, name, rk,
59+
Map.of("x-stream-partition-order", i)));
60+
}
61+
return declarables;
62+
}
63+
64+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.rabbit.stream.config;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import java.util.List;
22+
23+
import org.junit.jupiter.api.Test;
24+
25+
import org.springframework.amqp.core.Declarables;
26+
import org.springframework.amqp.core.DirectExchange;
27+
import org.springframework.amqp.core.Queue;
28+
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
29+
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
30+
import org.springframework.amqp.rabbit.core.RabbitAdmin;
31+
import org.springframework.beans.factory.annotation.Autowired;
32+
import org.springframework.context.annotation.Bean;
33+
import org.springframework.context.annotation.Configuration;
34+
import org.springframework.rabbit.stream.support.AbstractIntegrationTests;
35+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
36+
37+
/**
38+
* @author Gary Russell
39+
* @since 3.0
40+
*
41+
*/
42+
@SpringJUnitConfig
43+
public class SuperStreamProvisioningTests extends AbstractIntegrationTests {
44+
45+
@Test
46+
void provision(@Autowired Declarables declarables, @Autowired CachingConnectionFactory cf,
47+
@Autowired RabbitAdmin admin) {
48+
49+
assertThat(declarables.getDeclarables()).hasSize(7);
50+
cf.createConnection();
51+
List<Queue> queues = declarables.getDeclarablesByType(Queue.class);
52+
assertThat(queues).extracting(que -> que.getName()).contains("test-0", "test-1", "test-2");
53+
queues.forEach(que -> admin.deleteQueue(que.getName()));
54+
declarables.getDeclarablesByType(DirectExchange.class).forEach(ex -> admin.deleteExchange(ex.getName()));
55+
}
56+
57+
@Configuration
58+
public static class Config {
59+
60+
@Bean
61+
CachingConnectionFactory cf() {
62+
return new CachingConnectionFactory("localhost", amqpPort());
63+
}
64+
65+
@Bean
66+
RabbitAdmin admin(ConnectionFactory cf) {
67+
return new RabbitAdmin(cf);
68+
}
69+
70+
@Bean
71+
SuperStream superStream() {
72+
return new SuperStream("test", 3);
73+
}
74+
75+
}
76+
77+
}

spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/RabbitListenerTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory;
4545
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;
4646
import org.springframework.rabbit.stream.retry.StreamRetryOperationsInterceptorFactoryBean;
47+
import org.springframework.rabbit.stream.support.AbstractIntegrationTests;
4748
import org.springframework.rabbit.stream.support.StreamMessageProperties;
4849
import org.springframework.retry.interceptor.RetryOperationsInterceptor;
4950
import org.springframework.test.annotation.DirtiesContext;
Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.springframework.rabbit.stream.listener;
17+
package org.springframework.rabbit.stream.support;
1818

1919
import java.time.Duration;
2020

@@ -48,15 +48,15 @@ public abstract class AbstractIntegrationTests {
4848
}
4949
}
5050

51-
static int amqpPort() {
51+
public static int amqpPort() {
5252
return RABBITMQ != null ? RABBITMQ.getMappedPort(5672) : 5672;
5353
}
5454

55-
static int managementPort() {
55+
public static int managementPort() {
5656
return RABBITMQ != null ? RABBITMQ.getMappedPort(15672) : 15672;
5757
}
5858

59-
static int streamPort() {
59+
public static int streamPort() {
6060
return RABBITMQ != null ? RABBITMQ.getMappedPort(5552) : 5552;
6161
}
6262

src/reference/asciidoc/stream.adoc

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,3 +157,27 @@ public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTempla
157157
====
158158

159159
IMPORTANT: Stateful retry is not supported with this container.
160+
161+
==== Super Streams
162+
163+
A Super Stream is an abstract concept for a partitioned stream, implemented by binding a number of stream queues to an exchange having an argument `x-super-stream: true`.
164+
165+
===== Provisioning
166+
167+
For convenience, a super stream can be provisioned by defining a single bean of type `SuperStream`.
168+
169+
====
170+
[source, java]
171+
----
172+
@Bean
173+
SuperStream superStream() {
174+
return new SuperStream("my.super.stream", 3);
175+
}
176+
----
177+
====
178+
179+
The `RabbitAdmin` detects this bean and will declare the exchange (`my.super.stream`) and 3 queues (partitions) - `my.super-stream-n` where `n` is `0`, `1`, `2`, bound with routing keys equal to `n`.
180+
181+
===== Consuming Super Streams with Single Active Consumers
182+
183+
TBD.

0 commit comments

Comments
 (0)