生产者重写 RabbitTemplate.ConfirmCallback的 confirm方法以及 returnedMessage 方法。
将 ack==false 的消息 持久化到数据库,定时扫描 DB 中投递失败的数据,重新投递到MQ中;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
|
@Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { log.info(correlationData.getId() + ":发送成功"); redisTemplate.delete(correlationData.getId()); } else { log.error(correlationData.getId() + ":发送失败"); log.info("备份到DB的内容:" + redisTemplate.opsForValue().get(correlationData.getId())); try { SaveNackMessage strategy = SaveNackMessage.getStrategy(SaveNackMessage.NackTypeEnum.PRODUCER.getType()); HashMap<String, Object> map = new HashMap<>(); map.put("cause", StringUtils.isNoneBlank(cause) ? cause : StringUtils.EMPTY); map.put("ack", ack ? 1 : 0); map.put("correlationData", Objects.nonNull(correlationData) ? correlationData : StringUtils.EMPTY); saveNackMessageThread.execute(strategy.template(map)); } catch (Exception e) { log.error("记录mq发送端错误日志失败", e); } } }
|
另外除了实现confirm方法,还需要实现returnedMessage方法 即(投递消息后,交换机找不到具体的queue将会回调该方法 一般我们需要配置钉钉预警,告知开发者)
具体代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| @Autowired private ApplicationEventPublisher publisher;
@Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.error("returnedMessage 消息主体 message : {}", message); log.error("returnedMessage 描述:{}", replyText); log.error("returnedMessage 消息使用的交换器 exchange : {}", exchange); log.error("returnedMessage 消息使用的路由键 routing : {}", routingKey);
HashMap<String, Object> maps = Maps.newHashMap(); maps.put("message", message); maps.put("replyCode", replyCode); maps.put("replyText", replyText); maps.put("exchange", exchange); maps.put("routingKey", routingKey); String returnedMessage = JSON.toJSONString(maps);
SendFailNoticeEvent noticeEvent = new SendFailNoticeEvent(); noticeEvent.setLevel(1); noticeEvent.setErrorMsg( System.lineSeparator() + "producer投递消息失败;报错信息: " + returnedMessage); noticeEvent.setTalkTypeEnum(DingTalkTypeEnum.BIZ_NOTICE); publisher.publishEvent(noticeEvent); }
|
durable=true
(消费者) 需要做的 手动ack,保证业务执行完后再ack,通知mq将某条消息删除
1
| spring.rabbitmq.listener.simple.acknowledge-mode=manual
|
我的博客即将同步至腾讯云开发者社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=ocj4bhqfct36