[消息队列]
为什么写这篇文章?
博主有两位朋友分别是小 A 和小 B:
- 小 A,工作于传统软件行业(某社保局的软件外包公司),每天工作内容就是和产品聊聊需求,改改业务逻辑。再不然就是和运营聊聊天,写几个 SQL,生成下报表。又或者接到客服的通知,某某功能故障了,改改数据,然后下班部署上线。每天过的都是这种生活,技术零成长。
- 小 B,工作于某国企,虽然能接触到一些中间件技术。然而,他只会订阅/发布消息。通俗点说,就是调调 API。对为什么使用这些中间件啊?如何保证高可用啊?没有充分的认识。
庆幸的是两位朋友都很有上进心,于是博主写这篇文章,帮助他们复习一下关于消息队列中间件这块的要点
复习要点
本文大概围绕如下几点进行阐述:
- 为什么使用[消息队列]?
- 使用消息队列有什么缺点?
- [消息队列]如何选型?
- 如何保证消息队列是高可用的?
- 如何保证消息不被重复消费?
- 如何保证消费的可靠性传输?
- 如何保证消息的顺序性?
我们围绕以上七点进行阐述。需要说明一下,本文不是《消息队列从入门到精通》这种课程,因此只是提供一个复习思路,而不是去教你们怎么调用消息队列的 API。建议对消息队列不了解的人,去找点消息队列的博客看看,再看本文,收获更大
正文
1、为什么要使用[消息队列]?
分析:一个用消息队列的人,不知道为啥用,这就有点尴尬。没有复习这点,很容易被问蒙,然后就开始胡扯了。
回答:这个问题,咱只答三个最主要的应用场景(不可否认还有其他的,但是只答三个主要的),即以下六个字:解耦、异步、削峰
(1)解耦
传统模式:
传统模式的缺点:
- 系统间耦合性太强,如上图所示,系统 A 在代码中直接调用系统 B 和系统 C 的代码,如果将来 D 系统接入,系统 A 还需要修改代码,过于麻烦!
中间件模式:
中间件模式的的优点:
- 将消息写入消息队列,需要消息的系统自己从消息队列中订阅,从而系统 A 不需要做任何修改。
(2)异步
传统模式:
传统模式的缺点:
- 一些非必要的业务逻辑以同步的方式运行,太耗费时间。
中间件模式:
中间件模式的的优点:
- 将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度
(3)削峰
传统模式
传统模式的缺点:
- 并发量大的时候,所有的请求直接怼到数据库,造成数据库连接异常
中间件模式:
中间件模式的的优点:
- 系统 A 慢慢的按照数据库能处理的并发量,从消息队列中慢慢拉取消息。在生产中,这个短暂的高峰期积压是允许的。
2、使用了消息队列会有什么缺点?
分析:一个使用了 MQ 的项目,如果连这个问题都没有考虑过,就把 MQ 引进去了,那就给自己的项目带来了风险。我们引入一个技术,要对这个技术的弊端有充分的认识,才能做好预防。要记住,不要给公司挖坑!
回答:回答也很容易,从以下两个个角度来答
- 系统可用性降低:你想啊,本来其他系统只要运行好好的,那你的系统就是正常的。现在你非要加个消息队列进去,那消息队列挂了,你的系统不是呵呵了。因此,系统可用性降低
- 系统复杂性增加:要多考虑很多方面的问题,比如一致性问题、如何保证消息不被重复消费,如何保证保证消息可靠传输。因此,需要考虑的东西更多,系统复杂性增大。
但是,我们该用还是要用的。
3、消息队列如何选型?
先说一下,博主只会 ActiveMQ,RabbitMQ,RocketMQ,Kafka,对什么 ZeroMQ 等其他 MQ 没啥理解,因此只能基于这四种 MQ 给出回答。
分析:既然在项目中用了 MQ,肯定事先要对业界流行的 MQ 进行调研,如果连每种 MQ 的优缺点都没了解清楚,就拍脑袋依据喜好,用了某种 MQ,还是给项目挖坑。如果面试官问:"你为什么用这种 MQ?。"你直接回答"领导决定的。"这种回答就很 LOW 了。还是那句话,不要给公司挖坑。
回答:首先,咱先上 ActiveMQ 的社区,看看该 MQ 的更新频率:
- Apache ActiveMQ 5.15.3 Release
- Christopher L. Shannon posted on Feb 12, 2018
- Apache ActiveMQ 5.15.2 Released
- Christopher L. Shannon posted on Oct 23, 2017
- Apache ActiveMQ 5.15.0 Released
- Christopher L. Shannon posted on Jul 06, 2017
- 省略以下记录
- ...
我们可以看出,ActiveMq 几个月才发一次版本,据说研究重心在他们的下一代产品 Apollo。
接下来,我们再去 RabbitMQ 的社区去看一下,RabbitMQ 的更新频率
- RabbitMQ 3.7.3 release 30 January 2018
- RabbitMQ 3.6.15 release 17 January 2018
- RabbitMQ 3.7.2 release23 December 2017
- RabbitMQ 3.7.1 release21 December 2017
- 省略以下记录
- ...
我们可以看出,RabbitMQ 版本发布比 ActiveMq 频繁很多。至于 RocketMQ 和 kafka 就不带大家看了,总之也比 ActiveMQ 活跃的多。详情,可自行查阅。
再来一个性能对比表
特性 | ActiveMQ | RabbitMQ | RocketMQ | kafka |
---|---|---|---|---|
开发语言 | Java | erlang | Java | scala |
单机吞吐量 | 万级 | 万级 | 10 万级 | 10 万级 |
时效性 | ms 级 | us 级 | ms 级 | ms 级以内 |
可用性 | 高(主从架构) | 高(主从架构) | 非常高(分布式架构) | 非常高(分布式架构) |
功能特性 | 成熟的产品,在很多公司得到应用;有较多的文档;各种协议支持较好 | 基于 erlang 开发,所以并发能力很强,性能极其好,延时很低;管理界面较丰富 | MQ 功能比较完备,扩展性佳 | 只支持主要的 MQ 功能,像一些消息查询,消息回溯等功能没有提供,毕竟是为大数据准备的,在大数据领域应用广。 |
综合上面的材料得出以下两点:
(1)中小型软件公司,建议选 RabbitMQ.一方面,erlang 语言天生具备高并发的特性,而且他的管理界面用起来十分方便。正所谓,成也萧何,败也萧何!他的弊端也在这里,虽然 RabbitMQ 是开源的,然而国内有几个能定制化开发 erlang 的程序员呢?所幸,RabbitMQ 的社区十分活跃,可以解决开发过程中遇到的 bug,这点对于中小型公司来说十分重要。不考虑 rocketmq 和 kafka 的原因是,一方面中小型软件公司不如互联网公司,数据量没那么大,选消息中间件,应首选功能比较完备的,所以 kafka 排除。不考虑 rocketmq 的原因是,rocketmq 是阿里出品,如果阿里放弃维护 rocketmq,中小型公司一般抽不出人来进行 rocketmq 的定制化开发,因此不推荐。
(2)大型软件公司,根据具体使用在 rocketMq 和 kafka 之间二选一。一方面,大型软件公司,具备足够的资金搭建分布式环境,也具备足够大的数据量。针对 rocketMQ,大型软件公司也可以抽出人手对 rocketMQ 进行定制化开发,毕竟国内有能力改 Java 源码的人,还是相当多的。至于 kafka,根据业务场景选择,如果有日志采集功能,肯定是首选 kafka 了。具体该选哪个,看使用场景。
4、如何保证消息队列是高可用的?
分析:在第二点说过了,引入消息队列后,系统的可用性下降。在生产中,没人使用单机模式的消息队列。因此,作为一个合格的程序员,应该对消息队列的高可用有很深刻的了解。如果面试的时候,面试官问,你们的消息中间件如何保证高可用的?你的回答只是表明自己只会订阅和发布消息,面试官就会怀疑你是不是只是自己搭着玩,压根没在生产用过。请做一个爱思考,会思考,懂思考的程序员。
回答:这问题,其实要对消息队列的集群模式要有深刻了解,才好回答。
以 rcoketMQ 为例,他的集群就有多 master 模式、多 master 多 slave 异步复制模式、多 master 多 slave 同步双写模式。多 master 多 slave 模式部署架构图(网上找的,偷个懒,懒得画):
其实博主第一眼看到这个图,就觉得和 kafka 好像,只是 NameServer 集群,在 kafka 中是用 zookeeper 代替,都是用来保存和发现 master 和 slave 用的。通信过程如下:
Producer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Broker Master 建立长连接,且定时向 Broker 发送心跳。Producer 只能将消息发送到 Broker master,但是 Consumer 则不一样,它同时和提供 Topic 服务的 Master 和 Slave 建立长连接,既可以从 Broker Master 订阅消息,也可以从 Broker Slave 订阅消息。
那么 kafka 呢,为了对比说明直接上 kafka 的拓补架构图(也是找的,懒得画)
如上图所示,一个典型的 Kafka 集群中包含若干 Producer(可以是 Web 前端产生的 Page View,或者是服务器日志,系统 CPU、Memory 等),若干 broker(Kafka 支持水平扩展,一般 broker 数量越多,集群吞吐率越高),若干 Consumer Group,以及一个 Zookeeper 集群。Kafka 通过 Zookeeper 管理集群配置,选举 leader,以及在 Consumer Group 发生变化时进行 rebalance。Producer 使用 push 模式将消息发布到 broker,Consumer 使用 pull 模式从 broker 订阅并消费消息。
至于 rabbitMQ,也有普通集群和镜像集群模式,自行去了解,比较简单,两小时即懂。
要求,在回答高可用的问题时,应该能逻辑清晰的画出自己的 MQ 集群架构或清晰的叙述出来。
5、如何保证消息不被重复消费?
分析:这个问题其实换一种问法就是,如何保证消息队列的幂等性?这个问题可以认为是消息队列领域的基本问题。换句话来说,是在考察你的设计能力,这个问题的回答可以根据具体的业务场景来答,没有固定的答案。
回答:先来说一下为什么会造成重复消费?
其实无论是那种消息队列,造成重复消费原因其实都是类似的。正常情况下,消费者在消费消息时候,消费完毕后,会发送一个确认信息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除。只是不同的消息队列发送的确认信息形式不同,例如 RabbitMQ 是发送一个 ACK 确认消息,RocketMQ 是返回一个 CONSUME_SUCCESS 成功标志,kafka 实际上有个 offset 的概念,简单说一下(如果还不懂,出门找一个 kafka 入门到精通教程),就是每一个消息都有一个 offset,kafka 消费过消息后,需要提交 offset,让消息队列知道自己已经消费过了。那造成重复消费的原因?,就是因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将该消息分发给其他的消费者。
如何解决?这个问题针对业务场景来答分以下几点
(1)比如,你拿到这个消息做数据库的 insert 操作。那就容易了,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
(2)再比如,你拿到这个消息做 Redis 的 set 的操作,那就容易了,不用解决,因为你无论 set 几次结果都是一样的,set 操作本来就算幂等操作。
(3)如果上面两种情况还不行,上大招。准备一个第三方介质,来做消费记录。以 Redis 为例,给消息分配一个全局 id,只要消费过该消息,将 <id,message> 以 K-V 形式写入 Redis。那消费者开始消费前,先去 Redis 中查询有没消费记录即可。
6、如何保证消费的可靠性传输?
分析:我们在使用消息队列的过程中,应该做到消息不能多消费,也不能少消费。如果无法做到可靠性传输,可能给公司带来千万级别的财产损失。同样的,如果可靠性传输在使用过程中,没有考虑到,这不是给公司挖坑么,你可以拍拍屁股走了,公司损失的钱,谁承担。还是那句话,认真对待每一个项目,不要给公司挖坑。
回答:其实这个可靠性传输,每种 MQ 都要从三个角度来分析:生产者弄丢数据、消息队列弄丢数据、消费者弄丢数据
RabbitMQ
(1)生产者丢数据
从生产者弄丢数据这个角度来看,RabbitMQ 提供 transaction 和 confirm 模式来确保生产者不丢消息。
transaction 机制就是说,发送消息前,开启事物(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事物就会回滚(channel.txRollback()),如果发送成功则提交事物(channel.txCommit())。
然而缺点就是吞吐量下降了。因此,按照博主的经验,生产上用 confirm 模式的居多。一旦 channel 进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,rabbitMQ 就会发送一个 Ack 给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了。如果 rabiitMQ 没能处理该消息,则会发送一个 Nack 消息给你,你可以进行重试操作。处理 Ack 和 Nack 的代码如下所示(说好不上代码的,偷偷上了):
- channel.addConfirmListener(new ConfirmListener() {
- @Override
- public void handleNack(long deliveryTag, boolean multiple) throws IOException {
- System.out.println("nack: deliveryTag = "+deliveryTag+" multiple: "+multiple);
- }
- @Override
- public void handleAck(long deliveryTag, boolean multiple) throws IOException {
- System.out.println("ack: deliveryTag = "+deliveryTag+" multiple: "+multiple);
- }
- });
(2)消息队列丢数据
处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和 confirm 机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个 Ack 信号。这样,如果消息持久化磁盘之前,rabbitMQ 阵亡了,那么生产者收不到 Ack 信号,生产者会自动重发。
那么如何持久化呢,这里顺便说一下吧,其实也很容易,就下面两步
- 将 queue 的持久化标识 durable 设置为 true,则代表是一个持久的队列
- 发送消息的时候将 deliveryMode=2
这样设置以后,rabbitMQ 就算挂了,重启后也能恢复数据
(3)消费者丢数据
消费者丢数据一般是因为采用了自动确认消息模式。这种模式下,消费者会自动确认收到信息。这时 rahbitMQ 会立即将消息删除,这种情况下如果消费者出现异常而没能处理该消息,就会丢失该消息。
至于解决方案,采用手动确认消息即可。
kafka
这里先引一张 kafka Replication 的数据流向图
Producer 在发布消息到某个 Partition 时,先通过 ZooKeeper 找到该 Partition 的 Leader,然后无论该 Topic 的 Replication Factor 为多少(也即该 Partition 有多少个 Replica),Producer 只将该消息发送到该 Partition 的 Leader。Leader 会将该消息写入其本地 Log。每个 Follower 都从 Leader 中 pull 数据。
针对上述情况,得出如下分析
(1)生产者丢数据
在 kafka 生产中,基本都有一个 leader 和多个 follwer。follwer 会去同步 leader 的信息。因此,为了避免生产者丢数据,做如下两点配置
- 第一个配置要在 producer 端设置 acks=all。这个配置保证了,follwer 同步完成后,才认为消息发送成功。
- 在 producer 端设置 retries=MAX,一旦写入失败,这无限重试
(2)消息队列丢数据
针对消息队列丢数据的情况,无外乎就是,数据还没同步,leader 就挂了,这时 zookpeer 会将其他的 follwer 切换为 leader,那数据就丢失了。针对这种情况,应该做两个配置。
- replication.factor 参数,这个值必须大于 1,即要求每个 partition 必须有至少 2 个副本
- min.insync.replicas 参数,这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系
这两个配置加上上面生产者的配置联合起来用,基本可确保 kafka 不丢数据
(3)消费者丢数据
这种情况一般是自动提交了 offset,然后你处理程序过程中挂了。kafka 以为你处理好了。再强调一次 offset 是干嘛的
offset:指的是 kafka 的 topic 中的每个消费组消费的下标。简单的来说就是一条消息对应一个 offset 下标,每次消费数据的时候如果提交 offset,那么下次消费就会从提交的 offset 加一那里开始消费。
比如一个 topic 中有 100 条数据,我消费了 50 条并且提交了,那么此时的 kafka 服务端记录提交的 offset 就是 49(offset 从 0 开始),那么下次消费的时候 offset 就从 50 开始消费。
解决方案也很简单,改成手动提交即可。
ActiveMQ 和 RocketMQ
大家自行查阅吧
7、如何保证消息的顺序性?
分析:其实并非所有的公司都有这种业务需求,但是还是对这个问题要有所复习。
回答:针对这个问题,通过某种算法,将需要保持先后顺序的消息放到同一个消息队列中(kafka 中就是 partition,rabbitMq 中就是 queue)。然后只用一个消费者去消费该队列。
有的人会问:那如果为了吞吐量,有多个消费者去消费怎么办?
这个问题,没有固定回答的套路。比如我们有一个微博的操作,发微博、写评论、删除微博,这三个异步操作。如果是这样一个业务场景,那只要重试就行。比如你一个消费者先执行了写评论的操作,但是这时候,微博都还没发,写评论一定是失败的,等一段时间。等另一个消费者,先执行写评论的操作后,再执行,就可以成功。
总之,针对这个问题,我的观点是保证入队有序就行,出队以后的顺序交给消费者自己去保证,没有固定套路。
一些其他的相关连接:
https://www.cnblogs.com/williamjie/p/9481774.html RabbitMQ 基础概念详细介绍
https://blog.csdn.net/anzhsoft/article/details/19563091 RabbitMQ 消息队列
https://segmentfault.com/a/1190000016351345 https://blog.csdn.net/qq_26656329/article/details/77891793 RabbitMQ 参数调优
https://www.cnblogs.com/williamjie/p/9481780.html
当下比较知名的消息引擎,包括:
- ZeroMQ
- 推特的 Distributedlog
- ActiveMQ:Apache 旗下的老牌消息引擎
- RabbitMQ、Kafka:AMQP 的默认实现。
- RocketMQ
- Artemis:Apache 的 ActiveMQ 下的子项目
- Apollo:同样为 Apache 的 ActiveMQ 的子项目的号称下一代消息引擎
- 商业化的消息引擎 IronMQ
- 以及实现了 JMS(Java Message Service)标准的 OpenMQ
1.解耦
解耦是消息队列要解决的最本质问题。
2.最终一致性
最终一致性指的是两个系统的状态保持一致,要么都成功,要么都失败。
最终一致性不是消息队列的必备特性,但确实可以依靠消息队列来做最终一致性的事情。
2.广播
消息队列的基本功能之一是进行广播。
有了消息队列,我们只需要关心消息是否送达了队列,至于谁希望订阅,是下游的事情,无疑极大地减少了开发和联调的工作量。
3.错峰与流控
典型的使用场景就是秒杀业务用于流量削峰场景。
1.ActiveMQ
优点
- 单机吞吐量:万级
- topic 数量都吞吐量的影响:
- 时效性:ms 级
- 可用性:高,基于主从架构实现高可用性
- 消息可靠性:有较低的概率丢失数据
- 功能支持:MQ 领域的功能极其完备
缺点:
官方社区现在对 ActiveMQ 5.x 维护越来越少,较少在大规模吞吐的场景中使用。
2.Kafka
号称大数据的杀手锏,谈到大数据领域内的消息传输,则绕不开 Kafka,这款为大数据而生的消息中间件,以其百万级 TPS 的吞吐量名声大噪,迅速成为大数据领域的宠儿,在数据采集、传输、存储的过程中发挥着举足轻重的作用。
Apache Kafka 它最初由 LinkedIn 公司基于独特的设计实现为一个分布式的提交日志系统( a distributed commit log),之后成为 Apache 项目的一部分。
目前已经被 LinkedIn,Uber, Twitter, Netflix 等大公司所采纳。
优点
- 性能卓越,单机写入 TPS 约在百万条/秒,最大的优点,就是吞吐量高。
- 时效性:ms 级
- 可用性:非常高,kafka 是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
- 消费者采用 Pull 方式获取消息, 消息有序, 通过控制能够保证所有消息被消费且仅被消费一次;
- 有优秀的第三方 Kafka Web 管理界面 Kafka-Manager;
- 在日志领域比较成熟,被多家公司和多个开源项目使用;
- 功能支持:功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用
缺点:
- Kafka 单机超过 64 个队列/分区,Load 会发生明显的飙高现象,队列越多,load 越高,发送消息响应时间变长
- 使用短轮询方式,实时性取决于轮询间隔时间;
- 消费失败不支持重试;
- 支持消息顺序,但是一台代理宕机后,就会产生消息乱序;
- 社区更新较慢;
3.RabbitMQ
RabbitMQ 2007 年发布,是一个在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。
RabbitMQ 优点:
- 由于 erlang 语言的特性,mq 性能较好,高并发;
- 吞吐量到万级,MQ 功能比较完备
- 健壮、稳定、易用、跨平台、支持多种语言、文档齐全;
- 开源提供的管理界面非常棒,用起来很好用
- 社区活跃度高;
RabbitMQ 缺点:
- erlang 开发,很难去看懂源码,基本职能依赖于开源社区的快速维护和修复 bug,不利于做二次开发和维护。
- RabbitMQ 确实吞吐量会低一些,这是因为他做的实现机制比较重。
- 需要学习比较复杂的接口和协议,学习和维护成本较高。
4.RocketMQ
RocketMQ 出自 阿里公司的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一些改进。
RocketMQ 在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog 分发等场景。
RocketMQ 优点:
- 单机吞吐量:十万级
- 可用性:非常高,分布式架构
- 消息可靠性:经过参数优化配置,消息可以做到 0 丢失
- 功能支持:MQ 功能较为完善,还是分布式的,扩展性好
- 支持 10 亿级别的消息堆积,不会因为堆积导致性能下降
- 源码是 Java,我们可以自己阅读源码,定制自己公司的 MQ,可以掌控
RocketMQ 缺点:
- 支持的客户端语言不多,目前是 Java 及 c++,其中 c++ 不成熟;
- 社区活跃度一般
- 没有在 mq 核心中去实现 JMS 等接口,有些系统要迁移需要修改大量代码
四、消息队列选择建议
1.Kafka
Kafka 主要特点是基于 Pull 的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。
大型公司建议可以选用,如果有日志采集功能,肯定是首选 kafka 了。
2.RocketMQ
天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况。
RoketMQ 在稳定性上可能更值得信赖,这些业务场景在阿里双 11 已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择 RocketMQ。
3.RabbitMQ
RabbitMQ :结合 erlang 语言本身的并发优势,性能较好,社区活跃度也比较高,但是不利于做二次开发和维护。不过,RabbitMQ 的社区十分活跃,可以解决开发过程中遇到的 bug。
如果你的数据量没有那么大,小公司优先选择功能比较完备的 RabbitMQ。
🐶 你走,我不送你。你来,风雨无阻,我去接你。