Skip to content

Commit 6436440

Browse files
committed
Fix issues, segregate configurations
1 parent 4dd386c commit 6436440

13 files changed

+267
-71
lines changed

spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/ErrorHandlingApp.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
@SpringBootApplication
1212
@EnableScheduling
1313
public class ErrorHandlingApp {
14+
1415
@Autowired
1516
MessageProducer messageProducer;
1617

spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXCustomAmqpConfiguration.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,31 @@
11
package com.baeldung.springamqp.errorhandling.configuration;
22

3-
import org.springframework.amqp.core.*;
3+
import org.springframework.amqp.core.Binding;
4+
import org.springframework.amqp.core.BindingBuilder;
5+
import org.springframework.amqp.core.DirectExchange;
6+
import org.springframework.amqp.core.FanoutExchange;
7+
import org.springframework.amqp.core.Queue;
8+
import org.springframework.amqp.core.QueueBuilder;
9+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
410
import org.springframework.context.annotation.Bean;
11+
import org.springframework.context.annotation.Configuration;
512

6-
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.*;
13+
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
14+
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES;
15+
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ;
716

8-
//@Configuration
17+
@Configuration
18+
@ConditionalOnProperty(
19+
value = "amqp.configuration.current",
20+
havingValue = "dlx-custom")
921
public class DLXCustomAmqpConfiguration {
1022
public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx";
1123

1224
@Bean
1325
Queue messagesQueue() {
1426
return QueueBuilder.durable(QUEUE_MESSAGES)
15-
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
16-
.build();
27+
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
28+
.build();
1729
}
1830

1931
@Bean

spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXParkingLotAmqpConfiguration.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,23 @@
11
package com.baeldung.springamqp.errorhandling.configuration;
22

3-
import org.springframework.amqp.core.*;
3+
import org.springframework.amqp.core.Binding;
4+
import org.springframework.amqp.core.BindingBuilder;
5+
import org.springframework.amqp.core.DirectExchange;
6+
import org.springframework.amqp.core.FanoutExchange;
7+
import org.springframework.amqp.core.Queue;
8+
import org.springframework.amqp.core.QueueBuilder;
9+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
410
import org.springframework.context.annotation.Bean;
11+
import org.springframework.context.annotation.Configuration;
512

6-
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.*;
13+
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
14+
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES;
15+
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ;
716

8-
//@Configuration
17+
@Configuration
18+
@ConditionalOnProperty(
19+
value = "amqp.configuration.current",
20+
havingValue = "parking-lot-dlx")
921
public class DLXParkingLotAmqpConfiguration {
1022
public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx";
1123
public static final String QUEUE_PARKING_LOT = QUEUE_MESSAGES + ".parking-lot";
@@ -29,8 +41,8 @@ Binding parkingLotBinding() {
2941
@Bean
3042
Queue messagesQueue() {
3143
return QueueBuilder.durable(QUEUE_MESSAGES)
32-
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
33-
.build();
44+
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
45+
.build();
3446
}
3547

3648
@Bean

spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/FatalExceptionStrategyAmqpConfiguration.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
package com.baeldung.springamqp.errorhandling.configuration;
22

33
import com.baeldung.springamqp.errorhandling.errorhandler.CustomFatalExceptionStrategy;
4-
import org.springframework.amqp.core.*;
4+
import org.springframework.amqp.core.Binding;
5+
import org.springframework.amqp.core.BindingBuilder;
6+
import org.springframework.amqp.core.DirectExchange;
7+
import org.springframework.amqp.core.Queue;
8+
import org.springframework.amqp.core.QueueBuilder;
59
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
610
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
711
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
812
import org.springframework.amqp.rabbit.listener.FatalExceptionStrategy;
913
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
14+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
1015
import org.springframework.context.annotation.Bean;
1116
import org.springframework.context.annotation.Configuration;
1217
import org.springframework.util.ErrorHandler;
@@ -15,11 +20,15 @@
1520
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES;
1621

1722
@Configuration
23+
@ConditionalOnProperty(
24+
value = "amqp.configuration.current",
25+
havingValue = "fatal-error-strategy")
1826
public class FatalExceptionStrategyAmqpConfiguration {
1927

2028
@Bean
21-
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,
22-
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
29+
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
30+
ConnectionFactory connectionFactory,
31+
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
2332
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
2433
configurer.configure(factory, connectionFactory);
2534
factory.setErrorHandler(errorHandler());
@@ -39,7 +48,7 @@ FatalExceptionStrategy customExceptionStrategy() {
3948
@Bean
4049
Queue messagesQueue() {
4150
return QueueBuilder.durable(QUEUE_MESSAGES)
42-
.build();
51+
.build();
4352
}
4453

4554
@Bean

spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/ListenerErrorHandlerAmqpConfiguration.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,26 @@
11
package com.baeldung.springamqp.errorhandling.configuration;
22

33
import com.baeldung.springamqp.errorhandling.errorhandler.CustomErrorHandler;
4-
import org.springframework.amqp.core.*;
4+
import org.springframework.amqp.core.Binding;
5+
import org.springframework.amqp.core.BindingBuilder;
6+
import org.springframework.amqp.core.DirectExchange;
7+
import org.springframework.amqp.core.Queue;
8+
import org.springframework.amqp.core.QueueBuilder;
59
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
610
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
711
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
12+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
813
import org.springframework.context.annotation.Bean;
14+
import org.springframework.context.annotation.Configuration;
915
import org.springframework.util.ErrorHandler;
1016

1117
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
1218
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES;
1319

14-
//@Configuration
20+
@Configuration
21+
@ConditionalOnProperty(
22+
value = "amqp.configuration.current",
23+
havingValue = "listener-error")
1524
public class ListenerErrorHandlerAmqpConfiguration {
1625

1726
@Bean
@@ -31,7 +40,7 @@ public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(Conne
3140
@Bean
3241
Queue messagesQueue() {
3342
return QueueBuilder.durable(QUEUE_MESSAGES)
34-
.build();
43+
.build();
3544
}
3645

3746
@Bean
Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,31 @@
11
package com.baeldung.springamqp.errorhandling.configuration;
22

3-
import org.springframework.amqp.core.*;
3+
import org.springframework.amqp.core.Binding;
4+
import org.springframework.amqp.core.BindingBuilder;
5+
import org.springframework.amqp.core.DirectExchange;
6+
import org.springframework.amqp.core.FanoutExchange;
7+
import org.springframework.amqp.core.Queue;
8+
import org.springframework.amqp.core.QueueBuilder;
9+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
410
import org.springframework.context.annotation.Bean;
11+
import org.springframework.context.annotation.Configuration;
512

6-
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.*;
13+
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
14+
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES;
15+
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ;
716

8-
//@Configuration
9-
public class DLXDefaultAmqpConfiguration {
17+
@Configuration
18+
@ConditionalOnProperty(
19+
value = "amqp.configuration.current",
20+
havingValue = "routing-dlq")
21+
public class RoutingKeyDLQAmqpConfiguration {
1022
public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx";
1123

1224
@Bean
1325
Queue messagesQueue() {
1426
return QueueBuilder.durable(QUEUE_MESSAGES)
15-
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
16-
.build();
27+
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
28+
.build();
1729
}
1830

1931
@Bean

spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/SimpleDLQAmqpConfiguration.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,18 @@
11
package com.baeldung.springamqp.errorhandling.configuration;
22

3-
import org.springframework.amqp.core.*;
3+
import org.springframework.amqp.core.Binding;
4+
import org.springframework.amqp.core.BindingBuilder;
5+
import org.springframework.amqp.core.DirectExchange;
6+
import org.springframework.amqp.core.Queue;
7+
import org.springframework.amqp.core.QueueBuilder;
8+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
49
import org.springframework.context.annotation.Bean;
10+
import org.springframework.context.annotation.Configuration;
511

6-
//@Configuration
12+
@Configuration
13+
@ConditionalOnProperty(
14+
value = "amqp.configuration.current",
15+
havingValue = "simple-dlq")
716
public class SimpleDLQAmqpConfiguration {
817
public static final String QUEUE_MESSAGES = "baeldung-messages-queue";
918
public static final String QUEUE_MESSAGES_DLQ = QUEUE_MESSAGES + ".dlq";
@@ -12,9 +21,9 @@ public class SimpleDLQAmqpConfiguration {
1221
@Bean
1322
Queue messagesQueue() {
1423
return QueueBuilder.durable(QUEUE_MESSAGES)
15-
.withArgument("x-dead-letter-exchange", "")
16-
.withArgument("x-dead-letter-routing-key", QUEUE_MESSAGES_DLQ)
17-
.build();
24+
.withArgument("x-dead-letter-exchange", "")
25+
.withArgument("x-dead-letter-routing-key", QUEUE_MESSAGES_DLQ)
26+
.build();
1827
}
1928

2029
@Bean
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package com.baeldung.springamqp.errorhandling.consumer;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
import org.springframework.amqp.core.Message;
6+
import org.springframework.amqp.rabbit.annotation.RabbitListener;
7+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
8+
9+
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
10+
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ;
11+
import static com.baeldung.springamqp.errorhandling.consumer.MessagesConsumer.HEADER_X_RETRIES_COUNT;
12+
13+
public class DLQCustomAmqpContainer {
14+
private static final Logger log = LoggerFactory.getLogger(DLQCustomAmqpContainer.class);
15+
private final RabbitTemplate rabbitTemplate;
16+
public static final int MAX_RETRIES_COUNT = 2;
17+
18+
public DLQCustomAmqpContainer(RabbitTemplate rabbitTemplate) {
19+
this.rabbitTemplate = rabbitTemplate;
20+
}
21+
22+
@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
23+
public void processFailedMessagesRetryHeaders(Message failedMessage) {
24+
Integer retriesCnt = (Integer) failedMessage.getMessageProperties().getHeaders().get(HEADER_X_RETRIES_COUNT);
25+
if (retriesCnt == null)
26+
retriesCnt = 1;
27+
if (retriesCnt > MAX_RETRIES_COUNT) {
28+
log.info("Discarding message");
29+
return;
30+
}
31+
log.info("Retrying message for the {} time", retriesCnt);
32+
failedMessage.getMessageProperties().getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt);
33+
rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
34+
}
35+
}

spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/MessagesConsumer.java

Lines changed: 30 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,14 @@
77
import org.springframework.amqp.core.Message;
88
import org.springframework.amqp.rabbit.annotation.RabbitListener;
99
import org.springframework.amqp.rabbit.core.RabbitTemplate;
10-
import org.springframework.stereotype.Service;
10+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
11+
import org.springframework.context.annotation.Bean;
12+
import org.springframework.context.annotation.Configuration;
1113

12-
import static com.baeldung.springamqp.errorhandling.configuration.DLXParkingLotAmqpConfiguration.EXCHANGE_PARKING_LOT;
13-
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
14-
15-
@Service
14+
@Configuration
1615
public class MessagesConsumer {
1716
public static final String HEADER_X_RETRIES_COUNT = "x-retries-count";
18-
public static final int MAX_RETRIES_COUNT = 1;
17+
1918

2019
private static final Logger log = LoggerFactory.getLogger(MessagesConsumer.class);
2120
private final RabbitTemplate rabbitTemplate;
@@ -25,51 +24,40 @@ public MessagesConsumer(RabbitTemplate rabbitTemplate) {
2524
}
2625

2726
@RabbitListener(queues = SimpleDLQAmqpConfiguration.QUEUE_MESSAGES)
28-
public void receiveMessage(final Message message) throws BusinessException {
27+
public void receiveMessage(Message message) throws BusinessException {
2928
log.info("Received message: {}", message.toString());
3029
throw new BusinessException();
3130
}
3231

33-
//@RabbitListener(queues = DLXCustomAmqpConfiguration.QUEUE_MESSAGES_DLQ)
34-
public void processFailedMessages(final Message message) {
35-
log.info("Received failed message: {}", message.toString());
36-
}
37-
38-
//@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
39-
public void processFailedMessagesRequeue(final Message failedMessage) {
40-
log.info("Received failed message, requeueing: {}", failedMessage.toString());
41-
rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
32+
@Bean
33+
@ConditionalOnProperty(
34+
value = "amqp.configuration.current",
35+
havingValue = "simple-dlq")
36+
public SimpleDLQAmqpContainer simpleAmqpContainer() {
37+
return new SimpleDLQAmqpContainer(rabbitTemplate);
4238
}
4339

44-
//@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
45-
public void processFailedMessagesRetryHeaders(final Message failedMessage) {
46-
Integer retriesCnt = (Integer) failedMessage.getMessageProperties().getHeaders().get(HEADER_X_RETRIES_COUNT);
47-
if (retriesCnt == null) retriesCnt = 0;
48-
log.info("Retrying message for the {} time", retriesCnt);
49-
if (retriesCnt > MAX_RETRIES_COUNT) {
50-
log.info("Discarding message");
51-
return;
52-
}
53-
failedMessage.getMessageProperties().getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt);
54-
rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
40+
@Bean
41+
@ConditionalOnProperty(
42+
value = "amqp.configuration.current",
43+
havingValue = "routing-dlq")
44+
public RoutingDLQAmqpContainer routingDLQAmqpContainer() {
45+
return new RoutingDLQAmqpContainer(rabbitTemplate);
5546
}
5647

57-
// @RabbitListener(queues = QUEUE_MESSAGES_DLQ)
58-
public void processFailedMessagesRetryWithParkingLot(final Message failedMessage) {
59-
Integer retriesCnt = (Integer) failedMessage.getMessageProperties().getHeaders().get(HEADER_X_RETRIES_COUNT);
60-
if (retriesCnt == null) retriesCnt = 0;
61-
log.info("Retrying message for the {} time", retriesCnt);
62-
if (retriesCnt > MAX_RETRIES_COUNT) {
63-
log.info("Sending message to the parking lot queue");
64-
rabbitTemplate.send(EXCHANGE_PARKING_LOT, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
65-
return;
66-
}
67-
failedMessage.getMessageProperties().getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt);
68-
rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
48+
@Bean
49+
@ConditionalOnProperty(
50+
value = "amqp.configuration.current",
51+
havingValue = "dlx-custom")
52+
public DLQCustomAmqpContainer dlqAmqpContainer() {
53+
return new DLQCustomAmqpContainer(rabbitTemplate);
6954
}
7055

71-
//@RabbitListener(queues = QUEUE_PARKING_LOT)
72-
public void processParkingLotQueue(final Message failedMessage) {
73-
log.info("Received message in parking lot queue");
56+
@Bean
57+
@ConditionalOnProperty(
58+
value = "amqp.configuration.current",
59+
havingValue = "parking-lot-dlx")
60+
public ParkingLotDLQAmqpContainer parkingLotDLQAmqpContainer() {
61+
return new ParkingLotDLQAmqpContainer(rabbitTemplate);
7462
}
7563
}

0 commit comments

Comments
 (0)