Emergency Situation: Production services encounter RabbitMQ message backlog, affecting normal business operations.
When the following situations occur in the production environment, immediate action is required:
When facing urgent situations, these two methods can provide immediate relief:
Implementation Method:
Advantages:
Limitations:
Code Example:
// Increase consumer thread count
@RabbitListener(queues = "order.queue", concurrency = "5-20")
public void processOrder(OrderMessage message) {
// Business processing logic
orderService.processOrder(message);
}
Implementation Methods:
Method A: Queue Purging
# Use management API to purge queue
curl -i -u admin:password -H "content-type:application/json" \
-XDELETE http://localhost:15672/api/queues/vhost/queue-name/contents
Method B: Set Message TTL
// Set TTL for new messages
@Bean
public Queue orderQueue() {
return QueueBuilder.durable("order.queue")
.ttl(60000) // 60 seconds TTL
.build();
}
Method C: Batch Consume and Acknowledge
@RabbitListener(queues = "order.queue")
public void batchClearMessages(
@Payload List<OrderMessage> messages,
@Header(AmqpHeaders.DELIVERY_TAG) List<Long> deliveryTags,
Channel channel) throws IOException {
// Batch acknowledge without processing
for (Long deliveryTag : deliveryTags) {
channel.basicAck(deliveryTag, false);
}
}
⚠️ Warning: These methods may cause data loss. Use only when data loss is acceptable.
Core Concept: Separate message consumption from business processing
Implementation Steps:
@RabbitListener(queues = "order.queue")
public void receiveMessage(OrderMessage message) {
// Store message temporarily
messageBuffer.offer(message);
// Acknowledge immediately
}
@Async
@Scheduled(fixedDelay = 1000)
public void processBufferedMessages() {
while (!messageBuffer.isEmpty()) {
OrderMessage message = messageBuffer.poll();
if (message != null) {
orderService.processOrder(message);
}
}
}
Advantages:
Core Concept: Use message TTL to transfer backlogged messages to dead letter queue for off-peak processing
Configuration:
@Configuration
public class RabbitMQConfig {
@Bean
public Queue orderQueue() {
return QueueBuilder.durable("order.queue")
.ttl(300000) // 5 minutes TTL
.deadLetterExchange("order.dlx")
.deadLetterRoutingKey("order.dlq")
.build();
}
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable("order.dlq").build();
}
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange("order.dlx");
}
}
Dead Letter Queue Consumer:
@RabbitListener(queues = "order.dlq")
public void processExpiredMessages(OrderMessage message) {
// Process expired messages during off-peak hours
orderService.processOrder(message);
}
Advantages:
Scenario | Recommended Solution | Pros | Cons |
---|---|---|---|
Extreme Emergency | Scale Consumers | Immediate effect | Resource intensive |
Data Loss Acceptable | Clear Messages | Quick relief | Data loss risk |
Production Stability | Decoupled Processing | Safe and reliable | Implementation complexity |
Peak Load Management | TTL + DLQ | Automatic handling | Delayed processing |
@Component
public class QueueMonitor {
@Value("${rabbitmq.queue.threshold:1000}")
private int alertThreshold;
@Scheduled(fixedDelay = 30000)
public void checkQueueDepth() {
int messageCount = rabbitAdmin.getQueueProperties("order.queue")
.getMessageCount();
if (messageCount > alertThreshold) {
alertService.sendAlert("Queue depth exceeded threshold: " + messageCount);
}
}
}
@Component
public class DynamicConsumerManager {
public void scaleConsumers(String queueName, int targetConsumerCount) {
SimpleMessageListenerContainer container =
(SimpleMessageListenerContainer) registry.getListenerContainer(queueName);
container.setConcurrentConsumers(targetConsumerCount);
container.setMaxConcurrentConsumers(targetConsumerCount * 2);
}
}
@Component
public class MessageProcessor {
private final CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("messageProcessor");
public void processMessage(OrderMessage message) {
circuitBreaker.executeSupplier(() -> {
return orderService.processOrder(message);
});
}
}
Recommended Approach:
Key Takeaways:
The choice of solution depends on your specific requirements for data consistency, processing latency, and system complexity. In most production scenarios, a combination of decoupled processing and TTL + dead letter queue provides the best balance of reliability and performance.