文章目录
前言
定制开发小程序在工作中使用传输数据时,定制开发小程序可能会因为数据、定制开发小程序网络等问题,定制开发小程序导致数据发送或者接收失败;
定制开发小程序如果对此类问题没有做好处理,定制开发小程序就会存在丢失数据的问题,为此,引入了ConfirmCallback
与ReturnCallback
,来保证系统能够做到更好的数据、以及消费失败的数据做好相应的补偿;
ConfirmCallback
与ReturnCallback
也被称为Rabbitmq的消息确认机制;
有哪些问题
首先,下面为消息从生产者 ——> 消费者的流程图:
不过如果应用到生产环境中会出现两个问题:
- 生产者发出的消息可能因为种种原因,并没有发送到交换器,而生产者却不知道;
- 交换器接收到的消息,并没有发送到队列中,而生产者却不知道;
如何解决
为了解决以上两个问题,系统引入了ConfirmCallback
与ReturnCallback
:
ConfirmCallback
为发送Exchange
(交换器)时回调,成功或者失败都会触发;ReturnCallback
为路由不到队列时触发,成功则不触发;
也就是说,前者是为了监听消息是否到达了Exchange
,后者是为了监听消息是否到达了队列,如果这两个步骤遇到了问题,则生产者也好做出相应处理(例如:消息补偿,不过这并不是本篇的重点);
如果消息在消费端消费失败了怎么办?
失败就失败了,在实际场景中,数据库是需要为发送成功的消息做标记的,如果消息没有做标记(消费失败),则会采用定时任务重新发送,不过会涉及到幂等性的问题,这里会另起一篇文章:基于RabbitMQ实现最终一致性解决方案,在此不再赘述;
Demo
注入回调
@PostConstruct public void init() { //消息未送达队列触发回调 rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { log.error("消息发送失败,未送达队列,message:{},replyCode:{},replyText:{},exchange:{},exchange:{}", JSON.toJSONString(message), replyCode, replyText, exchange, routingKey); MqMsg msg = JSON.parseObject(new String(message.getBody()), MqMsg.class); // 更新数据库 设置消息的状态为发送失败 }); //消息进入到Exchange触发回调 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { String id = Objects.requireNonNull(correlationData.getId()); if (!ack) { log.error("消息未发送成功,返回信息:{}", cause); //设置消息的状态为发送失败 } else { // 更新数据库 设置消息的状态为发送成功 } }); }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
生产者
@ResponseBody@GetMapping("/send")public String send() { UserVo userVo = new UserVo(); //组装消息内容 MessageProperties properties = new MessageProperties(); //消息唯一ID,用力防止幂等性 properties.setMessageId(userVo.getId().toString()); Message message = new Message(JSON.toJSONString(userVo).getBytes(StandardCharsets.UTF_8), properties); // 发送消息时,需要根据业务设置唯一id,发送方确认时,还需要使用唯一id去修改数据状态 rabbitTemplate.convertAndSend("demoExchange", "demoRoutingKey", message); return "发送成功";}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
消费者
@Slf4j@Component@RabbitListener(queues = "demo_data_queue")public class HelloReceiver { int status = 0; @RabbitHandler public void process(JSONObject jsonObject, Channel channel, Message message) throws Exception { // 单条消息的大小限制,一般设为0或不设置,不限制大小 int prefecthSize = 0; // 不要同时给消费端推送n条消息,一旦有n个消息还没ack,则该consumer将block掉,直到有ack 注意在自动应答下不生效 int prefecthCount = 1; // 表示是否应用于channel上,即是channel级别还是consumer级别 boolean global = false; channel.basicQos(prefecthSize,prefecthCount,global); log.info("收到消息:{}", jsonObject); Thread.sleep(10000); try { log.info("message:{}", message.getMessageProperties().getDeliveryTag()); } catch (Exception e) { status = 1; e.printStackTrace(); log.info("message:{}", message.getMessageProperties().getDeliveryTag()); } finally { // 在这里执行成功或失败 if (status == 0) { //成功消费消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } else if (status == 1) { //丢弃这条消息,如果最后一个参数设置为true的话,消息将重回队列末尾,重复消费 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } } }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
哪里不清晰的朋友欢迎留言