Rabbitmq如何保证不丢消息,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。
成都网络公司-成都网站建设公司创新互联十载经验成就非凡,专业从事网站建设、网站设计,成都网页设计,成都网页制作,软文营销,一元广告等。十载来已成功提供全面的成都网站建设方案,打造行业特色的成都网站建设案例,建站热线:13518219792,我们期待您的来电!
背景介绍:
笔者最近研究了下rabbitmq,便很好奇它是怎么保证不丢失消息的呢?于是便整理了这篇文章来跟大家分享下,自己的理解,如有不准确的地方或者不同的意见,还请各位能够给出反馈,我们可以讨论,相互学习,相互成长。
基础知识:
在开始探讨这个问题之前,笔者还是觉得很有必要将rabbitmq的架构等基础知识回顾下,如下所示:
对于使用rabbitmq的服务来说,主要由三部分构成,它们分别是:生产者,rabbitmq,消费者。这三者之间是通过网络来进行通讯的,其中与生产者对应的是exchange,与消费者对应的是connection,而rabbitmq内部又由exchange,queue,connection三部分构成。
消息的流程:消息是由生产者生产了之后,上报给exchange,exchange绑定并存储到queue中,再传递给最终的消费者手里。
如此以来,整个过程就分成了三大场景:
场景1: 生产者与exchange的上报消息,如何保证不丢失?
对于网络通讯来说,解决丢数据最好的办法就是,消息确认机制,而rabbitmq里面是通过两个方式来保证:一种是事务机制,这个是在amqp协议层面保证的,具体操作如下所示:
RabbitMQ中与事务机制有关的方法有三个:txSelect(), txCommit()以及txRollback(), txSelect用于将当前channel设置成transaction模式,txCommit用于提交事务,txRollback用于回滚事务,在通过txSelect开启事务之后,我们便可以发布消息给broker代理服务器了,如果txCommit提交成功了,则消息一定到达了broker了,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了。(备注:采用事务机制实现会降低RabbitMQ的消息吞吐量。)
步骤为:
1.client----发送----->Tx.Select2.broker----发送----->Tx.Select-Ok(之后publish)3.client------发送----->Tx.Commit4.broker------发送---->Tx.Commit-Ok
一种是confrim机制:
原理:消息响应机制,
生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号。
confrim的优势是,它是异步的,在生产者发送完一个消息之后,不必要等这个消息的返回,就可以继续处理另外一个消息,等待消息的ack返回之后,再去处理前面发过的消息,类似于多路复用的做法。rabbitmq在收到之后,会回复ack,如果因为rabbitmq自身的问题导致的,会回复nack消息。
对于生产者来说,为了方便确认消息有没有真正到达rabbitmq端,还需要在生产者端设置超时重发,毕竟网络里面是可能丢失消息的。
confrim方式使用的API:
https://godoc.org/github.com/streadway/amqp#Channel.Confirm
场景2: 消费者从queue中获取消息如何保证不丢失?
消息在到达了rabbitmq之后,会将数据保存到queue里面,queue是存到内存里面的,不过rabbitmq提供了持久化的操作,这个策略如下所示:
1.buffer大约1M左右,写满之后,就会写到磁盘中。2.25ms超时时间,buffer不满的话,超时也会写到磁盘中。
尽管如此,也有可能会丢数据,特别是当rabbitmq在buffer没有写到磁盘的时候,就死掉了。不过rabbitmq也提供了镜像队列的方式,利用主备的方式来防止消息丢掉,不过当master和salve同时挂掉的话,还是会丢数据,只不过这种同时挂掉的概率会小很多。(笔者觉得,没有百分之百的不丢消息,只是丢消息的概率变的很低而已。)
参考文章:https://blog.csdn.net/u013256816/article/details/60875666
场景3: rabbitmq内部如何保证不丢失消息?
对于消费者来说,同样也是采用了消息响应的方式来防止消息不丢失,不过在这一层使用的是ack机制来处理,不过这里的ack可以设置成不等待ack和等待ack两种,在这里我们使用的是设置ack。
消费者对于消息的响应通常有下面三个场景:
1.消费者在接收到rabbitmq的消息之后,等处理完消息之后,会主动回复ack消息,rabbitmq在收到ack之后,便会继续给这个消费者分配下一个消息进行处理。
2.当然rabbitmq也可以回复unack消息,如此以来消息队列下一次还会继续将这个消息分配给消费者,来实现消息重处理。
3.消费者先把ack消息回复掉,然后在重新将这个消息放到rabbitmq之中,如此以来通过rabbitmq的队列特性来实现,消息的重试,这里的重试,不是一直处理这一个消息,而是要等到队列里面的消息排队到它才行。
除此之外,rabbitmq还实现了批量的概念,通过qos来设置一次性分配给一个消费者的最大数量,让消费者一次性消费一批消息,等到处理完了这一批消息,再去分配下一批消息给这一个消费者。
问题1:一旦消费者长时间不回复Ack消息或者消费者卡死了呢,这种场景如何处理?
理论上需要消费者需要实现一个超时处理机制,在一定时间内没有处理完毕,需要超时回复ack或者unack消息给rabbitmq。
问题2:就算消费者有超时机制,可是一旦消费者在发送ack给rabbitmq的时候,消息丢失,rabbitmq这个消息一直收不到响应消息的话,会怎么办呢?
rabbitmq提供了一个可以针对消费者挂掉之后的处理机制,在消费者挂掉之后,会探测出来,然后进行后续处理。
rabbitmq还有一个ttl的功能,可以针对消息队列或者单个消息设置对应的ttl值,一旦ttl超时,消息就会变为dead message, 不会再分配给消费者。在这里我们可以采用这个策略,在消息变成死消息之后,我们可以让生产者再次生产相同的消息放到rabbitmq当中,如果确定这个消息不在使用了,就直接丢弃这个消息。
看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注创新互联行业资讯频道,感谢您对创新互联的支持。
本文标题:Rabbitmq如何保证不丢消息
转载注明:http://scpingwu.com/article/gchdsp.html