Skip to content

Commit 4dd386c

Browse files
committed
BAEL-3200 Error handling with Spring AMQP
1 parent e08d6d1 commit 4dd386c

13 files changed

+433
-0
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package com.baeldung.springamqp.errorhandling;
2+
3+
import com.baeldung.springamqp.errorhandling.producer.MessageProducer;
4+
import org.springframework.beans.factory.annotation.Autowired;
5+
import org.springframework.boot.SpringApplication;
6+
import org.springframework.boot.autoconfigure.SpringBootApplication;
7+
import org.springframework.boot.context.event.ApplicationReadyEvent;
8+
import org.springframework.context.event.EventListener;
9+
import org.springframework.scheduling.annotation.EnableScheduling;
10+
11+
@SpringBootApplication
12+
@EnableScheduling
13+
public class ErrorHandlingApp {
14+
@Autowired
15+
MessageProducer messageProducer;
16+
17+
public static void main(String[] args) {
18+
SpringApplication.run(ErrorHandlingApp.class, args);
19+
}
20+
21+
@EventListener(ApplicationReadyEvent.class)
22+
public void doSomethingAfterStartup() {
23+
messageProducer.sendMessage();
24+
}
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package com.baeldung.springamqp.errorhandling.configuration;
2+
3+
import org.springframework.amqp.core.*;
4+
import org.springframework.context.annotation.Bean;
5+
6+
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.*;
7+
8+
//@Configuration
9+
public class DLXCustomAmqpConfiguration {
10+
public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx";
11+
12+
@Bean
13+
Queue messagesQueue() {
14+
return QueueBuilder.durable(QUEUE_MESSAGES)
15+
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
16+
.build();
17+
}
18+
19+
@Bean
20+
FanoutExchange deadLetterExchange() {
21+
return new FanoutExchange(DLX_EXCHANGE_MESSAGES);
22+
}
23+
24+
@Bean
25+
Queue deadLetterQueue() {
26+
return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();
27+
}
28+
29+
@Bean
30+
Binding deadLetterBinding() {
31+
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
32+
}
33+
34+
@Bean
35+
DirectExchange messagesExchange() {
36+
return new DirectExchange(EXCHANGE_MESSAGES);
37+
}
38+
39+
@Bean
40+
Binding bindingMessages() {
41+
return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES);
42+
}
43+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package com.baeldung.springamqp.errorhandling.configuration;
2+
3+
import org.springframework.amqp.core.*;
4+
import org.springframework.context.annotation.Bean;
5+
6+
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.*;
7+
8+
//@Configuration
9+
public class DLXDefaultAmqpConfiguration {
10+
public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx";
11+
12+
@Bean
13+
Queue messagesQueue() {
14+
return QueueBuilder.durable(QUEUE_MESSAGES)
15+
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
16+
.build();
17+
}
18+
19+
@Bean
20+
FanoutExchange deadLetterExchange() {
21+
return new FanoutExchange(DLX_EXCHANGE_MESSAGES);
22+
}
23+
24+
@Bean
25+
Queue deadLetterQueue() {
26+
return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();
27+
}
28+
29+
@Bean
30+
Binding deadLetterBinding() {
31+
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
32+
}
33+
34+
@Bean
35+
DirectExchange messagesExchange() {
36+
return new DirectExchange(EXCHANGE_MESSAGES);
37+
}
38+
39+
@Bean
40+
Binding bindingMessages() {
41+
return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES);
42+
}
43+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package com.baeldung.springamqp.errorhandling.configuration;
2+
3+
import org.springframework.amqp.core.*;
4+
import org.springframework.context.annotation.Bean;
5+
6+
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.*;
7+
8+
//@Configuration
9+
public class DLXParkingLotAmqpConfiguration {
10+
public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx";
11+
public static final String QUEUE_PARKING_LOT = QUEUE_MESSAGES + ".parking-lot";
12+
public static final String EXCHANGE_PARKING_LOT = QUEUE_MESSAGES + "exchange.parking-lot";
13+
14+
@Bean
15+
FanoutExchange parkingLotExchange() {
16+
return new FanoutExchange(EXCHANGE_PARKING_LOT);
17+
}
18+
19+
@Bean
20+
Queue parkingLotQueue() {
21+
return QueueBuilder.durable(QUEUE_PARKING_LOT).build();
22+
}
23+
24+
@Bean
25+
Binding parkingLotBinding() {
26+
return BindingBuilder.bind(parkingLotQueue()).to(parkingLotExchange());
27+
}
28+
29+
@Bean
30+
Queue messagesQueue() {
31+
return QueueBuilder.durable(QUEUE_MESSAGES)
32+
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
33+
.build();
34+
}
35+
36+
@Bean
37+
FanoutExchange deadLetterExchange() {
38+
return new FanoutExchange(DLX_EXCHANGE_MESSAGES);
39+
}
40+
41+
@Bean
42+
Queue deadLetterQueue() {
43+
return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();
44+
}
45+
46+
@Bean
47+
Binding deadLetterBinding() {
48+
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
49+
}
50+
51+
@Bean
52+
DirectExchange messagesExchange() {
53+
return new DirectExchange(EXCHANGE_MESSAGES);
54+
}
55+
56+
@Bean
57+
Binding bindingMessages() {
58+
return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES);
59+
}
60+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package com.baeldung.springamqp.errorhandling.configuration;
2+
3+
import com.baeldung.springamqp.errorhandling.errorhandler.CustomFatalExceptionStrategy;
4+
import org.springframework.amqp.core.*;
5+
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
6+
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
7+
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
8+
import org.springframework.amqp.rabbit.listener.FatalExceptionStrategy;
9+
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
10+
import org.springframework.context.annotation.Bean;
11+
import org.springframework.context.annotation.Configuration;
12+
import org.springframework.util.ErrorHandler;
13+
14+
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
15+
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES;
16+
17+
@Configuration
18+
public class FatalExceptionStrategyAmqpConfiguration {
19+
20+
@Bean
21+
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,
22+
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
23+
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
24+
configurer.configure(factory, connectionFactory);
25+
factory.setErrorHandler(errorHandler());
26+
return factory;
27+
}
28+
29+
@Bean
30+
public ErrorHandler errorHandler() {
31+
return new ConditionalRejectingErrorHandler(customExceptionStrategy());
32+
}
33+
34+
@Bean
35+
FatalExceptionStrategy customExceptionStrategy() {
36+
return new CustomFatalExceptionStrategy();
37+
}
38+
39+
@Bean
40+
Queue messagesQueue() {
41+
return QueueBuilder.durable(QUEUE_MESSAGES)
42+
.build();
43+
}
44+
45+
@Bean
46+
DirectExchange messagesExchange() {
47+
return new DirectExchange(EXCHANGE_MESSAGES);
48+
}
49+
50+
@Bean
51+
Binding bindingMessages() {
52+
return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES);
53+
}
54+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package com.baeldung.springamqp.errorhandling.configuration;
2+
3+
import com.baeldung.springamqp.errorhandling.errorhandler.CustomErrorHandler;
4+
import org.springframework.amqp.core.*;
5+
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
6+
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
7+
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
8+
import org.springframework.context.annotation.Bean;
9+
import org.springframework.util.ErrorHandler;
10+
11+
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
12+
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES;
13+
14+
//@Configuration
15+
public class ListenerErrorHandlerAmqpConfiguration {
16+
17+
@Bean
18+
public ErrorHandler errorHandler() {
19+
return new CustomErrorHandler();
20+
}
21+
22+
@Bean
23+
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,
24+
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
25+
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
26+
configurer.configure(factory, connectionFactory);
27+
factory.setErrorHandler(errorHandler());
28+
return factory;
29+
}
30+
31+
@Bean
32+
Queue messagesQueue() {
33+
return QueueBuilder.durable(QUEUE_MESSAGES)
34+
.build();
35+
}
36+
37+
@Bean
38+
DirectExchange messagesExchange() {
39+
return new DirectExchange(EXCHANGE_MESSAGES);
40+
}
41+
42+
@Bean
43+
Binding bindingMessages() {
44+
return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES);
45+
}
46+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package com.baeldung.springamqp.errorhandling.configuration;
2+
3+
import org.springframework.amqp.core.*;
4+
import org.springframework.context.annotation.Bean;
5+
6+
//@Configuration
7+
public class SimpleDLQAmqpConfiguration {
8+
public static final String QUEUE_MESSAGES = "baeldung-messages-queue";
9+
public static final String QUEUE_MESSAGES_DLQ = QUEUE_MESSAGES + ".dlq";
10+
public static final String EXCHANGE_MESSAGES = "baeldung-messages-exchange";
11+
12+
@Bean
13+
Queue messagesQueue() {
14+
return QueueBuilder.durable(QUEUE_MESSAGES)
15+
.withArgument("x-dead-letter-exchange", "")
16+
.withArgument("x-dead-letter-routing-key", QUEUE_MESSAGES_DLQ)
17+
.build();
18+
}
19+
20+
@Bean
21+
Queue deadLetterQueue() {
22+
return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();
23+
}
24+
25+
@Bean
26+
DirectExchange messagesExchange() {
27+
return new DirectExchange(EXCHANGE_MESSAGES);
28+
}
29+
30+
@Bean
31+
Binding bindingMessages() {
32+
return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES);
33+
}
34+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package com.baeldung.springamqp.errorhandling.consumer;
2+
3+
import com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration;
4+
import com.baeldung.springamqp.errorhandling.errorhandler.BusinessException;
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
7+
import org.springframework.amqp.core.Message;
8+
import org.springframework.amqp.rabbit.annotation.RabbitListener;
9+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
10+
import org.springframework.stereotype.Service;
11+
12+
import static com.baeldung.springamqp.errorhandling.configuration.DLXParkingLotAmqpConfiguration.EXCHANGE_PARKING_LOT;
13+
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
14+
15+
@Service
16+
public class MessagesConsumer {
17+
public static final String HEADER_X_RETRIES_COUNT = "x-retries-count";
18+
public static final int MAX_RETRIES_COUNT = 1;
19+
20+
private static final Logger log = LoggerFactory.getLogger(MessagesConsumer.class);
21+
private final RabbitTemplate rabbitTemplate;
22+
23+
public MessagesConsumer(RabbitTemplate rabbitTemplate) {
24+
this.rabbitTemplate = rabbitTemplate;
25+
}
26+
27+
@RabbitListener(queues = SimpleDLQAmqpConfiguration.QUEUE_MESSAGES)
28+
public void receiveMessage(final Message message) throws BusinessException {
29+
log.info("Received message: {}", message.toString());
30+
throw new BusinessException();
31+
}
32+
33+
//@RabbitListener(queues = DLXCustomAmqpConfiguration.QUEUE_MESSAGES_DLQ)
34+
public void processFailedMessages(final Message message) {
35+
log.info("Received failed message: {}", message.toString());
36+
}
37+
38+
//@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
39+
public void processFailedMessagesRequeue(final Message failedMessage) {
40+
log.info("Received failed message, requeueing: {}", failedMessage.toString());
41+
rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
42+
}
43+
44+
//@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
45+
public void processFailedMessagesRetryHeaders(final Message failedMessage) {
46+
Integer retriesCnt = (Integer) failedMessage.getMessageProperties().getHeaders().get(HEADER_X_RETRIES_COUNT);
47+
if (retriesCnt == null) retriesCnt = 0;
48+
log.info("Retrying message for the {} time", retriesCnt);
49+
if (retriesCnt > MAX_RETRIES_COUNT) {
50+
log.info("Discarding message");
51+
return;
52+
}
53+
failedMessage.getMessageProperties().getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt);
54+
rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
55+
}
56+
57+
// @RabbitListener(queues = QUEUE_MESSAGES_DLQ)
58+
public void processFailedMessagesRetryWithParkingLot(final Message failedMessage) {
59+
Integer retriesCnt = (Integer) failedMessage.getMessageProperties().getHeaders().get(HEADER_X_RETRIES_COUNT);
60+
if (retriesCnt == null) retriesCnt = 0;
61+
log.info("Retrying message for the {} time", retriesCnt);
62+
if (retriesCnt > MAX_RETRIES_COUNT) {
63+
log.info("Sending message to the parking lot queue");
64+
rabbitTemplate.send(EXCHANGE_PARKING_LOT, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
65+
return;
66+
}
67+
failedMessage.getMessageProperties().getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt);
68+
rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
69+
}
70+
71+
//@RabbitListener(queues = QUEUE_PARKING_LOT)
72+
public void processParkingLotQueue(final Message failedMessage) {
73+
log.info("Received message in parking lot queue");
74+
}
75+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
package com.baeldung.springamqp.errorhandling.errorhandler;
2+
3+
public class BusinessException extends Exception {
4+
}

0 commit comments

Comments
 (0)