Skip to content

Commit cffebb9

Browse files
authored
GH-1459: Improve MeterRegistry Discovery
Resolves #1459 * Add micrometer properties and container customizers to LCFB. * Use `ObjectProvider` to locate registry.
1 parent fe37e01 commit cffebb9

File tree

6 files changed

+399
-76
lines changed

6 files changed

+399
-76
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerFactoryBean.java

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.amqp.rabbit.config;
1818

19+
import java.util.HashMap;
1920
import java.util.Map;
2021
import java.util.concurrent.Executor;
2122

@@ -61,6 +62,8 @@
6162
public class ListenerContainerFactoryBean extends AbstractFactoryBean<AbstractMessageListenerContainer>
6263
implements ApplicationContextAware, BeanNameAware, ApplicationEventPublisherAware, SmartLifecycle {
6364

65+
private final Map<String, String> micrometerTags = new HashMap<>();
66+
6467
private ApplicationContext applicationContext;
6568

6669
private String beanName;
@@ -171,6 +174,12 @@ public class ListenerContainerFactoryBean extends AbstractFactoryBean<AbstractMe
171174

172175
private Boolean consumerBatchEnabled;
173176

177+
private Boolean micrometerEnabled;
178+
179+
private ContainerCustomizer<SimpleMessageListenerContainer> smlcCustomizer;
180+
181+
private ContainerCustomizer<DirectMessageListenerContainer> dmlcCustomizer;
182+
174183
@Override
175184
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
176185
this.applicationEventPublisher = applicationEventPublisher;
@@ -427,6 +436,44 @@ public void setRetryDeclarationInterval(long retryDeclarationInterval) {
427436
this.retryDeclarationInterval = retryDeclarationInterval;
428437
}
429438

439+
/**
440+
* Set to false to disable micrometer listener timers.
441+
* @param micrometerEnabled false to disable.
442+
* @since 2.4.6
443+
*/
444+
public void setMicrometerEnabled(boolean enabled) {
445+
this.micrometerEnabled = enabled;
446+
}
447+
448+
/**
449+
* Set additional tags for the Micrometer listener timers.
450+
* @param tags the tags.
451+
* @since 2.4.6
452+
*/
453+
public void setMicrometerTags(Map<String, String> tags) {
454+
this.micrometerTags.putAll(tags);
455+
}
456+
457+
/**
458+
* Set a {@link ContainerCustomizer} that is invoked after a container is created and
459+
* configured to enable further customization of the container.
460+
* @param containerCustomizer the customizer.
461+
* @since 2.4.6
462+
*/
463+
public void setSMLCCustomizer(ContainerCustomizer<SimpleMessageListenerContainer> customizer) {
464+
this.smlcCustomizer = customizer;
465+
}
466+
467+
/**
468+
* Set a {@link ContainerCustomizer} that is invoked after a container is created and
469+
* configured to enable further customization of the container.
470+
* @param containerCustomizer the customizer.
471+
* @since 2.4.6
472+
*/
473+
public void setDMLCCustomizer(ContainerCustomizer<DirectMessageListenerContainer> customizer) {
474+
this.dmlcCustomizer = customizer;
475+
}
476+
430477
@Override
431478
public Class<?> getObjectType() {
432479
return this.listenerContainer == null
@@ -478,7 +525,16 @@ protected AbstractMessageListenerContainer createInstance() { // NOSONAR complex
478525
.acceptIfNotNull(this.autoDeclare, container::setAutoDeclare)
479526
.acceptIfNotNull(this.failedDeclarationRetryInterval, container::setFailedDeclarationRetryInterval)
480527
.acceptIfNotNull(this.exclusiveConsumerExceptionLogger,
481-
container::setExclusiveConsumerExceptionLogger);
528+
container::setExclusiveConsumerExceptionLogger)
529+
.acceptIfNotNull(this.micrometerEnabled, container::setMicrometerEnabled)
530+
.acceptIfCondition(this.micrometerTags.size() > 0, this.micrometerTags,
531+
container::setMicrometerTags);
532+
if (this.smlcCustomizer != null && this.type.equals(Type.simple)) {
533+
this.smlcCustomizer.configure((SimpleMessageListenerContainer) container);
534+
}
535+
else if (this.dmlcCustomizer != null && this.type.equals(Type.direct)) {
536+
this.dmlcCustomizer.configure((DirectMessageListenerContainer) container);
537+
}
482538
container.afterPropertiesSet();
483539
this.listenerContainer = container;
484540
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java

Lines changed: 0 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@
2626
import java.util.Map.Entry;
2727
import java.util.Properties;
2828
import java.util.Set;
29-
import java.util.concurrent.ConcurrentHashMap;
30-
import java.util.concurrent.ConcurrentMap;
3129
import java.util.concurrent.CopyOnWriteArrayList;
3230
import java.util.concurrent.Executor;
3331
import java.util.stream.Collectors;
@@ -93,10 +91,6 @@
9391

9492
import com.rabbitmq.client.Channel;
9593
import com.rabbitmq.client.ShutdownSignalException;
96-
import io.micrometer.core.instrument.MeterRegistry;
97-
import io.micrometer.core.instrument.Timer;
98-
import io.micrometer.core.instrument.Timer.Builder;
99-
import io.micrometer.core.instrument.Timer.Sample;
10094

10195
/**
10296
* @author Mark Pollack
@@ -2102,72 +2096,4 @@ else if (!RabbitUtils.isNormalChannelClose(cause)) {
21022096

21032097
}
21042098

2105-
private static final class MicrometerHolder {
2106-
2107-
private final ConcurrentMap<String, Timer> timers = new ConcurrentHashMap<>();
2108-
2109-
private final MeterRegistry registry;
2110-
2111-
private final Map<String, String> tags;
2112-
2113-
private final String listenerId;
2114-
2115-
MicrometerHolder(@Nullable ApplicationContext context, String listenerId, Map<String, String> tags) {
2116-
if (context == null) {
2117-
throw new IllegalStateException("No micrometer registry present");
2118-
}
2119-
Map<String, MeterRegistry> registries = context.getBeansOfType(MeterRegistry.class, false, false);
2120-
if (registries.size() == 1) {
2121-
this.registry = registries.values().iterator().next();
2122-
this.listenerId = listenerId;
2123-
this.tags = tags;
2124-
}
2125-
else {
2126-
throw new IllegalStateException("No micrometer registry present");
2127-
}
2128-
}
2129-
2130-
Object start() {
2131-
return Timer.start(this.registry);
2132-
}
2133-
2134-
void success(Object sample, String queue) {
2135-
Timer timer = this.timers.get(queue + "none");
2136-
if (timer == null) {
2137-
timer = buildTimer(this.listenerId, "success", queue, "none");
2138-
}
2139-
((Sample) sample).stop(timer);
2140-
}
2141-
2142-
void failure(Object sample, String queue, String exception) {
2143-
Timer timer = this.timers.get(queue + exception);
2144-
if (timer == null) {
2145-
timer = buildTimer(this.listenerId, "failure", queue, exception);
2146-
}
2147-
((Sample) sample).stop(timer);
2148-
}
2149-
2150-
private Timer buildTimer(String aListenerId, String result, String queue, String exception) {
2151-
2152-
Builder builder = Timer.builder("spring.rabbitmq.listener")
2153-
.description("Spring RabbitMQ Listener")
2154-
.tag("listener.id", aListenerId)
2155-
.tag("queue", queue)
2156-
.tag("result", result)
2157-
.tag("exception", exception);
2158-
if (this.tags != null && !this.tags.isEmpty()) {
2159-
this.tags.forEach((key, value) -> builder.tag(key, value));
2160-
}
2161-
Timer registeredTimer = builder.register(this.registry);
2162-
this.timers.put(queue + exception, registeredTimer);
2163-
return registeredTimer;
2164-
}
2165-
2166-
void destroy() {
2167-
this.timers.values().forEach(this.registry::remove);
2168-
this.timers.clear();
2169-
}
2170-
2171-
}
2172-
21732099
}
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit.listener;
18+
19+
import java.util.Collections;
20+
import java.util.Map;
21+
import java.util.Map.Entry;
22+
import java.util.concurrent.ConcurrentHashMap;
23+
import java.util.concurrent.ConcurrentMap;
24+
25+
import org.springframework.beans.factory.NoUniqueBeanDefinitionException;
26+
import org.springframework.beans.factory.config.BeanDefinition;
27+
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
28+
import org.springframework.context.ApplicationContext;
29+
import org.springframework.context.ConfigurableApplicationContext;
30+
import org.springframework.lang.Nullable;
31+
32+
import io.micrometer.core.instrument.MeterRegistry;
33+
import io.micrometer.core.instrument.Timer;
34+
import io.micrometer.core.instrument.Timer.Builder;
35+
import io.micrometer.core.instrument.Timer.Sample;
36+
37+
/**
38+
* Abstraction to avoid hard reference to Micrometer.
39+
*
40+
* @author Gary Russell
41+
* @since 2.4.6
42+
*
43+
*/
44+
final class MicrometerHolder {
45+
46+
private final ConcurrentMap<String, Timer> timers = new ConcurrentHashMap<>();
47+
48+
private final MeterRegistry registry;
49+
50+
private final Map<String, String> tags;
51+
52+
private final String listenerId;
53+
54+
MicrometerHolder(@Nullable ApplicationContext context, String listenerId, Map<String, String> tags) {
55+
if (context == null) {
56+
throw new IllegalStateException("No micrometer registry present");
57+
}
58+
try {
59+
this.registry = context.getBeanProvider(MeterRegistry.class).getIfUnique();
60+
}
61+
catch (NoUniqueBeanDefinitionException ex) {
62+
throw new IllegalStateException(ex);
63+
}
64+
if (this.registry != null) {
65+
this.listenerId = listenerId;
66+
this.tags = tags;
67+
}
68+
else {
69+
throw new IllegalStateException("No micrometer registry present (or more than one and "
70+
+ "there is not exactly one marked with @Primary)");
71+
}
72+
}
73+
74+
private Map<String, MeterRegistry> filterRegistries(Map<String, MeterRegistry> registries,
75+
ApplicationContext context) {
76+
77+
if (registries.size() == 1) {
78+
return registries;
79+
}
80+
MeterRegistry primary = null;
81+
if (context instanceof ConfigurableApplicationContext) {
82+
BeanDefinitionRegistry bdr = (BeanDefinitionRegistry) ((ConfigurableApplicationContext) context)
83+
.getBeanFactory();
84+
for (Entry<String, MeterRegistry> entry : registries.entrySet()) {
85+
BeanDefinition beanDefinition = bdr.getBeanDefinition(entry.getKey());
86+
if (beanDefinition.isPrimary()) {
87+
if (primary != null) {
88+
primary = null;
89+
break;
90+
}
91+
else {
92+
primary = entry.getValue();
93+
}
94+
}
95+
}
96+
}
97+
if (primary != null) {
98+
return Collections.singletonMap("primary", primary);
99+
}
100+
else {
101+
return registries;
102+
}
103+
}
104+
105+
Object start() {
106+
return Timer.start(this.registry);
107+
}
108+
109+
void success(Object sample, String queue) {
110+
Timer timer = this.timers.get(queue + "none");
111+
if (timer == null) {
112+
timer = buildTimer(this.listenerId, "success", queue, "none");
113+
}
114+
((Sample) sample).stop(timer);
115+
}
116+
117+
void failure(Object sample, String queue, String exception) {
118+
Timer timer = this.timers.get(queue + exception);
119+
if (timer == null) {
120+
timer = buildTimer(this.listenerId, "failure", queue, exception);
121+
}
122+
((Sample) sample).stop(timer);
123+
}
124+
125+
private Timer buildTimer(String aListenerId, String result, String queue, String exception) {
126+
127+
Builder builder = Timer.builder("spring.rabbitmq.listener")
128+
.description("Spring RabbitMQ Listener")
129+
.tag("listener.id", aListenerId)
130+
.tag("queue", queue)
131+
.tag("result", result)
132+
.tag("exception", exception);
133+
if (this.tags != null && !this.tags.isEmpty()) {
134+
this.tags.forEach((key, value) -> builder.tag(key, value));
135+
}
136+
Timer registeredTimer = builder.register(this.registry);
137+
this.timers.put(queue + exception, registeredTimer);
138+
return registeredTimer;
139+
}
140+
141+
void destroy() {
142+
this.timers.values().forEach(this.registry::remove);
143+
this.timers.clear();
144+
}
145+
146+
}

0 commit comments

Comments
 (0)