消息手动ack + 手动重试 1234567891011121314151617181920212223242526272829303132333435363738394041/** * 消息最大重试次数 */private static final int MAX_RETRIES = 3; /** * 重试间隔(秒) */private static final long RETRY_INTERVAL = 5; private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); @RabbitListener(queues = RabbitMqConfig.USER_ADD_QUEUE, concurrency = "10")public void userAddReceiver(String data, Message message, Channel channel) throws IOException, InterruptedException { UserVo vo = OBJECT_MAPPER.readValue(data, UserVo.class); // 重试次数 int retryCount = 0; boolean success = false; // 消费失败并且重试次数<=重试上限次数 while (!success && retryCount < MAX_RETRIES) { retryCount++; // 具体业务逻辑 success = messageHandle(vo); // 如果失败则重试 if (!success) { String errorTip = "第" + retryCount + "次消费失败" + ((retryCount < 3) ? "," + RETRY_INTERVAL + "s后重试" : ",进入死信队列"); log.error(errorTip); Thread.sleep(RETRY_INTERVAL * 1000); } } if (success) { // 消费成功,确认 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info("创建订单数据消费成功"); } else { // 重试多次之后仍失败,发送到死信队列 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); log.info("创建订单数据消费失败"); }}