消息队列
写在前面的话
网上Java的资料零零散散,甚至有一些错误,作者希望能结合自己的实际开发经验和面试经验,对Spring知识体系进行系统梳理。
本文参考主要【引用】中的内容,并结合自己的日常积累,欢迎留言交流指正。
为什么使用消息队列?
(1)解耦
:可以在多个系统之间进行解耦,将原本通过网络之间的调用的方式改为使用MQ进行消息的异步通讯,只要该操作不是需要同步的,就可以改为使用MQ进行不同系统之间的联系,这样项目之间不会存在耦合,系统之间不会产生太大的影响,就算一个系统挂了,也只是消息挤压在MQ里面没人进行消费而已,不会对其他的系统产生影响。
我用线程,线程池去做不是一样的么?
你一个订单流程,你扣积分,扣优惠券,发短信,扣库存。。。等等这么多业务要调用这么多的接口,每次加一个你要调用一个接口然后还要重新发布系统,而且加消息队列出问题易于分离定位.
(2)异步
:加入一个操作设计到好几个步骤,这些步骤之间不需要同步完成,比如客户去创建了一个订单,还要去客户轨迹系统添加一条轨迹、去库存系统更新库存、去客户系统修改客户的状态等等。这样如果这个系统都直接进行调用,那么将会产生大量的时间,这样对于客户是无法接收的;并且像添加客户轨迹这种操作是不需要去同步操作的,如果使用MQ将客户创建订单时,将后面的轨迹、库存、状态等信息的更新全都放到MQ里面然后去异步操作,这样就可加快系统的访问速度,提供更好的客户体验。
应用:发送邮件、短信、微信推送
(3)削峰
:一个系统访问流量有高峰时期,也有低峰时期,比如说,中午整点有一个抢购活动等等。比如系统平时流量并不高,一秒钟只有100多个并发请求,系统处理没有任何压力,一切风平浪静,到了某个抢购活动时间,系统并发访问了剧增,比如达到了每秒5000个并发请求,而我们的系统每秒只能处理2000个请求,那么由于流量太大,我们的系统、数据库可能就会崩溃。这时如果使用MQ进行流量削峰,将用户的大量消息直接放到MQ里面,然后我们的系统去按自己的最大消费能力去消费这些消息,就可以保证系统的稳定,只是可能要跟进业务逻辑,给用户返回特定页面或者稍后通过其他方式通知其结果。
技术选型?
RabbitMQ
==优点==
- 支持多种语言、文档齐全;
- 并发性能高,性能极其好,延时很低,达到微秒级;
- 管理界面 较丰富,在互联网公司也有较大规模的应用,社区活跃度高;
- 有很好的消息异常机制、消息确认机制、持久化机制,可靠性高。
- 如果 消息异常,RabbitMQ提供消息跟踪机制,使用者可以找出发生了什么;队列可以在集群中的机器上 进行镜像,以确保在硬件问题下还保证 消息安全;
- 管理界面较丰富,在互联网公司也有较大规模的应用,社区活跃度高。
==缺点==
基于 erlang 开发,很难进行二次定制开发
吞吐量方面虽然稍逊于 Kafka 和 RocketMQ
RocketMQ
==优点==
Java语言开发,代码我们可以直接阅读,便于二次开发
- 吞吐量较好,支持千万级的吞吐量。
- 可用性高,支持分布式。
- 支持分布式事务
==缺点==
- 支持的客户端语言 不多,目前是Java 及 C++,其中C++ 还不成熟;
- RocketMQ 社区关注度及成熟度也不及前两者;
- 没有 Web 管理界面,提供了一个 CLI (命令行界面) 管理工具带来 查询、管理 和诊断各种问题;
- 没有在 MQ 核心里实现 JMS 等接口,有些系统要迁移需要修改大量代码;
Kafka
==优点==
但是提供超高的吞吐量,ms 级的延迟
- 极高的可用性以及可靠性,而且分布式可以任意扩展。
- 如果是大数据领域的实时数据同步、实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题。
==缺点==
- 重复消息。Kafka保证每条消息至少送达一次,虽然几率很小,但一条消息可能被送达多次。
- 支持消息顺序,但是一台代理宕机后,就会产生消息乱序;
- 复杂性。Kafka需要Zookeeper的支持,Topic一般需要人工创建,部署和维护比一般MQ成本更高。
ActiveMQ
==缺点==
社区活跃度不及 RabbitMQ 高;
根据其他用户反馈,会出莫名其妙的问题,会丢失消息;
目前重心放到 activemq 6.0 产品 Apollo,对 5.x 的维护较少;
业务场景落地
在API模块,在接口正确返回时需要需要干几件事情:
- 图像存储(耗时20ms)
- 扣减剩余次数(耗时100ms)
- 日志记录(耗时80ms)
- 接口统计(耗时50ms)
- 邮件发送(耗时几秒)。
那你的流程走完了,你不用管别人是否成功么?比如你下单了积分没加,优惠券没扣怎么办?
问题是个好问题,但是没必要考虑,业务系统本身就是自己的开发人员维护的,你积分扣失败关我下单的什么事情?你管好自己下单系统的就好了。
Tip:话是这么说,但是这其实是用了消息队列的一个缺点,涉及到分布式事务的知识点,把下单,优惠券,积分。。。都放在一个事务里面一样,要成功一起成功,要失败一起失败。
集群崩溃,如何保证数据不丢失?
持久化队列
@RabbitListener(queuesToDeclare = @Queue(name = "hello",durable = "true") )
但是给hello队列做持久化,而hello1不做,并重启rabbitmq
可以看到重启后,hello队列还在,hello1队列消失了,但是原本hello中的一条消息也没有保存下来。所以在这边我们仅仅做到了消息队列的持久化,还没有做消息持久化。
消息持久化
RabbitMQ遵循AMQP协议,使用RabbitTemplate 默认消息是持久化的,但只有在队列也是持久化时才有作用.RabbitMQ会将消息写入磁盘上的一个持久化日志文件
正常消费,如何保障消息不会丢失
生产者端
RabbitMQ 提供了两种可靠性的确认策略 Confrim / Transaction.
Confrim
异步确认模式:发送线程不会立即得到MQ反馈结果,发送后通过callback确认成功失败,类似线程池,效率高
发送线程:由于是异步确认模式,当RabbitMQ Server突然失联,发送线程仍会继续发送多条消息,之后发现链接断开,抛出异常
假设RabbitMQ Server 接收500挂掉
发送线程:700
实际接收 RabbitMQ Server:500
callback线程 失败:200(预期 未具体测试)
缺点:发送线程认为已经发送成功,但是却失败了,反馈结果只能通callback获得,多线程问题,如未处理callback,则消息丢失
优点:性能好
消息确认模式在SpringBoot中的使用:https://blog.csdn.net/ABIGJUN/article/details/93385838
Transaction
事务确认模式:
发送线程会立即得到MQ反馈结果,同一线程中,多个发送阻塞进行,同db Transaction一样支持失败回滚等,效率高
发送线程:由于是事务确认模式,当RabbitMQ Server突然失联,发送线程得不到当前正在发送消息的回执,直接抛出异常
假设RabbitMQ Server 接收500挂掉
发送线程:500
实际接收 RabbitMQ Server:500
缺点:同步发送,逐条确认,效率低
优点:同步发送,发送线程可以立即得到反馈结果,对于主线程消息不丢失
队列
RabbitMQ遵循AMQP协议,使用RabbitTemplate 默认消息是持久化的,但只有在队列也是持久化时才有作用.RabbitMQ会将消息写入磁盘上的一个持久化日志文件.
自定义的Rabbit工具类,发送消息的时候将deliveryMode=2
消费者端
关闭自动确认机制,设置消息的ack,当消费者消费一个消息的时候,会返回给rabbitmq对应queue一个ack消息,这样就保证了消息消费完成.
如何保证消息的幂等性
场景:
一般消息队列的使用,我们都是有重试机制的,就是说我下游的业务发生异常了,我会抛出异常并且要求你重新发一次。
我这个活动这里发生错误,你要求重发肯定没问题。但是大家仔细想一下问题在哪里?
是的,不止你一个人监听这个消息啊,还有别的服务也在监听,他们也会失败啊,他一失败他也要求重发,但是你这里其实是成功的,重发了,你的钱不就加了两次了?
解决
在消息生产
时,MQ内部针对每条生产者发送的消息生成一个唯一,作为去重的依据(消息投递失败并重传),避免重复的消息进入队列;
在消息消费
时,要求消息体中必须要有一个bizId(对于同一业务全局唯一,如支付ID、订单ID、帖子ID等)作为去重的依据,避免同一条消息被重复消费。
这个问题针对业务场景来答分以下几点:
1.比如,你拿到这个消息做数据库的insert操作。那就容易了,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
2.再比如,你拿到这个消息做redis的set的操作,那就容易了,不用解决,因为你无论set几次结果都是一样的,set操作本来就算幂等操作。
3.如果上面两种情况还不行,上大招。准备一个第三方介质,来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。
如何保证消费顺序性
场景1:多个consumer
一个queue,多个consumer,这不明显乱了,如下代码:
@Component
public class WorkConsumer {
@RabbitListener(queuesToDeclare = @Queue(name = "work") )
// @RabbitHandler // 不需要此注解了
public void receiver1(String message){
System.out.println("receiver1:" + message);
}
@RabbitListener(queuesToDeclare = @Queue(name = "work") )
public void receiver2(String message){
System.out.println("receiver2:" + message);
}
}
生产者根据商品ID计算出hash值,对队列数取余,就可以让相同id会压到同一队列;拆分多个queue,每个queue一个consumer,就是多一些queue而已,确实是麻烦点,然后这个consumer内部用内存队列做排队,然后分发给底层不同的worker来处理.
场景2:多线程
有几百万消息持续积压几小时,说说怎么解决?
场景1:大量消息在 mq 里积压了几个小时了还没解决
现象
你看这问法,其实本质针对的场景,都是说可能你的消费端出了问题,不消费了;或者消费的速度极其慢。接着就坑爹了,可能你的消息队列集群的磁盘都快写满了,都没人消费,这个时候怎么办?或者是这整个就积压了几个小时,你这个时候怎么办?或者是你积压的时间太长了,导致比如 RabbitMQ 设置了消息过期时间后就没了怎么办?
解决
一个消费者一秒是 1000 条,一秒 3 个消费者是 3000 条,一分钟就是 18 万条。所以如果你积压了几百万到上千万的数据,即使消费者恢复了,也需要大概 1 小时的时间才能恢复过来。
一般这个时候,只能临时紧急扩容了,具体操作步骤和思路如下:
- 先修复 consumer 的问题,确保其恢复消费速度,然后将现有 consumer 都停掉。
- 新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。
- 然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。
- 接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。==这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。==
- 等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。
场景2:mq 中的消息过期失效了
现象:
假设你用的是 RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在 mq 里,而是大量的数据会直接搞丢。
解决
这个情况下,就不是说要增加 consumer 消费积压的消息,因为实际上没啥积压,而是丢了大量的消息。我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上12点以后,用户都睡觉了。这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把白天丢的数据给他补回来。也只能是这样了。
假设 1 万个订单积压在 mq 里面,没有处理,其中 1000 个订单都丢了,你只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。
场景3:mq 都快写满了
如果消息积压在 mq 里,你很长时间都没有处理掉,此时导致 mq 都快写满了,咋办?这个还有别的办法吗?没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上再补数据吧。
其他
死信队列和延迟队列的使用
死信消息:
- 消息被拒绝(Basic.Reject或Basic.Nack)并且设置 requeue 参数的值为 false
- 消息过期了
- 队列达到最大的长度
过期消息:
在 rabbitmq 中存在2种方可设置消息的过期时间,第一种通过对队列进行设置,这种设置后,该队列中所有的消息都存在相同的过期时间,第二种通过对消息本身进行设置,那么每条消息的过期时间都不一样。如果同时使用这2种方法,那么以过期时间小的那个数值为准。当消息达到过期时间还没有被消费,那么那个消息就成为了一个 死信 消息。
队列设置:在队列申明的时候使用 x-message-ttl 参数,单位为 毫秒
单个消息设置:是设置消息属性的 expiration 参数的值,单位为 毫秒
延时队列
:在rabbitmq中不存在延时队列,但是我们可以通过设置消息的过期时间和死信队列来模拟出延时队列。消费者监听死信交换器绑定的队列,而不要监听消息发送的队列。
有了以上的基础知识,我们完成以下需求:
需求:用户在系统中创建一个订单,如果超过时间用户没有进行支付,那么自动取消订单。
分析:
1、上面这个情况,我们就适合使用延时队列来实现,那么延时队列如何创建
2、延时队列可以由 过期消息+死信队列 来时间
3、过期消息通过队列中设置 x-message-ttl 参数实现
4、死信队列通过在队列申明时,给队列设置 x-dead-letter-exchange 参数,然后另外申明一个队列绑定x-dead-letter-exchange对应的交换器。
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(AMQP.PROTOCOL.PORT);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明一个接收被删除的消息的交换机和队列
String EXCHANGE_DEAD_NAME = "exchange.dead";
String QUEUE_DEAD_NAME = "queue_dead";
channel.exchangeDeclare(EXCHANGE_DEAD_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare(QUEUE_DEAD_NAME, false, false, false, null);
channel.queueBind(QUEUE_DEAD_NAME, EXCHANGE_DEAD_NAME, "routingkey.dead");
String EXCHANGE_NAME = "exchange.fanout";
String QUEUE_NAME = "queue_name";
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
Map<String, Object> arguments = new HashMap<String, Object>();
// 统一设置队列中的所有消息的过期时间
arguments.put("x-message-ttl", 30000);
// 设置超过多少毫秒没有消费者来访问队列,就删除队列的时间
arguments.put("x-expires", 20000);
// 设置队列的最新的N条消息,如果超过N条,前面的消息将从队列中移除掉
arguments.put("x-max-length", 4);
// 设置队列的内容的最大空间,超过该阈值就删除之前的消息
arguments.put("x-max-length-bytes", 1024);
// 将删除的消息推送到指定的交换机,一般x-dead-letter-exchange和x-dead-letter-routing-key需要同时设置
arguments.put("x-dead-letter-exchange", "exchange.dead");
// 将删除的消息推送到指定的交换机对应的路由键
arguments.put("x-dead-letter-routing-key", "routingkey.dead");
// 设置消息的优先级,优先级大的优先被消费
arguments.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
String message = "Hello RabbitMQ: ";
for(int i = 1; i <= 5; i++) {
// expiration: 设置单条消息的过期时间
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder()
.priority(i).expiration( i * 1000 + "");
channel.basicPublish(EXCHANGE_NAME, "", properties.build(), (message + i).getBytes("UTF-8"));
}
channel.close();
connection.close();
rabbitmq的镜像集群(后续拓展)
这种模式,才是所谓的 RabbitMQ 的高可用模式。跟普通集群模式不一样的是,在镜像集群模式下,你创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。
那么如何开启这个镜像集群模式呢?其实很简单,RabbitMQ 有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。
这样的话,好处在于,你任何一个机器宕机了,没事儿,其它机器(节点)还包含了这个 queue 的完整数据,别的 consumer 都可以到其它节点上去消费数据。坏处在于,第一,这个性能开销也太大了吧,消息需要同步到所有机器上,导致网络带宽压力和消耗很重!第二,这么玩儿,不是分布式的,就没有扩展性可言了,如果某个 queue 负载很重,你加机器,新增的机器也包含了这个 queue 的所有数据,并没有办法线性扩展你的 queue。你想,如果这个 queue 的数据量很大,大到这个机器上的容量无法容纳了,此时该怎么办呢?
RocketMQ入门
https://github.com/AobingJava/JavaFamily/blob/master/docs/mq/RocketMQ.md