Skip to content

Commit 70b35d4

Browse files
committed
Add support for Rabbitmq AMQP 1.0
1 parent 2b6a3e7 commit 70b35d4

File tree

6 files changed

+437
-0
lines changed

6 files changed

+437
-0
lines changed

module/spring-boot-amqp/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ dependencies {
4141
optional(project(":module:spring-boot-metrics"))
4242
optional("io.micrometer:micrometer-core")
4343
optional("org.springframework.amqp:spring-rabbit-stream")
44+
optional("org.springframework.amqp:spring-rabbitmq-client")
4445
optional("org.testcontainers:rabbitmq")
4546

4647
dockerTestImplementation(project(":test-support:spring-boot-docker-test-support"))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright 2012-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.amqp.autoconfigure;
18+
19+
import com.rabbitmq.client.amqp.Environment;
20+
import com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder;
21+
22+
/**
23+
* Callback interface that can be implemented by beans wishing to customize the
24+
* auto-configured {@link Environment} that is created by an
25+
* {@link AmqpEnvironmentBuilder}.
26+
*
27+
* @author Eddú Meléndez
28+
* @since 4.0.0
29+
*/
30+
@FunctionalInterface
31+
public interface AmqpEnvironmentBuilderCustomizer {
32+
33+
/**
34+
* Customize the {@code AmqpEnvironmentBuilder}.
35+
* @param builder the builder to customize
36+
*/
37+
void customize(AmqpEnvironmentBuilder builder);
38+
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*
2+
* Copyright 2012-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.amqp.autoconfigure;
18+
19+
import com.rabbitmq.client.amqp.Connection;
20+
import com.rabbitmq.client.amqp.CredentialsProvider;
21+
import com.rabbitmq.client.amqp.Environment;
22+
import com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder;
23+
import com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder.EnvironmentConnectionSettings;
24+
25+
import org.springframework.amqp.rabbit.config.ContainerCustomizer;
26+
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
27+
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
28+
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
29+
import org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory;
30+
import org.springframework.amqp.rabbitmq.client.RabbitAmqpAdmin;
31+
import org.springframework.amqp.rabbitmq.client.RabbitAmqpTemplate;
32+
import org.springframework.amqp.rabbitmq.client.SingleAmqpConnectionFactory;
33+
import org.springframework.amqp.rabbitmq.client.config.RabbitAmqpListenerContainerFactory;
34+
import org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpListenerContainer;
35+
import org.springframework.amqp.support.converter.MessageConverter;
36+
import org.springframework.beans.factory.ObjectProvider;
37+
import org.springframework.boot.amqp.autoconfigure.RabbitConnectionDetails.Address;
38+
import org.springframework.boot.amqp.autoconfigure.RabbitProperties.ListenerRetry;
39+
import org.springframework.boot.amqp.autoconfigure.RabbitRetryTemplateCustomizer.Target;
40+
import org.springframework.boot.autoconfigure.AutoConfiguration;
41+
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
42+
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
43+
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
44+
import org.springframework.boot.context.properties.EnableConfigurationProperties;
45+
import org.springframework.boot.context.properties.PropertyMapper;
46+
import org.springframework.boot.ssl.SslBundles;
47+
import org.springframework.context.annotation.Bean;
48+
import org.springframework.retry.support.RetryTemplate;
49+
50+
/**
51+
* {@link EnableAutoConfiguration Auto-configuration} for {@link RabbitAmqpTemplate}.
52+
*
53+
* @author Eddú Meléndez
54+
* @since 4.0.0
55+
*/
56+
@AutoConfiguration
57+
@ConditionalOnClass({ RabbitAmqpTemplate.class, Connection.class })
58+
@EnableConfigurationProperties(RabbitProperties.class)
59+
public final class RabbitAmqpAutoConfiguration {
60+
61+
private final RabbitProperties properties;
62+
63+
RabbitAmqpAutoConfiguration(RabbitProperties properties) {
64+
this.properties = properties;
65+
}
66+
67+
@Bean
68+
@ConditionalOnMissingBean
69+
RabbitConnectionDetails rabbitConnectionDetails(ObjectProvider<SslBundles> sslBundles) {
70+
return new PropertiesRabbitConnectionDetails(this.properties, sslBundles.getIfAvailable());
71+
}
72+
73+
@Bean(name = "rabbitAmqpListenerContainerFactory")
74+
@ConditionalOnMissingBean(name = "rabbitAmqpListenerContainerFactory")
75+
public RabbitAmqpListenerContainerFactory rabbitAmqpListenerContainerFactory(
76+
AmqpConnectionFactory connectionFactory,
77+
ObjectProvider<ContainerCustomizer<RabbitAmqpListenerContainer>> amqpContainerCustomizer,
78+
ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers,
79+
ObjectProvider<MessageRecoverer> messageRecoverer) {
80+
RabbitAmqpListenerContainerFactory factory = new RabbitAmqpListenerContainerFactory(connectionFactory);
81+
amqpContainerCustomizer.ifUnique(factory::setContainerCustomizer);
82+
83+
RabbitProperties.AmqpContainer configuration = this.properties.getListener().getSimple();
84+
factory.setObservationEnabled(configuration.isObservationEnabled());
85+
ListenerRetry retryConfig = configuration.getRetry();
86+
if (retryConfig.isEnabled()) {
87+
RetryInterceptorBuilder<?, ?> builder = (retryConfig.isStateless()) ? RetryInterceptorBuilder.stateless()
88+
: RetryInterceptorBuilder.stateful();
89+
90+
RetryTemplate retryTemplate = new RetryTemplateFactory(retryTemplateCustomizers.orderedStream().toList())
91+
.createRetryTemplate(retryConfig, Target.LISTENER);
92+
93+
builder.retryOperations(retryTemplate);
94+
MessageRecoverer recoverer = (messageRecoverer.getIfAvailable() != null) ? messageRecoverer.getIfAvailable()
95+
: new RejectAndDontRequeueRecoverer();
96+
builder.recoverer(recoverer);
97+
factory.setAdviceChain(builder.build());
98+
}
99+
return factory;
100+
}
101+
102+
@Bean
103+
@ConditionalOnMissingBean
104+
public Environment rabbitAmqpEnvironment(RabbitConnectionDetails connectionDetails,
105+
ObjectProvider<AmqpEnvironmentBuilderCustomizer> customizers,
106+
ObjectProvider<CredentialsProvider> credentialsProvider) {
107+
PropertyMapper map = PropertyMapper.get();
108+
EnvironmentConnectionSettings environmentConnectionSettings = new AmqpEnvironmentBuilder().connectionSettings();
109+
Address address = connectionDetails.getFirstAddress();
110+
map.from(address::host).whenNonNull().to(environmentConnectionSettings::host);
111+
map.from(address::port).to(environmentConnectionSettings::port);
112+
map.from(connectionDetails::getUsername).whenNonNull().to(environmentConnectionSettings::username);
113+
map.from(connectionDetails::getPassword).whenNonNull().to(environmentConnectionSettings::password);
114+
map.from(connectionDetails::getVirtualHost).whenNonNull().to(environmentConnectionSettings::virtualHost);
115+
map.from(credentialsProvider::getIfAvailable)
116+
.whenNonNull()
117+
.to(environmentConnectionSettings::credentialsProvider);
118+
119+
AmqpEnvironmentBuilder builder = environmentConnectionSettings.environmentBuilder();
120+
customizers.orderedStream().forEach((customizer) -> customizer.customize(builder));
121+
return builder.build();
122+
}
123+
124+
@Bean
125+
@ConditionalOnMissingBean
126+
public AmqpConnectionFactory amqpConnection(Environment environment) {
127+
return new SingleAmqpConnectionFactory(environment);
128+
}
129+
130+
@Bean
131+
@ConditionalOnMissingBean
132+
public RabbitAmqpTemplate rabbitAmqpTemplate(AmqpConnectionFactory connectionFactory,
133+
ObjectProvider<RabbitAmqpTemplateCustomizer> customizers,
134+
ObjectProvider<MessageConverter> messageConverter) {
135+
RabbitAmqpTemplate rabbitAmqpTemplate = new RabbitAmqpTemplate(connectionFactory);
136+
if (messageConverter.getIfAvailable() != null) {
137+
rabbitAmqpTemplate.setMessageConverter(messageConverter.getIfAvailable());
138+
}
139+
RabbitProperties.Template templateProperties = this.properties.getTemplate();
140+
141+
PropertyMapper map = PropertyMapper.get();
142+
map.from(templateProperties::getDefaultReceiveQueue).whenNonNull().to(rabbitAmqpTemplate::setReceiveQueue);
143+
map.from(templateProperties::getExchange).whenNonNull().to(rabbitAmqpTemplate::setExchange);
144+
map.from(templateProperties::getRoutingKey).to(rabbitAmqpTemplate::setRoutingKey);
145+
146+
customizers.orderedStream().forEach((customizer) -> customizer.customize(rabbitAmqpTemplate));
147+
return rabbitAmqpTemplate;
148+
}
149+
150+
@Bean
151+
@ConditionalOnMissingBean
152+
public RabbitAmqpAdmin rabbitAmqpAdmin(AmqpConnectionFactory connectionFactory) {
153+
return new RabbitAmqpAdmin(connectionFactory);
154+
}
155+
156+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2012-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.amqp.autoconfigure;
18+
19+
import org.springframework.amqp.rabbitmq.client.RabbitAmqpTemplate;
20+
21+
/**
22+
* Callback interface that can be used to customize a {@link RabbitAmqpTemplate}.
23+
*
24+
* @author Eddú Meléndez
25+
* @since 4.0.0
26+
*/
27+
@FunctionalInterface
28+
public interface RabbitAmqpTemplateCustomizer {
29+
30+
/**
31+
* Callback to customize a {@link RabbitAmqpTemplate} instance.
32+
* @param rabbitAmqpTemplate the rabbitAmqpTemplate to customize
33+
*/
34+
void customize(RabbitAmqpTemplate rabbitAmqpTemplate);
35+
36+
}
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
org.springframework.boot.amqp.autoconfigure.RabbitAutoConfiguration
2+
org.springframework.boot.amqp.autoconfigure.RabbitAmqpAutoConfiguration
23
org.springframework.boot.amqp.autoconfigure.health.RabbitHealthContributorAutoConfiguration
34
org.springframework.boot.amqp.autoconfigure.metrics.RabbitMetricsAutoConfiguration

0 commit comments

Comments
 (0)