中间件-MQ
消息队列使用
- Qfang
- 楼盘字典、新房小程序房源分享,在楼盘基础信息变动时,发送MQ到盘客通更新楼盘信息
- 房源状态变更后发送MQ,响应的状态后处理(通知经纪人、佣金结算、上下架、不同平台同步、更新ES)。solr、es数据更新
- 投诉举报单针对不同投诉举报类目,在审核结束后发送MQ通知相应服务处理投诉结果
- 房源核验结果,发送MQ处理响应房源核验结果
- 数位大数据
- 消息推送
- 超时认领
- 分析报告生成
- 数据导入
使用消息队列有什么作用
- 解耦、异步、削峰
- 解耦:减少代码模块或者不同服务之间的依赖关系
- 异步:对于通知类的需求,不需要通知结果。通过异步进行通知
- 削峰:通过对消息的消费速度控制请求量,可以更加精准保证系统正常运行
- 缺点:多一个东西多一个麻烦,可用性减低,复杂度提高,可能存在一致性问题
消息队列怎么保证高可用性
- 一般服务的高可用无非就是通过集群或者分布式实现高可用,之前用过的rabbitMQ,是通过多个实例组成集群,然后实例之间镜像复制的形式实现高可用。
如何保证消息不被重复消费?或者说,如何保证消息消费的幂等性?
- 这种重复消费问题更多的是和业务相关的,所以一般需要开发者自己解决。在消息内容中添加消息id,这样在消息消费的时候可以进行消息已消费未消费的标记。这部分可以交给redis完成。
rabbitMQ的Comfirm机制
- 生产者发送消息到mq时,可以设置mq进行回调确认:
- 发送到exchange的确认,exchange接收到消息时,回调comfirm方法
- exchange投递消息失败时,回调returnMessage通知投递失败。(配置mandatory)
rabbitMQ的ACK机制
- ack作用是一种消费者告诉mq当前消息是否消费成功的机制,若消费成功则mq会移除这个消息,若失败则更具情况而定。如果不响应ack则mq会堆积未确认消息,直到这个消费者断开(发送给其他消费者)。
如何保证消息的可靠性传输?或者说,如何处理消息丢失的问题?
- 消息丢失一般有情况:生产者发送过程丢失、mq自己丢失、消费者消费过程丢失
- 生产者丢失:可以通过rabbitMq的confirm机制,生产者每发送一个消息,mq都会返回通知是否成功的ack回调。同时如果长时间没收到ack则认为是超时,直接重新发送。若路由匹配不上,可以设置通知mandetory,通知生产方消息失败
- mq丢失:mq丢失一般是mq服务出问题宕机,可以是用rabbitMQ的持久化,queue的持久化和每个消息的持久化。这样重启服务后会恢复消息。
- 消费者丢失:同样通过ack实现,每次成功消费消息,都向rabbitMQ发送一个消费成功的ack
rabbitMQ如何实现延迟消息以及可能出现的问题
- 延迟队列有两种方法:1、DLX+TTL;2、Delay Message 插件
- 1、原理:队列以及队列中的消息可以设置ttl;消息在ttl结束时会被推到死信队列;结合上面两点,将消息放入一个无人消费的带ttl的队列,等ttl到期自动转入死信队列,消费者只需消费死信队列即可
- 2、使用插件:插件定义一种新的exchange信使,x-delay-message类型,可以根据消息的头数据x-delay进行延迟投递
- 存在的问题:
- 第一种:无法实现任意延迟的延迟队列,或者任意延迟的消息可能会堆积,超时了无法及时被消费。原因是队列检查ttl是只检查队列头的消息,检查到队头消息未超时则不会继续检查后面的消息。适用于延迟等长时间的消息,如3秒后补偿消息,订单30分钟后取消。
- 第二种:由于插件的实现是使用本地的数据表,节点宕机可能导致消息不可用;不适合大数据量的延迟消息;支持本地磁盘节点,不支持内存节点
rabbitMQ的持久化
- 开启消息持久化和队列必须持久化
- 原理:当生产者发送过一条需要持久化的消息时,RabbitMQ会在消息提交到日志文件后才发送响应;消费者消费成功时,RabbitMQ会在持久化日志中把这条消息标记为可删除的消息。重启时rabbit会自动创建相应的exchange、queue并加载消息到队列;
rabbitMQ的集群
- 多实例节点组成集群,每个节点具有相同的元数据副本,但是每个节点负责不同的queue。队列数据只存在负责queue的节点是,queue可以启用镜像队列来应对单点故障;生产者投递消息时根据元数据转发到负责queue的节点进行投递 消费者消费数据时也会转发负责的queue的节点进行消费,转发操作有rabbitmq自己完成。对外负载均衡需要额外的组件实现。
rabbitmq的优先级队列
- 1.优先级队列必须和优先级消息一起使用,才能发挥出效果,但是会消耗性能
- queue添加参数x-max-priority进行定义,消息添加参数priority进行定义
- 2.优先级队列必须在消费者繁忙的时候,才能对消息按照优先级排序
- 3.非优先级队列发送优先级消息是不会排序的,所以向非优先级队列发送优先级是没有任何作用的
如何保证消息的顺序?
- 其实这个问题并没有完美的通用解决方案。
- 方案一:使用一个consumer消费数据,这个是整个队列全局有序,会导致吞吐量低下;也不符合分布式的设计;
- 方案二:使用多个队列对消息进行分类,把同类需要保持顺序的消息方同一个队列中,同样每个队列也使用一个consumer去消费;
- 方案三:上面都是从consumer端考虑的方案并且consumer只能一个,如果从生产者端考虑呢?
- 第一步:生成者生产消息时,将同类的有序消息加入到同一个队列中,不同类消息使用不同队列存放。这里定义这些同类队列为:
前置队列; - 第二步:需要一个充当exchange角色的服务或者机制(定义为
运行exchange),这个exchange依赖一个队列(定义为:运行队列),队列中存放着前置队列的队列名,当队列名存在于运行队列中时表示该前置队列可以继续被消费;运行exchange不断消费运行队列,将运行队列中元素对应的队列数据投递到消费者实际的队列中。 - 第三步:消费者消费完消息后,向
前置队列投放数据 - 方案四:消息中附带同类型消息顺序序号,消费者按照序号消费;发现序号跳号或者顺序不多,则将消息打回mq在延迟一定时间后重试。
如何处理消息堆积?
- 第一步:尽可能找出堆积的原因
- 第二步:停止原来的consumer,将堆积的消息按照某种规则划分转移到多个子队列中,并用consumer消费这些子队列
- 第三步:恢复原来consumer
如果让你写一个消息队列,该如何进行架构设计?说一下你的思路。
- 最主要的是要考虑可靠性,不可靠的东西是不能排上用场的。可靠性可以从服务端和客户端两个地方考虑
- 其次需要考虑伸缩拓展能力
RabbitMQ与KafKa区别
架构模型
RabbitMQ遵循AMQP协议,RabbitMQ的broker由Exchange,Binding,queue组成,其中exchange和binding组成了消息的路由键;客户端Producer通过连接channel和server进行通信,Consumer从queue获取消息进行消费(长连接,queue有消息会推送到consumer端,consumer循环从输入流读取数据)。rabbitMQ以broker为中心;有消息的确认机制。
kafka遵从一般的MQ结构,producer,broker,consumer,以consumer为中心,消息的消费信息保存的客户端consumer上,consumer根据消费的点,从broker上批量pull数据;无消息确认机制.
吞吐量
kafka具有高吞吐量,内部采用消息批量处理,零拷贝机制,数据的存储和获取是本地磁盘顺序批量操作,具有O(1)的复杂度,消息处理的效率很高。
rabbitMQ在吞吐量方面稍逊于kafka,他们的出发点不一样,rabbitMQ支持对消息的可靠传递,支持事务不支持批量的操作;基于存储的可靠性的要求存储可以采用内存或者硬盘
集群负载均衡方面,
kafka采用zookeeper对集群中的broker、consumer进行管理,可以注册topic到zookeeper上;通过zookeeper的协调机制,producer保存对应topic的broker信息,可以随机或者轮询发送到broker上;并且producer可以基于语义指定分片,消息发送到broker的某分片上。
rabbitMQ的负载均衡需要单独的loadbalancer进行支持。