RELATEED CONSULTING
相关咨询
选择下列产品马上在线沟通
服务时间:8:30-17:00
你可能遇到了下面的问题
关闭右侧工具栏

新闻中心

这里有您想知道的互联网营销解决方案
RabbitMQ中怎么保证消息的可靠投递

今天就跟大家聊聊有关RabbitMQ中怎么保证消息的可靠投递,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

网站建设哪家好,找创新互联!专注于网页设计、网站建设、微信开发、重庆小程序开发、集团企业网站建设等服务项目。为回馈新老客户创新互联还提供了高县免费建站欢迎大家使用!

RabbitMQ整合Spring Boot,我们只需要增加对应的starter即可

   org.springframework.boot   spring-boot-starter-amqp 

基于注解

在application.yaml的配置如下

spring:   rabbitmq:     host: myhost     port: 5672     username: guest     password: guest     virtual-host: /  log:   exchange: log.exchange   info:     queue: info.log.queue     binding-key: info.log.key   error:     queue: error.log.queue     binding-key: error.log.key   all:     queue: all.log.queue     binding-key: '*.log.key'

消费者代码如下

@Slf4j @Component public class LogReceiverListener {      /**      * 接收info级别的日志      */     @RabbitListener(             bindings = @QueueBinding(                     value = @Queue(value = "${log.info.queue}", durable = "true"),                     exchange = @Exchange(value = "${log.exchange}", type = ExchangeTypes.TOPIC),                     key = "${log.info.binding-key}"             )     )     public void infoLog(Message message) {         String msg = new String(message.getBody());         log.info("infoLogQueue 收到的消息为: {}", msg);     }      /**      * 接收所有的日志      */     @RabbitListener(             bindings = @QueueBinding(                     value = @Queue(value = "${log.all.queue}", durable = "true"),                     exchange = @Exchange(value = "${log.exchange}", type = ExchangeTypes.TOPIC),                     key = "${log.all.binding-key}"             )     )     public void allLog(Message message) {         String msg = new String(message.getBody());         log.info("allLogQueue 收到的消息为: {}", msg);     } }

生产者如下

@RunWith(SpringRunner.class) @SpringBootTest public class MsgProducerTest {      @Autowired     private AmqpTemplate amqpTemplate;     @Value("${log.exchange}")     private String exchange;     @Value("${log.info.binding-key}")     private String routingKey;      @SneakyThrows     @Test     public void sendMsg() {         for (int i = 0; i < 5; i++) {             String message = "this is info message " + i;             amqpTemplate.convertAndSend(exchange, routingKey, message);         }          System.in.read();     } }

Spring Boot针对消息ack的方式和原生api针对消息ack的方式有点不同

原生api消息ack的方式

消息的确认方式有2种

自动确认(autoAck=true)

手动确认(autoAck=false)

消费者在消费消息的时候,可以指定autoAck参数

String basicConsume(String queue, boolean autoAck, Consumer callback)

autoAck=false: RabbitMQ会等待消费者显示回复确认消息后才从内存(或者磁盘)中移出消息

autoAck=true: RabbitMQ会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正的消费了这些消息

手动确认的方法如下,有2个参数

basicAck(long deliveryTag, boolean multiple)

deliveryTag: 用来标识信道中投递的消息。RabbitMQ  推送消息给Consumer时,会附带一个deliveryTag,以便Consumer可以在消息确认时告诉RabbitMQ到底是哪条消息被确认了。

RabbitMQ保证在每个信道中,每条消息的deliveryTag从1开始递增

multiple=true: 消息id<=deliveryTag的消息,都会被确认

myltiple=false: 消息id=deliveryTag的消息,都会被确认

消息一直不确认会发生啥?

如果队列中的消息发送到消费者后,消费者不对消息进行确认,那么消息会一直留在队列中,直到确认才会删除。

如果发送到A消费者的消息一直不确认,只有等到A消费者与rabbitmq的连接中断,rabbitmq才会考虑将A消费者未确认的消息重新投递给另一个消费者

Spring Boot中针对消息ack的方式

有三种方式,定义在AcknowledgeMode枚举类中

方式解释
NONE没有ack,等价于原生api中的autoAck=true
MANUAL用户需要手动发送ack或者nack
AUTO方法正常结束,spring boot 框架返回ack,发生异常spring boot框架返回nack

spring boot针对消息默认的ack的方式为AUTO。

在实际场景中,我们一般都是手动ack。

application.yaml的配置改为如下

spring:   rabbitmq:     host: myhost     port: 5672     username: guest     password: guest     virtual-host: /     listener:       simple:         acknowledge-mode: manual # 手动ack,默认为auto

相应的消费者代码改为

@Slf4j @Component public class LogListenerManual {      /**      * 接收info级别的日志      */     @RabbitListener(             bindings = @QueueBinding(                     value = @Queue(value = "${log.info.queue}", durable = "true"),                     exchange = @Exchange(value = "${log.exchange}", type = ExchangeTypes.TOPIC),                     key = "${log.info.binding-key}"             )     )     public void infoLog(Message message, Channel channel) throws Exception {         String msg = new String(message.getBody());         log.info("infoLogQueue 收到的消息为: {}", msg);         try {             // 这里写各种业务逻辑             channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);         } catch (Exception e) {             channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);         }     } }

我们上面用到的注解,作用如下

注解作用
RabbitListener消费消息,可以定义在类上,方法上,当定义在类上时需要和RabbitHandler配合使用
QueueBinding定义绑定关系
Queue定义队列
Exchange定义交换机
RabbitHandlerRabbitListener定义在类上时,需要用RabbitHandler指定处理的方法

基于JavaConfig

既然用注解这么方便,为啥还需要JavaConfig的方式呢?

JavaConfig方便自定义各种属性,比如同时配置多个virtual host等

具体代码看GitHub把

RabbitMQ如何保证消息的可靠投递

一个消息往往会经历如下几个阶段

RabbitMQ中怎么保证消息的可靠投递

在这里插入图片描述

所以要保证消息的可靠投递,只需要保证这3个阶段的可靠投递即可

生产阶段

这个阶段的可靠投递主要靠ConfirmListener(发布者确认)和ReturnListener(失败通知)

前面已经介绍过了,一条消息在RabbitMQ中的流转过程为

producer -> rabbitmq broker cluster -> exchange -> queue ->  consumer

ConfirmListener可以获取消息是否从producer发送到broker

ReturnListener可以获取从exchange路由不到queue的消息

我用Spring Boot Starter 的api来演示一下效果

application.yaml

spring:   rabbitmq:     host: myhost     port: 5672     username: guest     password: guest     virtual-host: /     listener:       simple:         acknowledge-mode: manual # 手动ack,默认为auto  log:   exchange: log.exchange   info:     queue: info.log.queue     binding-key: info.log.key

发布者确认回调

@Component public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {      @Autowired     private MessageSender messageSender;      @Override     public void confirm(CorrelationData correlationData, boolean ack, String cause) {         String msgId = correlationData.getId();         String msg = messageSender.dequeueUnAckMsg(msgId);         if (ack) {             System.out.println(String.format("消息 {%s} 成功发送给mq", msg));         } else {             // 可以加一些重试的逻辑             System.out.println(String.format("消息 {%s} 发送mq失败", msg));         }     } }

失败通知回调

@Component public class ReturnCallback implements RabbitTemplate.ReturnCallback {      @Override     public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {         String msg = new String(message.getBody());         System.out.println(String.format("消息 {%s} 不能被正确路由,routingKey为 {%s}", msg, routingKey));     } }
@Configuration public class RabbitMqConfig {      @Bean     public ConnectionFactory connectionFactory(             @Value("${spring.rabbitmq.host}") String host,             @Value("${spring.rabbitmq.port}") int port,             @Value("${spring.rabbitmq.username}") String username,             @Value("${spring.rabbitmq.password}") String password,             @Value("${spring.rabbitmq.virtual-host}") String vhost) {         CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host);         connectionFactory.setPort(port);         connectionFactory.setUsername(username);         connectionFactory.setPassword(password);         connectionFactory.setVirtualHost(vhost);         connectionFactory.setPublisherConfirms(true);         connectionFactory.setPublisherReturns(true);         return connectionFactory;     }      @Bean     public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,                                          ReturnCallback returnCallback, ConfirmCallback confirmCallback) {         RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);         rabbitTemplate.setReturnCallback(returnCallback);         rabbitTemplate.setConfirmCallback(confirmCallback);         // 要想使 returnCallback 生效,必须设置为true         rabbitTemplate.setMandatory(true);         return rabbitTemplate;     } }

这里我对RabbitTemplate做了一下包装,主要就是发送的时候增加消息id,并且保存消息id和消息的对应关系,因为RabbitTemplate.ConfirmCallback只能拿到消息id,并不能拿到消息内容,所以需要我们自己保存这种映射关系。在一些可靠性要求比较高的系统中,你可以将这种映射关系存到数据库中,成功发送删除映射关系,失败则一直发送

@Component public class MessageSender {      @Autowired     private RabbitTemplate rabbitTemplate;      public final Map unAckMsgQueue = new ConcurrentHashMap<>();      public void convertAndSend(String exchange, String routingKey, String message) {         String msgId = UUID.randomUUID().toString();         CorrelationData correlationData = new CorrelationData();         correlationData.setId(msgId);         rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);         unAckMsgQueue.put(msgId, message);     }      public String dequeueUnAckMsg(String msgId) {         return unAckMsgQueue.remove(msgId);     }  }

测试代码为

@RunWith(SpringRunner.class) @SpringBootTest public class MsgProducerTest {      @Autowired     private MessageSender messageSender;     @Value("${log.exchange}")     private String exchange;     @Value("${log.info.binding-key}")     private String routingKey;      /**      * 测试失败通知      */     @SneakyThrows     @Test     public void sendErrorMsg() {         for (int i = 0; i < 3; i++) {             String message = "this is error message " + i;             messageSender.convertAndSend(exchange, "test", message);         }         System.in.read();     }      /**      * 测试发布者确认      */     @SneakyThrows     @Test     public void sendInfoMsg() {         for (int i = 0; i < 3; i++) {             String message = "this is info message " + i;             messageSender.convertAndSend(exchange, routingKey, message);         }         System.in.read();     } }

先来测试失败者通知

输出为

消息 {this is error message 0} 不能被正确路由,routingKey为 {test} 消息 {this is error message 0} 成功发送给mq 消息 {this is error message 2} 不能被正确路由,routingKey为 {test} 消息 {this is error message 2} 成功发送给mq 消息 {this is error message 1} 不能被正确路由,routingKey为 {test} 消息 {this is error message 1} 成功发送给mq

消息都成功发送到broker,但是并没有被路由到queue中

再来测试发布者确认

输出为

消息 {this is info message 0} 成功发送给mq infoLogQueue 收到的消息为: {this is info message 0} infoLogQueue 收到的消息为: {this is info message 1} 消息 {this is info message 1} 成功发送给mq infoLogQueue 收到的消息为: {this is info message 2} 消息 {this is info message 2} 成功发送给mq

看完上述内容,你们对RabbitMQ中怎么保证消息的可靠投递有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注创新互联行业资讯频道,感谢大家的支持。


网站名称:RabbitMQ中怎么保证消息的可靠投递
文章位置:http://scpingwu.com/article/johppi.html