RabbitMQ初步使用

前置知识

发送消息的通信的模式

1
2
1.简单模式  一个生产者发送一个消息 给消费者来消费  
2.工作模式 一个生产者发送多个消息,由不同的消费者来消费同一个队列中的消息,但是不同的消费者消费的消息也是不一样的。消息一旦被消费就没了。

广播模式

image-20230922154941590

路由模式

image-20230922154946834

主题模式

image-20230922154950546

总结

1
2
3
4
5
6
7
8
1.简单模式  一个生产者发送一个消息 一个消费者来消费,消费完了之后消息就没.
2.工作模式 一个生产者发送多个消息 ,多个消费者来消费不同的消息,消费完了消息就没了
===推荐==
交换机:只做消息的转发,不做消息的存储,一旦队列没有绑定到交换机,消息被生产者发送过来,就会丢失。

3.广播模式 一个生产者发送一个消息,多个消费者都可以消费同一个消息
4.路由模式 一个生产者发送一个消息,需要指定routingkey ,交换机和队列 通过routingkey进行绑定,如果指定的routingkey 和绑定的routingkey 一致 则将消息转发给对应的队列中
5.主题模式 生成者发送一个消息,需要指定routingkey ,交换机和队列 通过通配符的方式 进行绑定,如果指定的routingkey符合绑定的通配符,则将消息转发到对应的队列中。

RabbitMQ基础入门

1. 消息中间件(MQ)

1.1 消息队列回顾

MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。

发送消息方成为生产者,接收消息方成为消费者。

分布式直接通信分为:

  • 直接远程调用

image-20211120130250699

  • 借助第三方完成间接通信

image-20211120130304706

​ 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ。消息中间件到底该如何使用,何时使用这是一个问题,胡乱地使用消息中间件增加了系统的复杂度,如果用不好消息中间件还不如不用。

​ 消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)Consumer(消费者)

1.2 消息队列特点有哪些?

1.2.1 先进先出

先进先出是队列的一个特性

​ 不能先进先出,都不能说是队列了。消息队列的顺序在入队的时候就基本已经确定了,一般是不需人工干预的。而且,最重要的是,数据是只有一条数据在使用中。 这也是MQ在诸多场景被使用的原因。

​ ps:除非只有一个生产者,一个消费者,且MQ是顺序的,不然没有办法保证绝对的顺序消费。因为生产者给MQ发消息可能因为网络延迟等问题,顺序乱了。而消费者消费消息也有可能因为多线程消费而不是顺序消费。

1.2.2 订阅

一般是MQ的广播模式,类似于java的观察者模式

​ 发布订阅是一种很高效的处理方式,如果不发生阻塞,基本可以当做是同步操作。这种处理方式能非常有效的提升服务器利用率,这样的应用场景非常广泛。

1.2.3 持久化

​ 持久化确保MQ的使用不只是一个部分场景的辅助工具,而是让MQ能像数据库一样存储核心的数据。

1.2.4 分布式

​ 在现在大流量、大数据的使用场景下,只支持单体应用的服务器软件基本是无法使用的,支持分布式的部署,才能被广泛使用。而且,MQ的定位就是一个高性能的中间件。

1.3 消息队列通讯模式是什么?

1.3.1 点对点通讯点

​ 点对点方式是最为传统和常见的通讯方式,它支持一对一、一对多、多对多、多对一等多种配置方式,支持树状、网状等多种拓扑结构。

1.3.2 发布/订阅(Publish/Subscribe)模式

​ 发布/订阅功能使消息的分发可以突破目的队列地理指向的限制,使消息按照特定的主题甚至内容进行分发,用户或应用程序可以根据主题或内容接收到所需要的消息。发布/订阅功能使得发送者和接收者之间的耦合关系变得更为松散,发送者不必关心接收者的目的地址,而接收者也不必关心消息的发送地址,而只是根据消息的主题进行消息的收发。在MQ家族产品中,MQEventBroker是专门用于使用发布/订阅技术进行数据通讯的产品,它支持基于队列和直接基于TCP/IP两种方式的发布和订阅。

1.3.3 群集(Cluster)

​ 为了简化点对点通讯模式中的系统配置,MQ提供Cluster(群集)的解决方案。群集类似于一个域(Domain),群集内部的队列管理器之间通讯时,不需要两两之间建立消息通道,而是采用群集(Cluster)通道与其它成员通讯,从而大大简化了系统配置。此外,群集中的队列管理器之间能够自动进行负载均衡,当某一队列管理器出现故障时,其它队列管理器可以接管它的工作,从而大大提高系统的高可靠性。

1.4 为什么使用消息队列(优点)?

其实就是问问你消息队列都有哪些使用场景,然后你项目里具体是什么场景,说说你在这个场景里用消息队列是什么?

先说一下消息队列常见的使用场景吧,其实场景有很多,但是比较核心的有 3 个:解耦异步削峰

在项目中,可将一些无需即时返回且耗时的操作提取出来,进行异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高系统吞吐量

1.4.1 应用解耦

​ 在快递柜业务流程中,快递员投柜后需要经历扣减系统费、短信通知用户和推送通知快递公司三个业务动作。传统做法需要依次执行这些业务东西,如果其中某一步异常(例如用户手机未开机或者快递公司接口故障),将会延迟甚至中断整个投柜流程,严重影响用户体验。

image-20230922160524788

​ 如果接口层收到投柜数据后,写入消息到MQ,后续三个子系统各自消费处理,将可以完美解决该问题,并且子系统故障不影响上游系统!此为 解耦

再举个例子:

MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。

  • 未使用MQ

image-20211120153104058

  • 加入MQ

image-20211120153346015

1.4.2 异步

​ 比如快递投柜后,用户马上就结束了,不会等待到发送短信或者通知快递公司结束的,直接将消息投递到MQ,然后就直接结束,具体到扣减系统费以及后续的通知,都是异步操作的,不需要用户关心的,着就是将用户的同步操作转换为异步操作。

image-20230922160540156

如果全部同步操作需要15S,而发送到MQ后交给系统异步处理用户只需要1S就可以完成操作。

在举个例子:

将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。

  • 未使用MQ

image-20211120153505510

互联网应用,用户点击玩下单按钮,应该在200ms内完成响应。用户体验会好。

  • 使用MQ

image-20211120153907931

吞吐量:未使用之前大概1s处理一个请求,而使用mq后,25ms处理一个请求。1s处理40个请求。大大提升了吞吐量

1.4.3 流量削峰

就像用户投递快递,高峰到40W每秒,但是我们的后续处理业务每秒只能20W,还剩下20W在MQ进行堆积,这就是MQ很重要的流量削峰的能力,将用户的洪峰流量,让后台慢慢来处理,MQ承担一个缓冲的作用

image-20230922160548627

​ 就像这个波形图一样,如果用户请求的并发量的最高峰时40W,系统的承载能力只能达到30W,就可以使用MQ进行削峰,将系统最高40W的并发削峰为最高只有20万,就是时间换空间的做法。

image-20230922160338916

秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。

  1. 可以控制活动人数,超过此一定阀值的订单直接丢弃
  2. 可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)
  3. 用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面.
  4. 秒杀业务根据消息队列中的请求信息,再做后续处理.

再举个例子:

如订单系统,在下单的时候就会往数据库写数据。但是数据库只能支撑每秒1000左右的并发写入,并发量再高就容易宕机。低峰期的时候并发也就100多个,但是在高峰期时候,并发量会突然激增到5000以上,这个时候数据库肯定卡死了。

image-20211120152128954

消息被MQ(5000QPS对MQ来说小意思)保存起来了,然后系统就可以按照自己的消费能力来消费,比如每秒1000个数据,这样慢慢写入数据库,这样就不会卡死数据库了。

image-20211120152249065

但是使用了MQ之后,限制消费消息的速度为1000,但是这样一来,高峰期产生的数据势必会被积压在MQ中,高峰就被“削”掉了。但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000QPS,直到消费完积压的消息,这就叫做“填谷”

image-20211120152717148

使用MQ后,可以提高系统稳定性

1.4.4 消息驱动系统

1.4 消息队列有什么缺点?

优点上面已经说了,就是在特殊场景下有其对应的好处解耦异步削峰

1.4.1 系统可用性降低

​ 消息队列在系统中充当一个中间人的身份,如果该中间人突然失联了,那其他两方就不知所措了,最后也就导致系统之间无法互动。

image-20230922160347132

就像我们投递快递,如果MQ出现问题,那麽我们整个系统调用链路就会断开,前后端将无法通讯

系统引入的外部依赖越多,系统稳定性越差(原先要保证AB可用,现在要保证AB和MQ可用,要多保证一个)。一旦 MQ 宕机,就会对业务造成影响。

  • 如何保证MQ的高可用?
1.4.2 系统复杂性提高

​ 在使用消息队列的过程中,难免会出现生产者、MQ、消费者宕机不可用的情况,那么随之带来的问题就是消息重复、消息乱序、消息堆积等等问题都需要我们来控制。

MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。

  • 如何保证消息没有被重复消费?

  • 怎么处理消息丢失情况?

  • 那么保证消息传递的顺序性?

1.4.3 一致性问题

​ 如下图所示,系统需要保证快递投递,扣减系统费,通知等之间的数据一致性,如果系统短信通知,快递通知执行成功,扣减系统费执行失败时,就会出现数据不一致问题

image-20230922160352766

如果出现这种问题,我们需要有应对方案,比如重试等方案。

​ 所以消息队列实际是一种非常复杂的架构,你引入它有很多好处,但是也得针对它带来的坏处做各种额外的技术方案和架构来规避掉,做好之后,你会发现,妈呀,系统复杂度提升了一个数量级,也许是复杂了 10 倍。但是关键时刻,用,还是得用的。

A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理失败。

  • 如何保证消息数据处理的一致性?

1.5 什么时候可以用MQ?

既然 MQ 有优势也有劣势,那么使用 MQ 需要满足什么条件呢?

  • 生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明明下层的动作还没做,上层却当成动作做完了继续往后走,即所谓异步成为了可能。

  • 容许短暂的不一致性

  • 确实是用了有效果。即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本

2. 常用消息队列有哪些?

消息队列是分布式应用间交换信息的重要组件,消息队列可驻留在内存或磁盘上, 队列可以存储消息直到它们被应用程消费。

​ 通过消息队列,应用程序可以在不知道彼此位置的情况下独立处理消息,或者在处理消息前不需要等待接收此消息。

​ 所以消息队列可以解决应用解耦、异步消息、流量削锋等问题,是实现高性能、高可用、可伸缩和最终一致性架构中不可以或缺的一环。

​ 现在比较常见的消息队列产品主要有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、RocketMQ等

​ 还有Apache新的pulsar,为云原生准备的。

2.1 ActiveMQ

image-20230922160357951

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线,ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

出现时间比较早的一款消息队列的中间件组件 , 目前整个社区活跃度比较低, 此软件的使用人群也在不断的降低

2.1.1 特性
  1. 多种语言和协议编写客户端。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
  2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
  3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
  4. 通过了常见J2EE服务器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
  5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
  6. 支持通过JDBC和journal提供高速的消息持久化
  7. 从设计上保证了高性能的集群,客户端-服务器,点对点
  8. 支持Ajax
  9. 支持与Axis的整合
  10. 可以很容易得调用内嵌JMS provider,进行测试
2.1.2 使用建议

​ 一般的业务系统要引入 MQ,最早大家都用 ActiveMQ,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以大家还是算了吧,我个人不推荐用这个了;

2.2 RabbitMQ

image-20230922160405331

RabbitMQ是流行的开源消息队列系统,用erlang语言开发,RabbitMQ是AMQP(高级消息队列协议)的标准实现。目前在Java领域中使用非常频繁的一款消息队列的中间件产品, 其社区活跃度比较高的, 支持多种语言的开发

​ 支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX,持久化,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗

image-20230922160013084

2.2.1 特性
  • erlang语言开发,性能极其好,延时很低
  • 吞吐量到万级,MQ功能比较完备
  • 开源提供的管理界面非常棒,用起来很好用
  • 社区相对比较活跃,几乎每个月都发布几个版本分
  • 在国内一些互联网公司近几年用rabbitmq也比较多一些
2.2.2 使用建议

​ 大家开始用 RabbitMQ,但是确实 erlang 语言阻止了大量的 Java 工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是确实人家是开源的,比较稳定的支持,活跃度也高,所以中小型公司,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ 是不错的选择,对自家技术没有过高自信的话,可以使用RabbitMQ,人家有活跃的开源社区,绝对不会黄。

2.3 Kafka

image-20230922160016750

Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 是一款大数据领域下的消息队列的中间件产品, 主要应用在大数据领域方向上, 在后端 以及业务领域使用较少(用基本都是非重要数据传输上使用)

​ 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群机来提供实时的消费。

2.3.1 特性
  • 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。(文件追加的方式写入数据,过期的数据定期删除)
  • 高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息
  • 支持通过Kafka服务器和消费机集群来分区消息
  • 支持Hadoop并行数据加载
2.3.2 使用建议

​ 如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。

2.3.3 应用场景

一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用。

2.3.3.1 日志收集

​ 一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer;

2.3.3.2 消息系统

​ 解耦生产者和消费者、缓存消息等;

2.3.3.3 用户活动跟踪

​ kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后消费者通过订阅这些topic来做实时的监控分析,亦可保存到数据库;

2.3.3.4 运营指标

​ kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告;

2.3.3.5 流式处理

​ 比如spark streaming和storm。

2.4 RocketMQ

image-20230922160021782

RocketMQ是阿里开源的消息中间件,纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。

​ RocketMQ思路起源于Kafka,但并不是简单的复制,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景,支撑了阿里多次双十一活动

​ 因为是阿里内部从实践到产品的产物,因此里面很多接口、api并不是很普遍适用,可靠性毋庸置疑,而且与Kafka一脉相承(甚至更优),性能强劲,支持海量堆积。

2.4.1 特点
  • 接口简单易用,源码是阿里出品,可自定义MQ
  • topic可以达到几百,几千个的级别,吞吐量会有较小幅度的下降
  • 消息可用性极高,经过参数优化配置,可以做到0丢失
  • MQ功能较为完善,还是分布式的,扩展性好
2.4.2 使用建议

​ 现在确实越来越多的公司会去用 RocketMQ,确实很不错,毕竟是阿里出品,但社区可能有突然黄掉的风险(目前 RocketMQ 已捐给 Apache,但 GitHub 上的活跃度其实不算高)对自己公司技术实力有绝对自信的,推荐用 RocketMQ,否则回去老老实实用 RabbitMQ 吧,大型公司,基础架构研发实力较强,用 RocketMQ 是很好的选择。

2.5 Pulsar

最近一两年新起的一款消息队列的中间件产品, 也是Apache顶级开源项目, 目前主要是有StreamNative公司进行维护

3. 怎样选型MQ?

到底应该哪个方案,还是要看具体的需求。在我们的设计中,MQ的功能与业务无关,因此优先考虑使用已有的中间件搭建。那么具本选择哪个中间件呢?

3.1 需求分析

3.1.1 功能需求

​ 除了最基本生产消费模型,还需要MQ能支持REQUEST-REPLY模型,以提供对同步调用的支持。 此外,如果MQ能提供PUBLISH-SUBSCRIBE模型,则事件代理的实现可以更加简单。

3.1.2 性能需求

​ 考虑未来一到两年内产品的发展,消息队列的呑吐量预计不会超过 1W qps,但由单条消息延迟要求较高,希望尽量的短。

3.1.3 可用性需求

​ 因为是在线服务,因此需要较高的可用性,但充许有少量消息丢失。

3.1.4 易用性需求

​ 包括学习成本、初期的开发部署成本、日常的运维成本等。

3.2 横向对比

特性 ActiveMQ RabbitMQ Kafka RocketMQ
PRODUCER-COMSUMER 支持 支持 支持 支持
PUBLISH-SUBSCRIBE 支持 支持 支持 支持
REQUEST-REPLY 支持 支持 - 支持
API完备性 低(静态配置)
多语言支持 支持,JAVA优先 语言无关 支持,JAVA优先 支持
单机呑吐量 万级 万级 十万级 单机万级
消息延迟 毫秒级 微秒级 毫秒级 毫秒级
可用性 高(主从) 高(主从) 很高(分布式) 非常高(分布式)
消息丢失 - 理论上不会丢失 理论上不会丢失
消息重复 - 可控制 理论上会有重复 允许重复
文档的完备性
提供快速入门
首次部署难度 -

注: - 表示尚未查找到准确数据

image-20211120195932148

image-20211202205346780

1
2
如果消息topic设置不大,几十左右 => 选择kafka
如果消息体量大,可以预见设置很多队列 => 选rocketmq

image-20211202205649959

1
2
真正能解决顺序的只有rocket,但是这个时候性能降低严重,几乎没法使用
rabbitmq和kafka也能,需要额外的手段。
  • kafka 性能好 吞吐量高 100w 丢数据 延迟性高(毫秒)

  • rocketmq 性能好 吞吐量高 50w 理论上不丢数据 延迟性高(毫秒)

  • rabbitmq 性能相比低 吞吐量 低 10W 丢数据 延迟性低(微秒)

3.2.1 个人建议
  • 中小型公司,技术一般,可以考虑用 RabbitMQ;
  • 大型公司,基础架构研发实力较强,用 RocketMQ 是很好的选择
  • 实时计算、日志采集:使用 kafka;

3.3 如何选型?

MQ 描述
RabbitMQ erlang开发,对消息堆积的支持并不好,当大量消息积压的时候,会导致 RabbitMQ 的性能急剧下降。每秒钟可以处理几万到十几万条消息。
RocketMQ java开发,面向互联网集群化功能丰富,对在线业务的响应时延做了很多的优化,大多数情况下可以做到毫秒级的响应,每秒钟大概能处理几十万条消息。
Kafka Sc ala开发,面向日志功能丰富,性能最高。当你的业务场景中,每秒钟消息数量没有那么多的时候,Kafka 的时延反而会比较高。所以,Kafka 不太适合在线业务场景。
ActiveMQ java开发,简单,稳定,性能不如前面三个。小型系统用也ok,但是不推荐。推荐用互联网主流的。

4. RabbitMQ基本使用

RabbitMQ 与 AMQP 遵循相同的模型架构,其架构示例图如下

image-20230922160438138

4.1 重要概念

4.1.1 Publisher

消息生产者,就是投递消息的程序

​ 发布者 (或称为生产者) 负责生产消息并将其投递到指定的交换器上。

4.1.2 Message

​ 消息由消息头和消息体组成,消息头用于存储与消息相关的元数据:如目标交换器的名字 (exchange_name) 、路由键 (RountingKey)和其他可选配置 (properties) 信息。消息体为实际需要传递的数据。

4.1.3 Exchange

交换器负责接收来自生产者的消息,并将消息路由到一个或者多个队列中,如果路由不到,则返回给生产者或者直接丢弃,这取决于交换器的 mandatory 属性:

  • 当 mandatory 为 true 时:如果交换器无法根据自身类型和路由键找到一个符合条件的队列,则会将该消息返回给生产者;
  • 当 mandatory 为 false 时:如果交换器无法根据自身类型和路由键找到一个符合条件的队列,则会直接丢弃该消息。
4.1.4 BindingKey

​ 交换器与队列通过 BindingKey 建立绑定关系。

4.1.5 Routingkey

基于交换器类型的规则相匹配时,消息被路由到对应的队列中

​ 生产者将消息发给交换器的时候,一般会指定一个 RountingKey,用来指定这个消息的路由规则。当 RountingKey 与 BindingKey

4.1.6 Queue

消息队列载体,每个消息都会被投入到一个或多个队列。

​ 用于存储路由过来的消息。多个消费者可以订阅同一个消息队列,此时队列会将收到的消息将以轮询 (round-robin) 的方式分发给所有消费者。即每条消息只会发送给一个消费者,不会出现一条消息被多个消费者重复消费的情况。

4.1.7 Consumer

消息消费者,就是接受消息的程序

​ 消费者订阅感兴趣的队列,并负责消费存储在队列中的消息。为了保证消息能够从队列可靠地到达消费者,RabbitMQ 提供了消息确认机制 (messageacknowledgement),并通过 autoAck 参数来进行控制

  • 当 autoAck 为 true 时:此时消息发送出去 (写入TCP套接字) 后就认为消费成功,而不管消费者是否真正消费到这些消息。当 TCP 连接或 channel 因意外而关闭,或者消费者在消费过程之中意外宕机时,对应的消息就丢失。因此这种模式可以提高吞吐量,但会存在数据丢失的风险。
  • 当 autoAck 为 false 时:需要用户在数据处理完成后进行手动确认,只有用户手动确认完成后,RabbitMQ 才认为这条消息已经被成功处理。这可以保证数据的可靠性投递,但会降低系统的吞吐量。
4.1.8 Connection

​ 用于传递消息的 TCP 连接。

4.1.9 Channel

消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

​ RabbitMQ 采用类似 NIO (非阻塞式 IO ) 的设计,通过 Channel 来复用 TCP 连接,并确保每个 Channel的隔离性,就像是拥有独立的 Connection 连接。当数据流量不是很大时,采用连接复用技术可以避免创建过多的 TCP 连接而导致昂贵的性能开销。

4.1.10 Virtual Host

虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离

​ RabbitMQ 通过虚拟主机来实现逻辑分组和资源隔离,一个虚拟主机就是一个小型的 RabbitMQ服务器,拥有独立的队列、交换器和绑定关系。用户可以按照不同业务场景建立不同的虚拟主机,虚拟主机之间是完全独立的,你无法将 vhost1 上的交换器与vhost2 上的队列进行绑定,这可以极大的保证业务之间的隔离性和数据安全。默认的虚拟主机名为 /

4.1.11Broker

​ 简单来说就是消息队列服务器实体。

4.2 RabbitMQ安装

参考:https://baiyp.ren/RabbitMQ%E5%AE%89%E8%A3%85.html

4.3 如何使用RabbitMQ发送消息?

exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。

  1. 客户端连接到消息队列服务器,打开一个channel。
  2. 客户端声明一个exchange,并设置相关属性。
  3. 客户端声明一个queue,并设置相关属性。
  4. 客户端使用routing key,在exchange和queue之间建立好绑定关系。
  5. 客户端投递消息到exchange。

4.4 消息怎么路由?

​ 从概念上来说,消息路由必须有三部分:交换器路由绑定。生产者把消息发布到交换器上;绑定决定了消息如何从路由器路由到特定的队列;消息最终到达队列,并被消费者接收。

​ 消息发布到交换器时,消息将拥有一个路由键(routing key),在消息创建时设定,通过队列路由键,可以把队列绑定到交换器上。

​ 消息到达交换器后,RabbitMQ会将消息的路由键与队列的路由键进行匹配(针对不同的交换器有不同的路由规则)。如果能够匹配到队列,则消息会投递到相应队列中;如果不能匹配到任何队列,消息将进入 “黑洞”。

4.5 常用交换器有哪些?

交换器是消息被发送的 AMQP 实体,交换器拿到消息然后把它路由给0或多个队列,路由算法基于交换器的类型和 bindings。

常用的交换器主要分为一下三种:

Exchange type(交换器类型) Default pre-declared names(预声明默认名称)
Direct exchange(直连交换器) (Empty string) and amq.direct
Fanout exchange(扇形交换器) amq.fanout
Topic exchange(主题交换器) amq.topic
Headers exchange(头信息交换器) amq.match (and amq.headers in RabbitMQ)
4.5.1 直连交换机

如果路由键完全匹配,消息就被投递到相应的队列

​ 直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应绑定键的队列。

​ 直连交换机是一种带路由功能的交换机,一个队列会和一个交换机绑定,除此之外再绑定一个routing_key,当消息被发送的时候,需要指定一个binding_key,这个消息被送达交换机的时候,就会被这个交换机送到指定的队列里面去。同样的一个binding_key也是支持应用到多个队列中的。

image-20230922160444932

直连型交换机图例:

image-20230922160033980

​ 当生产者(P)发送消息时 Rotuing key=booking 时,这时候将消息传送给 Exchange,Exchange 获取到生产者发送过来消息后,会根据自身的规则进行与匹配相应的 Queue,这时发现 Queue1 和 Queue2 都符合,就会将消息传送给这两个队列。

​ 如果我们以 Rotuing key=create 和 Rotuing key=confirm 发送消息时,这时消息只会被推送到 Queue2 队列中,其他 Routing Key 的消息将会被丢弃。

4.5.2 扇型交换机

如果交换器收到消息,将会广播到所有绑定的队列上

​ 扇型交换机(fanout exchange)将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键。如果 N 个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的 N 个队列。扇型用来交换机处理消息的广播路由(broadcast routing)。

image-20230922160037645

因为扇型交换机投递消息的拷贝到所有绑定到它的队列,所以他的应用案例都极其相似:

  • 大规模多用户在线(MMO)游戏可以使用它来处理排行榜更新等全局事件
  • 体育新闻网站可以用它来近乎实时地将比分更新分发给移动客户端
  • 分发系统使用它来广播各种状态和配置更新
  • 在群聊的时候,它被用来分发消息给参与群聊的用户。(AMQP 没有内置 presence 的概念,因此 XMPP 可能会是个更好的选择)

扇型交换机图例:

image-20230922160041562

​ 上图所示,生产者(P)生产消息 1 将消息 1 推送到 Exchange,由于 Exchange Type=fanout 这时候会遵循 fanout 的规则将消息推送到所有与它绑定 Queue,也就是图上的两个 Queue 最后两个消费者消费。

4.5.3 主题交换机

可以使来自不同源头的消息能够到达同一个队列。使用topic交换器时,可以使用通配符。

​ 基于消息的 routing key 与绑定到该交换器的队列的 pattern 进行匹配,路由消息到一个或多个队列。常用于复杂的发布/订阅场景。 当出现多消费者/应用的场景,消费者选择性地接收消息时,应该考虑使用 topic exchange

​ 前面提到的 direct 规则是严格意义上的匹配,换言之 Routing Key 必须与 Binding Key 相匹配的时候才将消息传送给 Queue.

​ 而Topic 的路由规则是一种模糊匹配,可以通过通配符满足一部分规则就可以传送。

4.5.3.1 约束条件
  1. binding key 中可以存在两种特殊字符 “” 与“#”,用于做模糊匹配,其中 “” 用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
  2. routing key 为一个句点号 “.” 分隔的字符串(我们将被句点号 “. ” 分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit” binding key 与 routing key 一样也是句点号 “.” 分隔的字符串

主题交换机图例:

image-20230922160045195

​ 当生产者发送消息 Routing Key=F.C.E 的时候,这时候只满足 Queue1,所以会被路由到 Queue 中,如果 Routing Key=A.C.E 这时候会被同是路由到 Queue1 和 Queue2 中,如果 Routing Key=A.F.B 时,这里只会发送一条消息到 Queue2 中。

​ 主题交换机拥有非常广泛的用户案例。无论何时,当一个问题涉及到那些想要有针对性的选择需要接收消息的 多消费者 / 多应用(multiple consumers/applications) 的时候,主题交换机都可以被列入考虑范围。

4.5.3.2 使用案例
  • 分发有关于特定地理位置的数据,例如销售点
  • 由多个工作者(workers)完成的后台任务,每个工作者负责处理某些特定的任务
  • 股票价格更新(以及其他类型的金融数据更新)
  • 涉及到分类或者标签的新闻更新(例如,针对特定的运动项目或者队伍)
  • 云端的不同种类服务的协调
  • 分布式架构 / 基于系统的软件封装,其中每个构建者仅能处理一个特定的架构或者系统。
4.5.4 头交换机(不常用)

headers 类型的 Exchange 不依赖于 routing key 与 binding key 的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。

​ 头交换机可以视为直连交换机的另一种表现形式。但直连交换机的路由键必须是一个字符串,而头属性值则没有这个约束,它们甚至可以是整数或者哈希值(字典)等。灵活性更强(但实际上我们很少用到头交换机)。工作流程:

  1. 绑定一个队列到头交换机上时,会同时绑定多个用于匹配的头(header)。
  2. 传来的消息会携带header,以及会有一个 “x-match” 参数。当 “x-match” 设置为 “any” 时,消息头的任意一个值被匹配就可以满足条件,而当 “x-match” 设置为 “all” 的时候,就需要消息头的所有值都匹配成功。
4.5.5 交换机小结
类型名称 路由规则
Default 自动命名的直交换机
Direct 把消息路由到BindingKey和RoutingKey完全匹配的队列中,Routing Key==Binding Key,严格匹配
Fanout 发送到该交换机的消息都会路由到与该交换机绑定的所有队列上,可以用来做广播
Topic topic和direct类似,也是将消息发送到RoutingKey和BindingKey相匹配的队列中,只不过可以模糊匹配
Headers 根据发送的消息内容中的 headers 属性进行匹配,性能差,基本不会使用

RabbitMQ进阶使用

RabbitMQ 简介

AMQP

AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议 的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中 间件不同产品,不同的开发语言等条件的限制。2006年,AMQP 规范发布。类比HTTP

我们也接触了很多协议了,TCP,HTTP等,只要遵循协议就可以做通信

image-20211120200348320

​ 2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。RabbitMQ 采用 Erlang 语言开发。

​ Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。

RabbitMQ 基础架构如下图:

image-20211120200438022

RabbitMQ基本概念

  • Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker

  • Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网 络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多 个vhost,每个用户在自己的 vhost 创建 exchange/queue 等。

    好比MySQL中有很多数据库,相当于逻辑分区的概念。

  • Connection:publisher/consumer 和 broker 之间的 TCP 连接

  • Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线 程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销

  • Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)

  • Queue:消息最终被送到这里等待 consumer 取走

  • Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存 到 exchange 中的查询表中,用于 message 的分发依据

6 种工作模式

RabbitMQ 提供了 6 种工作模式:简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍)。

官网对应模式介绍:https://www.rabbitmq.com/getstarted.htm

image-20211120201017954

JMS

  • JMS 即 Java 消息服务(JavaMessage Service)应用程序接口,是一个 Java 平台中关于面向消息中间件(消息队列)的API
  • JMS 是 JavaEE 规范中的一种,类比JDBC
  • 很多消息中间件都实现了JMS规范,例如:ActiveMQ。RabbitMQ 官方没有提供 JMS 的实现包,但是开源社区有

安装配置RabbitMQ

1.安装erlang依赖环境

1
2
# rpm包安装前需要安装下面的依赖包
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz

2.离线安装Erlang

1
2
3
4
5
6
# 上传rpm安装包
erlang-18.3-1.el7.centos.x86_64.rpm
socat-1.7.3.2-5.el7.lux.x86_64.rpm
rabbitmq-server-3.6.5-1.noarch.rpm
# 安装
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm

如果出现如下错误,则升级,否则直接跳转到第三步安装RabbitMQ

image-20211120201606600

  • 说明gblic 版本太低。我们可以查看当前机器的gblic 版本
1
strings /lib64/libc.so.6 | grep GLIBC

image-20211120201657547

当前最高版本2.12,需要2.15.所以需要升级glib

  • 使用yum更新安装依赖
1
sudo yum install zlib-devel bzip2‐devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gcc make -y
  • 下载rpm包
1
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6- x86_64/glibc-2.17-55.fc20/glibc-utils-2.17-55.el6.x86_64.rpm & wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6- x86_64/glibc-2.17-55.fc20/glibc-static-2.17-55.el6.x86_64.rpm & wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6- x86_64/glibc-2.17-55.fc20/glibc-2.17-55.el6.x86_64.rpm & wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6- x86_64/glibc-2.17-55.fc20/glibc-common-2.17-55.el6.x86_64.rpm & wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6- x86_64/glibc-2.17-55.fc20/glibc-devel-2.17-55.el6.x86_64.rpm & wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6- x86_64/glibc-2.17-55.fc20/glibc-headers-2.17-55.el6.x86_64.rpm & wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6- x86_64/glibc-2.17-55.fc20/nscd-2.17-55.el6.x86_64.rpm &
  • 安装rpm包
1
sudo rpm -Uvh *-2.17-55.el6.x86_64.rpm --force --nodeps
  • 安装完毕后再查看glibc版本,发现glibc版本已经到2.17了
1
strings /lib64/libc.so.6 | grep GLIBC

3.安装RabbitMQ

1
2
3
# 安装 
rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm # rabbitmq依赖环境 # 如果不行,--force --nodeps
rpm -ivh rabbitmq‐server‐3.6.5‐1.noarch.rpm

4.开启管理界面及配置

1
2
3
4
5
# 开启管理界面 
rabbitmq-plugins enable rabbitmq_management
# 修改默认配置信息
vim /usr/lib/rabbitmq/lib/rabbitmq_server‐3.6.5/ebin/rabbit.app
# 比如修改密码、配置等等,例如:loopback_users 中的 <<"guest">>,只保留guest (搜索:/guest)

image-20220303231940219

5.启动

1
2
3
4
5
6
service rabbitmq‐server start # 启动服务 
service rabbitmq‐server stop # 停止服务
service rabbitmq‐server restart # 重启服务

访问不到可能是防火墙拦截了。
service iptables stop # 关闭防火墙

6.设置配置文件

image-20211120210048331

1
2
3
# 拷贝一份配置文件
cd /usr/share/doc/rabbitmq‐server‐3.6.5/
cp rabbitmq.config.example /etc/rabbitmq/rabbitmq.config

7.配置虚拟主机及用户

默认用户角色

创建自定义用户

image-20211120203534970

自定义用户可以分配一下角色:

  • 1、 超级管理员(administrator)

​ 可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。

  • 2、 监控者(monitoring)

​ 可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)

  • 3、 策略制定者(policymaker)

​ 可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。

  • 4、 普通管理者(management)

​ 仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。

  • 5、 其他

​ 无法登陆管理控制台,通常就是普通的生产者和消费者。

Virtual Hosts配置

​ 像mysql拥有数据库的概念并且可以指定用户对库和表等操作的权限。RabbitMQ也有类似的权限管理;在 RabbitMQ中可以虚拟消息服务器Virtual Host,每个Virtual Hosts相当于一个相对独立的RabbitMQ服务器,每个 VirtualHost之间是相互隔离的。exchange、queue、message不能互通。 相当于mysql的db。Virtual Name一般以/开头。

  • 创建Virtual Hosts

image-20211120203745770

  • 设置Virtual Hosts权限

image-20211120203816372

image-20211120203819996

RabbitMQ工作模式

工作模式其实就是指消息路由的一种分发模式

简单模式

image-20211120212714320

步骤:

  • ① 创建工程(生成者、消费者)

image-20211120212925835

  • ② 分别添加依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<dependencies>
<!--rabbitmq java 客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
<!--解决启动日志警告>-->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
  • ③ 编写生产者发送消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class ManProducer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2.设置参数
factory.setHost("192.168.88.128"); // ip 默认localhost
factory.setPort(5672); // 端口 默认值5672
factory.setVirtualHost("/pointink"); // 虚拟机 默认/
factory.setUsername("pointink"); // 用户名 默认guest
factory.setPassword("pointink"); // 密码 默认guest
// 3.创建连接connection
Connection connection = factory.newConnection();
// 4.创建channel
Channel channel = connection.createChannel();
// 5.创建队列queue
// 参数说明:
// String queue 队列名称,如果有改名队列,则不创建,如果没有叫改名队列,则创建
// boolean durable 是否持久化,当mq重启之后还在
// boolean exclusive 是否独占,只能有一个消费者来监听这个队列。当connection关闭时,是否删除队列。一般设置为false
// boolean autoDelete 是否自动删除,当没有consumer时,自动删除掉。一般也不自动删除。消息就是要来消费的
// Map<String,Object> arguments 参数
channel.queueDeclare("love",true,false,false,null);
// 6.发送消息
// 参数说明:
// String exchange 交换机,简单模式下交换机会使用默认的""
// String routingKey 路由名称,简单模式下路由和队列的名称一样就行
// BasicProperties props 配置信息,设置为null即可
// byte[] body 发送消息数据
channel.basicPublish("","love",null,"两情若是久长时".getBytes(StandardCharsets.UTF_8));
// 7.释放资源
channel.close();
connection.close();
}
}

image-20211120215901615

执行mian方法,此处发送了2条消息

image-20211120220741985

  • ④ 编写消费者接收消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class WomanConsumer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2.设置参数
factory.setHost("192.168.88.128"); // ip 默认localhost
factory.setPort(5672); // 端口 默认值5672
factory.setVirtualHost("/pointink"); // 虚拟机 默认/
factory.setUsername("pointink"); // 用户名 默认guest
factory.setPassword("pointink"); // 密码 默认guest
// 3.创建连接connection
Connection connection = factory.newConnection();
// 4.创建channel
Channel channel = connection.createChannel();
// 5.创建队列queue
// 参数说明:
// String queue 队列名称,如果有改名队列,则不创建,如果没有叫改名队列,则创建
// boolean durable 是否持久化,当mq重启之后还在
// boolean exclusive 是否独占,只能有一个消费者来监听这个队列。当connection关闭时,是否删除队列。一般设置为false
// boolean autoDelete 是否自动删除,当没有consumer时,自动删除掉。一般也不自动删除。消息就是要来消费的
// Map<String,Object> arguments 参数
channel.queueDeclare("love",true,false,false,null);
// 6.发送消息
// 参数说明:
// String queue 队列名称
// boolean autoAck 是否自动确认,消费者消息一旦收到自动给MQ说一声消息收到了
// Consumer callback 回调对象
channel.basicConsume("love",true,new DefaultConsumer(channel) {
// 回调方法,当收到消息后,会自动执行该方法
// 参数说明:
// String consumerTag 标识
// Envelope envelope 获取一些信息,交换机,路由Key...
// AMQP.BasicProperties properties 配置信息
// byte[] body 数据
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag = " + consumerTag);
System.out.println("envelope.getExchange() = " + envelope.getExchange());
System.out.println("envelope.getRoutingKey() = " + envelope.getRoutingKey());
System.out.println("properties = " + properties);
System.out.println("body = " + new String(body));
}
});
// 7.释放资源? 消费者相当于监听程序,需要一直监听MQ中是否有消息,关闭资源了还怎么监听,所以不要关闭
}
}

image-20211120222012585

image-20211120222124629

Work queues工作队列

image-20211121085559859

  • Work Queues:与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。消费者之间对于同一个消息的关系是竞争的关系。

  • 应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。例如:涉及短信的应用服务部署了多个,只需要有一个节点成功发送即可。

Work Queues 与入门程序的简单模式的代码几乎是一样的。消息发送发多条消息,复制多个消费者同时对消费消息做测试。

可以观察到2个消费者顺序消费消息,可以分担任务压力。注意这里一个任务只能由一个消费者消费。

image-20220305101830550

Pub/Sub 订阅模式

生产者把消息发送给交换机,==交换机把消息路由分发给队列==。消费者监听队列消费消息。

image-20211121090756051

  • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)

  • C:消费者,消息的接收者,会一直等待消息到来

  • Queue:消息队列,接收消息、缓存消息

  • Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:

    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

    上面3种交换机类型对应着3种工作模式

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!

Fanout广播模式

消息发送方(生产者):

image-20211121092704130

消费者消费队列1:

image-20211121095852182

消费者消费队列2:

image-20220305102755737

消费结果:

image-20211121095803160

小结:

  • 交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到
  • 发布订阅模式与工作队列模式的区别:
  • 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机
  • 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑定到默认的交换机

Routing路由模式

说明:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)
  • 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
  • Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的 Routingkey 与消息的 Routing key 完全一致,才会接收到消息

image-20211121100126772

是这个队列具有接收error的能力或者接收info/error/warning的能力。而消费者是在队列接收到消息后去拿过来做自己的处理,这里的routing相当于限制了这个队列能不能拿到消息。

图解:

  • P:生产者,向 Exchange 发送消息,发送消息时,会指定一个routing key
  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列
  • C1:消费者,其所在队列指定了需要 routing key 为 error 的消息
  • C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息

如果生产者发送了info路由规则的消息,则只有C2可以接收到并消费。

image-20211121101928910

如果生产者发送了error路由规则的消息,则C1,C2都可以接收到并消费。

Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。

Topics通配符模式

说明:

  • Topic类型和Direct相比,都是可以根据RountingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定RoutingKey的时候使用通配符
  • RoutingKey一般都是有一个或多个单词组成,多个单词之间以.分割,例如:item.insert
  • 通配符规则:#匹配一个或多个次,*匹配不多不少恰好一个词,例如:item.#,能够匹配item.insert.abc或者item.insert,item.*只能匹配item.insert

image-20211121102305478

图解:

  • 红色 Queue:绑定的是 usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到

  • 黄色 Queue:绑定的是 #.news ,因此凡是以 .news 结尾的 routing key 都会被匹配

image-20211121103643453

image-20211121103949140

Topic 主题模式可以实现 Pub/Sub 发布与订阅模式和 Routing 路由模式的功能,只是 Topic 在配置routing key 的时候可以使用通配符,显得更加灵活。

小结

  • 简单模式 HelloWorld

一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)。

  • 工作队列模式 Work Queue

一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)。

  • 发布订阅模式 Publish/subscribe

需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列。

  • 路由模式 Routing

需要设置类型为 direct 的交换机,交换机和队列进行绑定,并且指定 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。

  • 通配符模式 Topic

需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。

Spring整合RabbitMQ

上面学习的时候,都是以amqp-client的方式编码,比较繁琐。和Spring整合之后会方便许多。

  • 生产者

① 创建生产者工程

② 添加依赖

image-20211121110615349

③ 配置整合

rabbitmq.properties

image-20211121110503924

spring-rabbitmq-producer.xml

image-20211121110357387

④ 编写代码发送消息

image-20211121111443001

MQ中的队列:发送成功~

image-20211121111618412

  • 消费者

① 创建生产者工程

② 添加依赖

同生产者

③ 配置整合

rabbitmq.properties同生产者

spring-rabbitmq-consumer.xml

image-20211121112055134

④ 编写消息监听器

image-20211121112947679

MQ中的队列:消费成功~

image-20211121112808726

SpringBoot整合RabbitMQ

当然是更简单的了,简化上面spring整合的一堆配置

生产端

  • 1.创建生产者SpringBoot工程

  • 2.引入start,依赖坐标

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 3.编写yml配置,基本信息配置
1
2
3
4
5
6
7
8
# application.yml
spring:
rabbitmq:
host: 192.168.88.128
username: pointink
password: pointink
virtual-host: /pointink
port: 5672
  • 4.定义交换机,队列以及绑定关系的配置类

RabbitMQConfig.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Configuration
public class RabbitMQConfig {
public static final String EXCHANGE_NAME="boot_topic_exchange";
public static final String QUEUE_NAME="boot_queue";
// 1.交换机
@Bean("bootExchange")
public Exchange bootExchange() {
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
// 2.queue队列
@Bean("bootQueue")
public Queue bootQueue() {
return QueueBuilder.durable(QUEUE_NAME).build();
}
// 3.队列和交换机绑定
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
}
}

image-20211121115648636

  • 5.注入RabbitTemplate,调用方法,完成消息发送
1
2
3
4
5
6
7
8
9
10
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSend() {
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.hah","boot mq~~~");
}
}

image-20211121115910411

消息发送成功~~~

image-20211121120142430

消费端

  • 1.创建消费者SpringBoot工程

  • 2.引入start,依赖坐标

  • 3.编写yml配置,基本信息配置

前面同生产者

  • 4.定义监听类,使用@RabbitListener注解完成队列监听

image-20211121121936920

image-20211121122009214

RabbitMQ高级特性

Productor 消息可靠性投递

​ 在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式

  • confirm 确认模式
  • return 退回模式

rabbitmq整个消息投递的路径:

producer -> rabbitmq broker -> exchange -> queue -> consumer

  • 消息从 producer 到 exchange 则会返回一个 confifirmCallback

  • 消息从 exchange 到 queue 投递失败则会返回一个 returnCallback

我们将利用这两个 callback 控制消息的可靠性投递

confirm模式

  • 创建生产者工程rabbitmq-producer-spring
  • 添加依赖pom.xml

image-20211121154504509

  • 增加配置

rabbitmq.properties

image-20211121154605336

spring-rabbitmq-producer.xml

image-20211121154651059

  • 测试类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testConfirm() {
// 1.确认模式开启:ConnectionFactory中开启publisher‐confirms="true
// 2.在rabbitTemplate定义ConfirmCallBack回调函数
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
*
* @param correlationData 相关配置信息
* @param ack exchange是否成功收到了消息,true 成功,false 失败
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("ProducerTest.confirm回调被执行了...");
if (ack) { // exchange成功接收消息
System.out.println("成功接收 = " + cause);
} else { // exchange接收消息失败
System.out.println("失败原因 = " + cause);
//做一些处理,让消息再次发送。
}
}
});
// 交换机exchange的名字写错了,到不了交换机=>报错
rabbitTemplate.convertAndSend("text_exchange_confirm111","confirm","hello message");
}

}
  • 执行结果

image-20211121154758157

return模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* 回退模式:当消息发送给exchange后,exchange路由到queue失败了才会执行
* 1.开启回退模式
* 2.设置returnCallBack
* 3.设置exchange处理消息的模式
* 1.如果消息没有路由到queue,则丢弃消息(默认)
* 2.如果消息没有路由到queue,返回给消息发送方returnCallBack
*/
@Test
public void testReturn() {
// 设置交换机处理失败消息的模式,默认是丢弃的.设置该项可以拿到return
rabbitTemplate.setMandatory(true);

// 2.定义回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
*
* @param message 消息对象
* @param replyCode 错误码
* @param replyText 错误信息
* @param exchange 交换机
* @param routingKey 路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("message = " + message);
System.out.println("replyCode = " + replyCode);
System.out.println("replyText = " + replyText);
System.out.println("exchange = " + exchange);
System.out.println("routingKey = " + routingKey);
System.out.println("ProducerTest.returnedMessage回调执行了...");
// 处理 ....
}
});
// 路由的名字写错了,exchange自然到不了queue,可以触发return
rabbitTemplate.convertAndSend("test_exchange_confirm","confirm11","hello message");
}

image-20211121160248101

小结

  • 对于确认模式
    • 设置ConnectionFactory的publisher-confirms=”true”开启确认模式
    • 使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功.如果为false,则发送失败.需要处理
  • 对于退回模式
    • 设置ConnectionFactory的publisher-returns=”true”开启退回模式
    • 使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage.

在RabbitMQ中也提供了事务机制,但是性能较差,此处不做讲解。

使用channel列方法,完成事务控制:

txSelect(), 用于将当前channel设置成transaction模式

txCommit(),用于提交事务

txRollback(),用于回滚事务

Consumer ACK

ack指 Acknowledge,确认。 表示消费端收到消息后的确认方式。

有三种确认方式:

• 自动确认:acknowledge=”none

• 手动确认:acknowledge=”manual

• 根据异常情况确认:acknowledge=”auto“,(这种方式使用麻烦,不作讲解)

​ 其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失

​ 如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck()手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息

配置文件

image-20211121163756323

测试ack代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Component // 这里注册到容器,默认的bean名字是ackListener,所以上面的rabbit:listener ref设置的是ackListener
public class AckListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 1.接收转换消息
System.out.println("AckListener.onMessage" + new String(message.getBody()));
// 2.处理业务逻辑 long deliveryTag 标识, boolean multiple 允许多条消息被签收
System.out.println("处理业务逻辑....");
int i = 1 / 0;
// 3.手动签收
channel.basicAck(deliveryTag,true);
} catch (IOException e) {
// 4.出现异常,拒绝签收.
// 第三个参数requeue,重回队列:如果设置为true,则消息重新回调queue,broker会重新发送消息给消费端
// (推荐)
channel.basicNack(deliveryTag,true,true);
// 了解
// channel.basicReject(deliveryTag,true);
}
}
}

image-20211121163736797

测试结果

出现业务夜异常,没有手动签收,消息重回队列.这样得业务异常解决后.可以重新消费消息

image-20211121163930503

小结:

  • 在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认

  • 如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息

  • 如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。

如何保证消息的高可靠性传输?

  • 1.持久化

    • exchange要持久化
    • queue要持久化
    • message要持久化
  • 2.生产方确认confirm

  • 3.消费方确认Ack

  • 4.Broker高可用

消费端限流

image-20211121165050688

​ 如上图所示:如果在A系统中需要维护相关的业务功能,可能需要将A系统的服务停止,那么这个时候消息的生产者还是一直会向MQ中发送待处理的消息,消费者此时服务已经关闭,导致大量的消息都会在MQ中累积。如果当A系统成功启动后,默认情况下消息的消费者会一次性将MQ中累积的大量的消息全部拉取到自己的服务,导致服务在短时间内会处理大量的业务,可能会导致系统服务的崩溃。 所以消费端限流是非常有必要的。

​ 可以通过MQ中的 listener-container 配置属性 perfetch = 1,表示消费端每次从mq拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉去下一条消息。

image-20211121165920896

  • 在rabbit:listener-container 中配置 prefetch属性设置消费端一次拉取多少消息 (比如配置1000条,就是5000/1000限流的效果)

  • 消费端的确认模式一定为手动确认。acknowledge=”manual”

TTL

​ TTL 全称 Time To Live(存活时间/过期时间)。当消息到达存活时间后,还没有被消费,会被自动清除。

​ RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

image-20211121170146467

可以在RabbitMQ管理控制台设置过期时间,此处不做重点讲解。

管理者界面设置过期

image-20211121171334814

代码设置队列过期

  • 1.在消息的生产方中,在 spring-rabbitmq-producer.xml 配置文件中,添加如下配置

image-20211121170247042

  • 2.编写发送消息测试方法

image-20211121172127273

测试结果:当消息发送成功后,过100s后在RabbitMQ的管理控制台会看到消息会自动删除

设置单个消息过期

并且设置队列的过期时间为100s, 单个消息的过期时间为5s

image-20211121172438090

如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。

队列过期后,会将队列所有消息全部移除

消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉) =>符合正常思维,总不能每次还没到消费这条消息,我就要去遍历整个队列。判断哪条过期了吧(效率低,性能低)

小结:

  • 设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。

  • 设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。

  • 如果两者都进行了设置,以时间短的为准。

死信队列

​ 死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

image-20211121172938918

消息成为死信的三种情况:

  • 1.队列消息长度到达限制;

  • 2.消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;(不重回队列)

  • 3.原队列存在消息过期设置,消息到达超时时间未被消费;

队列绑定死信交换机:

给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key

相当于给队列发送消息,需要交换机和队列~~

image-20211121173048785

死信队列配置

image-20211121174259369

死信队列测试:

image-20211121174103144

image-20211121174833659

小结:

  • 死信交换机和死信队列和普通的没有区别

  • 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列

  • 消息成为死信的三种情况:

    • 队列消息长度到达限制;
    • 消费者拒接消费消息,并且不重回队列;
    • 原队列存在消息过期设置,消息到达超时时间未被消费;

延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

提出需求场景:

  • 1.下单后,30分钟未支付,取消订单,回滚库存。

ps:下完订单,过时未消费,进入死信队列,订单状态修改为超时

  • 2.新用户注册成功7天后,发送短信问候。

实现方式:

  • 1.定时器 (不优雅)

  • 2.延迟队列

image-20211121175756092

注意:在RabbitMQ中并未提供延迟队列功能。

但是可以使用:**TTL+**死信队列 组合实现延迟队列的效果

image-20211121175833555

image-20211121175623251

测试代码

image-20211121180050936

image-20211121180306397

消费端监听的时候死信队列哈!

image-20211121180507430

image-20220305115328324

小结

  • 延迟队列 指消息进入队列后,可以被延迟一定时间,再进行消费。

  • RabbitMQ没有提供延迟队列功能,但是可以使用 : TTL + DLX 来实现延迟队列效果。

日志与监控

RabbitMQ日志

RabbitMQ默认日志存放路径: /var/log/rabbitmq/rabbit@xxx.log

RabbitMQ 日志所在的目录

image-20211121180732705

RabbitMQ日志详细信息:

​ 日志包含了RabbitMQ的版本号、Erlang的版本号、RabbitMQ服务节点名称、cookie的hash值、RabbitMQ配置文件地址、内存限制、磁盘限制、默认账户guest的创建以及权限配置等等

web管控台监控

​ 直接访问当前的IP:15672,输入用户名和密码(默认是 guest),就可以查看RabbitMQ的管理控制台。当然也可通过命令的形式来查看。如下:

  • 查看主机:rabbitmqctl list_vhosts

image-20211121181142572

  • 查看用户: rabbitmqctl list_users

image-20211121181220076

  • 查看连接:rabbitmqctl list_connections
  • 其它相关命令(了解):
  • 查看exchanges:rabbitmqctl list_exchanges
  • 查看消费者信息:rabbitmqctl list_consumers
  • 查看环境变量:rabbitmqctl environment
  • 查看未被确认的队列:rabbitmqctl list_queues name messages_unacknowledged
  • 查看单个队列的内存使用:rabbitmqctl list_queues name memory
  • 查看准备就绪的队列:rabbitmqctl list_queues name messages_ready

消息可靠性与追踪

​ 而言,可能是因为生产者或消费者与RabbitMQ断开了连接,而它们与RabbitMQ又采用了不同的确认机制;也有可能是因为交换器与队列之间不同的转发策略;甚至是交换器并没有与任何队列进行绑定,生产者又不感知或者没有采取相应的措施;另外RabbitMQ本身的集群策略也可能导致消息的丢失。这个时候就需要有一个较好的机制跟踪记录消息的投递过程,以此协助开发和运维人员进行问题的定位。

​ 在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能来实现消息追踪。

Firehose

​ fifirehose的机制是将生产者投递给rabbitmq的消息,rabbitmq投递给消费者的消息按照指定的格式发送到默认的exchange上。这个默认的exchange的名称为 amq.rabbitmq.trace,它是一个topic类型的exchange。发送到这个exchange上的消息的routing key为 publish.exchangename 和 deliver.queuename。其中exchangename和queuename为实际exchange和queue的名称,分别对应生产者投递到exchange的消息,和消费者从queue上获取的消息。

image-20211121181458945

image-20211121181515021

注意:打开 trace 会影响消息写入功能,适当打开后请关闭。

rabbitmqctl trace_on:开启Firehose命令

rabbitmqctl trace_on [-p <vhost>] 不指定vhost默认是/

消息追踪验证

  • 1.创建一个队列 test_trace,并将当前的队列绑定到 amq.rabbitmq.trace 交换机上,设置RoutingKey为:#

image-20211121181613141

  • 2.未开启消息追踪之前,我们发送一个消息

image-20211121181632053

当前消息发送成功后,在控制台我们可以看到当前消息的具体信息

  • 设置开启消息追踪,在发送一条消息

image-20211121181649593

image-20211121181710876

​ 我们发现当前消息也正常存在,并且开启消息追踪后,会多出一条消息是 amq.rabbitmq.trace 交换机发给当前队列的消息,消息中的内容是比较完整的。

建议:在开发阶段我们可以开启消息追踪,在实际生产环境建议将其关闭

rabbitmqctl trace_off:关闭Firehose命令

rabbitmq_tracing

rabbitmq_tracing和Firehose在实现上如出一辙,只不过rabbitmq_tracing的方式比Firehose多了一层GUI的包装,更容易使用和管理。

  • rabbitmq-plugins list查看插件列表

启用插件:rabbitmq-plugins enable rabbitmq_tracing

image-20211121182906339

发送消息成功后,我们点击日志文件,要求输入RabbitMQ的登录用户名和密码。

image-20211121182926860

建议:在开发阶段我们可以开启消息追踪插件,在实际生产环境不建议建议开启,除非是非常特殊的业务场景,大家根据实际情况选择开启即可

RabbitMQ应用问题

消息可靠性保障

提出需求:如何能够保证消息的 100% 发送成功?

首先大家要明确任何一个系统都不能保证消息的 100% 投递成功,我们是可以保证消息以最高最可靠的发送给目标方。

在RabbitMQ中采用 消息补充机制 来保证消息的可靠性

image-20211121183850784

步骤分析:

参与部分:消息生产者、消息消费者、数据库、三个队列(Q1、Q2、Q3)、交换机、回调检查服务、定时检查服务

  1. 消息的生产者将业务数据存到数据库中

  2. 发送消息给 队列Q1

  3. 消息的生产者等待一定的时间后,再发送一个延迟消息给队列 Q3

  4. 消息的消费方监听 Q1 队列消息,成功接收后

  5. 消息的消费方会 发送 一条确认消息给 队列Q2

  6. 回调检查服务监听 队列Q2 发送的确认消息

  7. 回调检查服务接收到确认消息后,将消息写入到 消息的数据库表中

  8. 回调检查服务同时也会监听 队列Q3延迟消息, 如果接收到消息会和数据库比对消息的唯一标识

  9. 如果发现没有接收到确认消息,那么回调检查服务就会远程调用 消息生产者,重新发送消息

  10. 重新执行 2-7 步骤,保证消息的可靠性传输

  11. 如果发送消息和延迟消息都出现异常,定时检查服务会监控 消息库中的消息数据,如果发现不一致的消息然后远程调用消息的生产者重新发送消息

消息幂等性处理

​ 幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。

​ 在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。

​ 在本教程中使用 乐观锁机制 保证消息的幂等操作

场景:假设说下面的流程中consumer由于未知原因没有发送确认消息。MDB中和DB中不一致。定时检查/回调检查服务会让producer再发送一条。这时候就会有2条一样的消息。

image-20211121184440344

RabbitMQ集群搭建

摘要:实际生产应用中都会采用消息队列的集群方案,如果选择RabbitMQ那么有必要了解下它的集群方案原理

​ 一般来说,如果只是为了学习RabbitMQ或者验证业务工程的正确性那么在本地环境或者测试环境上使用其单实例部署就可以了,但是出于MQ中间件本身的可靠性、并发性、吞吐量和消息堆积能力等问题的考虑,在生产环境上一般都会考虑使用RabbitMQ的集群方案

​ 如果你真用到了消息队列,那么你的业务应该是相对重要的。必须要保障可用性。

集群方案的原理

​ RabbitMQ这款消息队列中间件产品本身是基于Erlang编写,Erlang语言天生具备分布式特性(通过同步Erlang集群各节点的magic cookie来实现)。因此,RabbitMQ天然支持Clustering。这使得RabbitMQ本身不需要像ActiveMQ、Kafka那样通过ZooKeeper分别来实现HA方案和保存集群的元数据。集群是保证可靠性的一种方式,同时可以通过水平扩展以达到增加消息吞吐量能力的目的。

image-20211121185350859

单机多实例部署

​ 由于某些因素的限制,有时候你不得不在一台机器上去搭建一个rabbitmq集群,这个有点类似zookeeper的单机版。真实生成环境还是要配成多机集群的。有关怎么配置多机集群的可以参考其他的资料,这里主要论述如何在单机中配置多个rabbitmq实例。

主要参考官方文档:https://www.rabbitmq.com/clustering.html

首先确保RabbitMQ运行没有问题:

1
rabbitmqctl status

停止rabbitmq服务:

1
service rabbitmq‐server stop 

启动第一个节点:(5672留给HAproxy用)

1
RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME=rabbit1 rabbitmq‐server start

image-20220305121753640

启动第二个节点:web管理插件端口占用,所以还要指定其web插件占用的端口号

1
RABBITMQ_NODE_PORT=5674 RABBITMQ_SERVER_START_ARGS="‐rabbitmq_management listener [{port,15674}]" RABBITMQ_NODENAME=rabbit2 rabbitmq‐server start

image-20220305121824915

结束命令:

1
2
rabbitmqctl ‐n rabbit1 stop 
rabbitmqctl ‐n rabbit2 stop

rabbit1操作作为主节点:

1
2
3
4
5
6
7
[root@super ~]# rabbitmqctl ‐n rabbit1 stop_app 
Stopping node rabbit1@super ...
[root@super ~]# rabbitmqctl ‐n rabbit1 reset
Resetting node rabbit1@super ...
[root@super ~]# rabbitmqctl ‐n rabbit1 start_app
Starting node rabbit1@super ...
[root@super ~]#

rabbit2操作为从节点:

1
2
3
4
5
6
7
8
[root@super ~]# rabbitmqctl ‐n rabbit2 stop_app 
Stopping node rabbit2@super ...
[root@super ~]# rabbitmqctl ‐n rabbit2 reset
Resetting node rabbit2@super ...
[root@super ~]# rabbitmqctl ‐n rabbit2 join_cluster rabbit1@'super' # ''内是主机名换成自己的
Clustering node rabbit2@super with rabbit1@super ...
[root@super ~]# rabbitmqctl ‐n rabbit2 start_app
Starting node rabbit2@super ...

查看集群状态:

1
rabbitmqctl cluster_status ‐n rabbit1

web监控:

image-20211121185918179

集群管理

rabbitmqctl join_cluster {cluster_node} [–ram] 将节点加入指定集群中。在这个命令执行前需要停止RabbitMQ应用并重置节点。

rabbitmqctl cluster_status 显示集群的状态。

rabbitmqctl change_cluster_node_type {disc|ram} 修改集群节点的类型。在这个命令执行前需要停止RabbitMQ应用。

rabbitmqctl forget_cluster_node [–offlfflffline] 将节点从集群中删除,允许离线执行。

rabbitmqctl update_cluster_nodes {clusternode}

​ 在集群中的节点应用启动前咨询clusternode节点的最新信息,并更新相应的集群信息。这个和join_cluster不同,它不加入集群。考虑这样一种情况,节点A和节点B都在集群中,当节点A离线了,节点C又和节点B组成了一个集群,然后节点B又离开了集群,当A醒来的时候,它会尝试联系节点B,但是这样会失败,因为节点B已经不在集群中了。

rabbitmqctl cancel_sync_queue [-p vhost] {queue} 取消队列queue同步镜像的操作。

rabbitmqctl set_cluster_name {name} 设置集群名称。集群名称在客户端连接时会通报给客户端。Federation和Shovel插件也会有用到集群名称的地方。集群名称默认是集群中第一个节点的名称,通过这个命令可以重新设置。

RabbitMQ镜像集群配置

同步队列的数据

​ 上面已经完成RabbitMQ默认集群模式,但并不保证队列的高可用性,尽管交换机、绑定这些可以复制到集群里的任何一个节点,但是队列内容不会复制。虽然该模式解决一项目组节点压力,但队列节点宕机直接导致该队列无法应用,只能等待重启,所以要想在队列节点宕机或故障也能正常应用,就要复制队列内容到集群里的每个节点,必须要创建镜像队列。

​ 镜像队列是基于普通的集群模式的,然后再添加一些策略,所以你还是得先配置普通集群,然后才能设置镜像队列,我们就以上面的集群接着做。

设置的镜像队列可以通过开启的网页的管理端Admin->Policies,也可以通过命令。

  • 命令操作方式:
1
rabbitmqctl set_policy my_ha "^" '{"ha-mode":"all"}'
  • 界面操作方式:

image-20211121190407321

  • Name:策略名称

  • Pattern:匹配的规则,如果是匹配所有的队列,是^.

  • Defifinition:使用ha-mode模式中的all,也就是同步所有匹配的队列。问号链接帮助文档

负载均衡-HAProxy

如果没有做HAProxy负载均衡,就不知道要访问那个节点的消息

​ HAProxy提供高可用性、负载均衡以及基于TCP和HTTP应用的代理,支持虚拟主机,它是免费、快速并且可靠的一种解决方案,包括Twitter,Reddit,StackOverflflow,GitHub在内的多家知名互联网公司在使用。HAProxy实现了一种事件驱动、单一进程模型,此模型支持非常大的并发连接数

  • 安装HAProxy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//下载依赖包 
yum install gcc vim wget
//上传haproxy源码包
//解压
tar -zxvf haproxy-1.6.5.tar.gz -C /usr/local
//进入目录、进行编译、安装
cd /usr/local/haproxy-1.6.5
make TARGET=linux31 PREFIX=/usr/local/haproxy
make install PREFIX=/usr/local/haproxy
mkdir /etc/haproxy
//赋权
groupadd -r -g 149 haproxy
useradd -g haproxy -r -s /sbin/nologin -u 149 haproxy
//创建haproxy配置文件
mkdir /etc/haproxy
vim /etc/haproxy/haproxy.cfg
  • 配置HAProxy

配置文件路径:/etc/haproxy/haproxy.cfg

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#logging options
global
log 127.0.0.1 local0 info
maxconn 5120
chroot /usr/local/haproxy
uid 99
gid 99
daemon
quiet
nbproc 20
pidfile /var/run/haproxy.pid

defaults
log global
#使用4层代理模式,”mode http”为7层代理模式
mode tcp
#if you set mode to tcp,then you nust change tcplog into httplog
option tcplog
option dontlognull
retries 3
option redispatch
maxconn 2000
contimeout 10s
##客户端空闲超时时间为 60秒 则HA 发起重连机制
clitimeout 10s
##服务器端链接超时时间为 15秒 则HA 发起重连机制
srvtimeout 10s
#front-end IP for consumers and producters

listen rabbitmq_cluster
bind 0.0.0.0:5672 # 对外提供服务的端口
#配置TCP模式
mode tcp
#balance url_param userid
#balance url_param session_id check_post 64
#balance hdr(User-Agent)
#balance hdr(host)
#balance hdr(Host) use_domain_only
#balance rdp-cookie
#balance leastconn
#balance source //ip
#简单的轮询
balance roundrobin
#rabbitmq集群节点配置 #inter 每隔五秒对mq集群做健康检查, 2次正确证明服务器可用,2次失败证明服务器不可用,并且配置主备机制
server bhz71 192.168.11.71:5672 check inter 5000 rise 2 fall 2
server bhz72 192.168.11.72:5672 check inter 5000 rise 2 fall 2
server bhz73 192.168.11.73:5672 check inter 5000 rise 2 fall 2
#配置haproxy web监控,查看统计信息
listen stats
bind 192.168.11.74:8100
mode http
option httplog
stats enable
#设置haproxy监控地址为http://localhost:8100/rabbitmq-stats
stats uri /rabbitmq-stats
stats refresh 5s

启动HAproxy负载

1
2
3
4
5
/usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg
# 查看haproxy进程状态
ps -ef | grep haproxy
# 访问如下地址对mq节点进行监控
http://172.16.98.133:8100/rabbitmq‐stats

代码中访问mq集群地址,则变为访问haproxy地址:5672

image-20220305123401960

问题

rabbitMQ 消息队列消息丢失问题

1
2
Q: 启用生产者confirm和return确认,消费者的手动ack,以及队列的持久化,并且是持久化之后才返回对生产者的确认,这种配置下什么情况会消息投递失败
A: 消息不丢失在应用层面能做得事基本就是你说的三个点,生产、传输、消费。如果再丢失的话就是些极端的系统类问题了。比如kafka虽然设置了持久化,但是从PageCache到真正flush到硬盘的过程中断电了,那还是会丢失。

image-20211118134936171

1
2
3
4
5
6
7
Q:那么rabbitmq是不是在实际工作中不需要上面复杂的流程也能满足基本需求,因为感觉这个流程很影响性能
A: 对的。这种策略属于应用层面的补偿机制。借数据库来打个保障。但实际上现实中也不怎么用。因为真的太麻烦了

Q: 还有问下生产者确认中,为什么不直接弄一个生产者到队列失败的回调,而要分成return和confirm
A:这两种应对的场景不太一样。confirm是告诉你到路由器了。能不能到队列不一定。return表示的无法路由,或者路由不存在,或者找不到队列投递
Q: 对应的处理是不是confirm失败就去重试,然后return失败直接抛异常
A:对。confirm有可能是broker没收到。return一般就是队列或者路由设置有问题了

Spring Boot 依赖无法点击进入源码

image-20211118180529794

1
2
1、点击右边Maven按钮,右键点击项目名,然后Download Documentation
2、如果下载文档后还是不行,尝试重启 IntelliJ IDEA

1.熔断器的线程隔离,线程池已满直接拒绝请求,课程中没有详细讲,这个是需要配置的还是本身就有的?

feign默认依赖了hystrix,但并未开启,开启后配置了线程隔离就能生效。

2.dubbo架构里面的monitor监控中心,是后面讲的dubbo-admin管理控制台吗?

monitor和admin不是一个东西,是两个不同的应用。

3.bus消息总线是如何知道要通知哪些服务的?是哪些服务曾经调用了configServer,就把地址记录下来,然后刷新的时候就去调用记录的地址吗

默认情况下ConfigClient实例都监听MQ中同一个topic,默认是SpringCloudBus。这东西实现的不咋滴,也少有人用。了解一下即可。配置最佳方案应该是 nacos 或者consul

4.rabbitmq集群,如果应用分布式部署,订单应用有两个,会不会出现应用1和应用2监听了集群不同rabbitmq服务的情况

不会的 rabbitmq集群的元数据是共享的,rabbitmq集群可以看成一个整体

5.如果是镜像队列集群模式会出现数据不共享的吗?如果是这种镜像集群呢,会复制队列内容到每一个节点吗?这样每个节点的那个队列都会保存同一条消息吧,会不会存在人为造成的消息重复消费的情况?

队列数据是不共享的只有元数据才共享 你的数据只会存在一个节点上其他的节点会起一个路由作用

6.又想到个问题,如果仅仅只是起路由作用的话,那么真正保存数据的那个节点挂了,其他节点不就获取不到那个数据了吗,只有等那个挂掉的节点重启了才能获取?

是的,所以才需要镜像队列来进行数据备份。

7.镜像队列的话,那不就每个节点都存了一份真实数据

这个需要你设置镜像队列啊 只有设置了 镜像队列才会同步数据。

并且如果节点没有问题不会使用镜像队列的数据 只有节点挂掉才会切换到镜像队列

8.老师,rabbitmq集群中有个haproxy做负载均衡,但是因为只有一个节点的队列负责处理真实数据,其他节点只是路由转发,所以是否可以理解为负载均衡只是在创建队列时有用,而在发消息过程中没有用,因为不管负载均衡到哪个节点,都会路由到真实数据队列的那个节点@虚竹 

不是 haporxy是为了保证高可用,其中一个rabbitmq挂了不至于还能够连接到其他的节点

9.高可用的话不需要haporxy也能保证啊,spring.rabbitmq.addresses=192.168.79.128:5672,192.168.79.135:5672,这样配置所有的节点地址不也是可以的吗

但是节点扩展呢 你这个不具有可扩展性,一般真实使用的都是配置的域名,你这个如果ip出现变化 或者增加一个节点就需要改动代码

10.嗯嗯,想确认下haproxy负载均衡是不是只会体现在创建队列上,发消息和接收消息实际上是没有负载均衡的

发消息和接收消息肯定用得到啊,haproxy 是做代理的 你用这个代理地址通讯的

11.比如a节点保存了真实队列,haproxy发给了b节点,b节点又会转发给a节点,这不还是请求的a节点吗

是的,如果站在rabbitmq的角度来说是的,但是站在全局的角度来说 肯定是需要haproxy的

12.实际工作当中,docker构建springboot的镜像,会把哪些软件一起弄进去,需要依赖的数据库,消息中间件,配置中心,es这些会一起进去吗

不会 对于中间件都是单独部署的

13.docker说是解决了跨环境迁移导致的水土不服问题,但是如果说依赖的那些软件需要单独部署的话,它说解决的除了jdk的话还有哪些环境上的问题?Linux服务器本身的配置?

对于中间件之类的服务 一般都是非常耗费资源的,需要单独占用一台服务器,如果需要做到容器化可以使用k8s做任务编排,如果只是手动启动docker的话 推荐单独部署,以为一台机器单独部署一个docker 装一个服务 没有多大意思,如果只是自己完 那随意

14.那一般docker的话主要还是用来部署微服务的工程,因为项目本来就小,资源占用小,还能服务编排批量管理是吧

是的

springboot源码无法下载

Sources not found for: org.springframework.boot:spring-boot:2.6.0

RabbitMQ高级使用

RabbitMQ高阶使用

能力目标

  • 能够理解和使用RabbitMQ延时任务
  • 能够理解和使用RabbitMQ的死信队列
  • 能够理解RabbitMQ的生产和消费的保证
  • 能够使用zset实现排队(排行榜)功能
  • 能够理解websocket消息推送服务的原理和方案

1. 从打车开始说起

image-20230922160430652

我们把滴滴打车的流程简化下

image-20230922160424029

  1. 登录app后点击打车开始进行打车
  2. 打车服务开始为司机派单
  3. 司机接单后开始给来接驾
  4. 上车乘客后处于行程中
  5. 行程结束后完成本次打车服务

1.1 需要解决的问题

我们需要实现派单服务,用户发送打车订单后需要进行进行派单,如果在指定时间内没有找到司机就会收到派单超时的通知,并且能够实时查看当前排队的抢单人数

image-20230922160416508

下面我们来介绍下涉打车涉及到的一些问题

1.1.2 打车超时

主要讲解打车服务在超时后的处理,比如打车后等待多长时间没有打到车后会通知等待超时。

image-20230922160458218

1.1.2 打车排队

因为排队我们需要获取当前的排队人数,所以需要通过redis的zset结构来实现排队功能,后面会详解讲解

image-20230922160230413

1.1.3 消息推送

我们需要将我们的异步处理结构返回到客户端,我们的客户端是使用的websocket连接的,因为websocket是点对点连接的,连接到一台固定的通知服务后,只能从这一台通知服务来获取数据,因为我们的通知服务允许分布式部署,这个问题改如何解决?

后面我们会一个个的来进行解决,我们先来学习下打车超时的问题

2. 延时队列

我们先看下如何来解决打车超时的问题

2.1 什么是延时队列

在开发中,往往会遇到一些关于延时任务的需求。例如

  • 生成订单30分钟未支付,则自动取消
  • 生成订单60秒后,给用户发短信
  • 滴滴打车订单完成后,如果用户一直不评价,48小时后会将自动评价为5星。
2.1.1 和定时任务区别

对上述的任务,我们给一个专业的名字来形容,那就是延时任务。那么这里就会产生一个问题,这个延时任务定时任务的区别究竟在哪里呢?一共有如下几点区别

  1. 定时任务有明确的触发时间,延时任务没有
  2. 定时任务有执行周期,而延时任务在某事件触发后一段时间内执行,没有执行周期
  3. 定时任务一般执行的是批处理操作是多个任务,而延时任务一般是单个任务

2.2 延时队列使用场景

那么什么时候需要用延时队列呢?考虑一下以下场景:

  1. 订单在十分钟之内未支付则自动取消。
  2. 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
  3. 账单在一周内未支付,则自动结算。
  4. 用户注册成功后,如果三天内没有登陆则进行短信提醒。
  5. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
  6. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

可以想一下美团点餐,超时时间

2.3 常见方案

下面我们来介绍下常见的延时任务的解决方案

2.3.1 数据库轮询

​ 该方案通常是在小型项目中使用,即通过一个线程定时的去扫描数据库,通过订单时间来判断是否有超时的订单,然后进行update或delete等操作

优点

​ 简单易行,支持集群操作

缺点
  1. 对服务器内存消耗大
  2. 存在延迟,比如你每隔3分钟扫描一次,那最坏的延迟时间就是3分钟
  3. 假设你的订单有几千万条,每隔几分钟这样扫描一次,数据库损耗极大
2.3.2 JDK的延迟队列

​ 该方案是利用JDK自带的DelayQueue来实现,这是一个无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素,放入DelayQueue中的对象,是必须实现Delayed接口的。

image-20230922160212084

优点

效率高,任务触发时间延迟低。

缺点
  1. 服务器重启后,数据全部消失,怕宕机
  2. 集群扩展相当麻烦
  3. 因为内存条件限制的原因,比如下单未付款的订单数太多,那么很容易就出现OOM异常
  4. 代码复杂度较高
2.3.3 netty时间轮算法

image-20230922160216924

​ 时间轮算法可以类比于时钟,如上图箭头(指针)按某一个方向按固定频率轮动,每一次跳动称为一个 tick。这样可以看出定时轮由个3个重要的属性参数,ticksPerWheel(一轮的tick数),tickDuration(一个tick的持续时间)以及 timeUnit(时间单位),例如当ticksPerWheel=60,tickDuration=1,timeUnit=秒,这就和现实中的始终的秒针走动完全类似了。

​ 如果当前指针指在1上面,我有一个任务需要4秒以后执行,那么这个执行的线程回调或者消息将会被放在5上。那如果需要在20秒之后执行怎么办,由于这个环形结构槽数只到8,如果要20秒,指针需要多转2圈。位置是在2圈之后的5上面(20 % 8 + 1)

优点

效率高,任务触发时间延迟时间比delayQueue低,代码复杂度比delayQueue低。

缺点
  • 服务器重启后,数据全部消失,怕宕机
  • 集群扩展相当麻烦
  • 因为内存条件限制的原因,比如下单未付款的订单数太多,那么很容易就出现OOM异常
2.3.4 使用消息队列

image-20230922160221122

我们可以采用rabbitMQ的延时队列,RabbitMQ具有以下两个特性,可以实现延迟队列

  • RabbitMQ可以针对Queue和Message设置 x-message-ttl,来控制消息的生存时间,如果超时,则消息变为dead letter
  • RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,用来控制队列内出现了deadletter,则按照这两个参数重新路由。
优点

​ 高效,可以利用rabbitmq的分布式特性轻易的进行横向扩展,消息支持持久化增加了可靠性。

缺点

​ 本身的易用度要依赖于rabbitMq的运维.因为要引用rabbitMq,所以复杂度和成本变高

2.4 延时队列

RabbitMQ中没有对消息延迟进行实现,但是我们可以通过TTL以及死信路由来实现消息延迟。

image-20230922160141541

2.4.1 TTL(消息过期时间)

在介绍延时队列之前,还需要先介绍一下RabbitMQ中的一个高级特性——TTL(Time To Live)

TTL是RabbitMQ中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了TTL属性或者进入了设置TTL属性的队列,那么这条消息如果在TTL设置的时间内没有被消费,则会成为“死信”,如果不设置TTL,表示消息永远不会过期,如果将TTL设置为0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

2.4.1.1 配置队列TTL

一种是在创建队列的时候设置队列的“x-message-ttl”属性

1
2
3
4
5
6
@Bean
public Queue taxiOverQueue() {
Map<String, Object> args = new HashMap<>(2);
args.put("x-message-ttl", 30000);
return QueueBuilder.durable(TAXI_OVER_QUEUE).withArguments(args).build();
}

这样所有被投递到该队列的消息都最多不会存活超过30s,但是消息会到哪里呢,如果没有任何处理,消息会被丢弃,如果配置有死信队列,超时的消息会被投递到死信队列

2.5 死信队列

讲到延时消息就不能不讲死信队列

2.5.1 什么是死信队列

​ 先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信,自然就有了死信队列;

2.5.2 死信队列使用场景

RabbitMQ中的死信交换器(dead letter exchange)可以接收下面三种场景中的消息:

  • 消费者对消息使用了basicReject或者basicNack回复,并且requeue参数设置为false,即不再将该消息重新在消费者间进行投递.
  • 消息在队列中超时. RabbitMQ可以在单个消息或者队列中设置TTL属性.
  • 队列中的消息已经超过其设置的最大消息个数.
2.5.3 死信队列如何使用

​ 死信交换器不是默认的设置,这里是被投递消息被拒绝后的一个可选行为,是在创建队列的时进行声明的,往往用在对问题消息的诊断上。

​ 死信交换器仍然只是一个普通的交换器,创建时并没有特别要求和操作。在创建队列的时候,声明该交换器将用作保存被拒绝的消息即可,相关的参数是x-dead-letter-exchange

image-20230922160133767

2.5.4 相关代码
1
2
3
4
5
6
7
8
9
@Bean
public Queue taxiOverQueue() {
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", TAXI_DEAD_QUEUE_EXCHANGE);
// x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", TAXI_DEAD_KEY);
return QueueBuilder.durable(TAXI_OVER_QUEUE).withArguments(args).build();
}

2.6 打车超时处理

用户通过调用打车服务将数据放进RabbitMQ的死信队列进行延时操作,等待一段时间后,正常的业务处理还没有处理到我们发起的数据,将会进行超时处理,通过通知服务将我们的处理结构通过websocket方式推送到我们的客户端。

image-20230922160057498

2.6.1 打车超时实现

在创建队列的时候配置死信交换器并设置队列的“x-message-ttl”属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Bean
public Queue taxiDeadQueue() {
return new Queue(TAXI_DEAD_QUEUE,true);
}

@Bean
public Queue taxiOverQueue() {
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", TAXI_DEAD_QUEUE_EXCHANGE);
// x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", TAXI_DEAD_KEY);
// x-message-ttl 声明队列的TTL
args.put("x-message-ttl", 30000);
return QueueBuilder.durable(TAXI_OVER_QUEUE).withArguments(args).build();
}

这样所有被投递到该队列的消息都最多不会存活超过30s,超时后的消息会被投递到死信交换器

3. RabbitMQ消息可靠性保障

消息的可靠性投递是使用消息中间件不可避免的问题,不管是使用kafka、rocketMQ或者rabbitMQ,那么在RabbitMQ中如何保证消息的可靠性投递呢?

image-20230922160101898

从上面的图可以看到,消息的投递有三个对象参与:

  • 生产者
  • broker
  • 消费者

3.1 生产者保证

生产者发送消息到broker时,要保证消息的可靠性,主要的方案有以下2种

  • 失败通知
  • 发送方确认
3.1.1 RabbitMQ回顾

​ 生产者通过指定一个 exchange 和 routingkey 把消息送达到某个队列中去,然后消费者监听队列,进行消费处理, 但是在某些情况下,如果我们在发送消息时,当前的 exchange 不存在或者指定的 routingkey 路由不到,这个时候如果要监听这种不可达的消息,这个时候就需要失败通知。

image-20230922160106836

​ 不做任何配置的情况下,生产者是不知道消息是否真正到达RabbitMQ,也就是说消息发布操作不返回任何消息给生产者。

3.1.2 失败通知

如果出现消息无法投递到队列会出现失败通知

​ 那么怎么保证我们消息发布的可靠性?这里我们就可以启动失败通知,在原生编程中在发送消息时设置mandatory标志,即可开启故障检测模式。

image-20230922160109958

​ 注意:它只会让 RabbitMQ 向你通知失败,而不会通知成功。如果消息正确路由到队列,则发布者不会受到任何通知。带来的问题是无法确保发布消息一定是成功的,因为通知失败的消息可能会丢失。

3.1.2.1 实现方式

spring配置

1
2
3
4
spring:
rabbitmq:
# 消息在未被队列收到的情况下返回
publisher-returns: true

关键代码,注意需要发送者实现ReturnCallback接口方可实现失败通知

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 失败通知
* 队列投递错误应答
* 只有投递队列错误才会应答
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
//消息体为空直接返回
if (null == message) {
return;
}
TaxiBO taxiBO = JSON.parseObject(message.getBody(), TaxiBO.class);
if (null != taxiBO) {
//删除rediskey
redisHelper.handelAccountTaxi(taxiBO.getAccountId());
//记录错误日志
recordErrorMessage(taxiBO, replyText, exchange, routingKey, message, replyCode);
}
}
3.1.2.2 遇到的问题问题

​ 如果消息正确路由到队列,则发布者不会受到任何通知。带来的问题是无法确保发布消息一定是成功的,因为通知失败的消息可能会丢失。

​ 我们可以使用RabbitMQ的发送方确认来实现,它不仅仅在路由失败的时候给我们发送消息,并且能够在消息路由成功的时候也给我们发送消息。

3.1.3 发送发确认

发送方确认是指生产者投递消息后,如果 Broker 接收到消息,则会给生产者一个应答,生产者进行接收应答,用来确认这条消息是否正常的发送到 Broker,这种方式也是消息可靠性投递的核心保障

rabbitmq消息发送分为两个阶段:

  • 将消息发送到broker,即发送到exchage交换机
  • 消息通过交换机exchange被路由到队列queue

​ 一旦消息投递到队列,队列则会向生产者发送一个通知,如果设置了消息持久化到磁盘,则会等待消息持久化到磁盘之后再发送通知

注意:发送发确认只有出现RabbitMQ内部错误无法投递才会出现发送发确认失败。

发送方确认模式需要分两种情况下列来看,首先我们先来看一看消息不可路由的情况

3.1.3.1 不可路由

当前消息到达交换器后对于发送者确认是成功的

image-20230922160114774

​ 首先当RabbitMQ交换器不可路由时,消息也根本不会投递到队列中,所以这里只管到交换器的路径,当消息成功送到交换器后,就会进行确认操作

​ 另外在这过程中,生产者收到了确认消息后,那么因为消息无法路由,所以该消息也是无效的,无法投递到队列,所以一般情况下这里会结合失败通知来一同使用,这里一般会进行设置mandatory模式,失败则会调用addReturnListener监听器来进行处理。

发送方确认模式的另一种情况肯定就是消息可以进行路由

3.1.3.2 可以路由

只要消息能够到达队列即可进行确认,一般是RabbitMQ发生内部错误才会出现失败

image-20230922160119152

​ 可以路由的消息,要等到消息被投递到所有匹配的队列之后,broker会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了。

​ 如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker回传给生产者的确认消息中delivery-tag域包含了确认消息的序列号。

3.1.3.3 使用方式

spring配置

1
2
3
4
spring:
rabbitmq:
# 开启消息确认机制
publisher-confirm-type: correlated

关键代码,注意需要发送者实现ConfirmCallback接口方可实现失败通知

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 发送发确认
* 交换器投递后的应答
* 正常异常都会进行调用
*
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
//只有异常的数据才需要处理
if (!ack) {
//关联数据为空直接返回
if (correlationData == null) {
return;
}
//检查返回消息是否为null
if (null != correlationData.getReturnedMessage()) {
TaxiBO taxiBO = JSON.parseObject(correlationData.getReturnedMessage().getBody(), TaxiBO.class);
//处理消息还原用户未打车状态
redisHelper.handelAccountTaxi(taxiBO.getAccountId());
//获取交换器
String exchange = correlationData.getReturnedMessage().getMessageProperties().getHeader("SEND_EXCHANGE");
//获取队列信息
String routingKey = correlationData.getReturnedMessage().getMessageProperties().getHeader("SEND_ROUTING_KEY");
//获取当前的消息体
Message message = correlationData.getReturnedMessage();
//记录错误日志
recordErrorMessage(taxiBO, cause, exchange, routingKey, message, -1);
}
}
}
3.1.4 Broker丢失消息

​ 前面我们从生产者的角度分析了消息可靠性传输的原理和实现,这一部分我们从broker的角度来看一下如何能保证消息的可靠性传输?

​ 假设有现在一种情况,生产者已经成功将消息发送到了交换机,并且交换机也成功的将消息路由到了队列中,但是在消费者还未进行消费时,mq挂掉了,那么重启mq之后消息还会存在吗?如果消息不存在,那就造成了消息的丢失,也就不能保证消息的可靠性传输了。

​ 也就是现在的问题变成了如何在mq挂掉重启之后还能保证消息是存在的?

开启RabbitMQ的持久化,也即消息写入后会持久化到磁盘,此时即使mq挂掉了,重启之后也会自动读取之前存储的额数据

3.1.4.1 持久化队列
1
2
3
4
@Bean
public Queue queue(){
return new Queue(queueName,true);
}
3.1.4.2 持久化交换器
1
2
3
4
@Bean
DirectExchange directExchange() {
return new DirectExchange(exchangeName,true,false);
}
3.1.4.3 发送持久化消息

发送消息时,设置消息的deliveryMode=2

注意:如果使用SpringBoot的话,发送消息时自动设置deliveryMode=2,不需要人工再去设置

3.1.4.4 Broker总结

通过以上方式,可以保证大部分消息在broker不会丢失,但是还是有很小的概率会丢失消息,什么情况下会丢失呢?

​ 假如消息到达队列之后,还未保存到磁盘mq就挂掉了,此时还是有很小的几率会导致消息丢失的。

​ 这就要mq的持久化和前面的confirm进行配合使用,只有当消息写入磁盘后才返回ack,那么就是在持久化之前mq挂掉了,但是由于生产者没有接收到ack信号,此时可以进行消息重发。

3.2 消费方消息可靠性

3.2.1 消费者手动确认

消费者接收到消息,但是还未处理或者还未处理完,此时消费者进程挂掉了,比如重启或者异常断电等,此时mq认为消费者已经完成消息消费,就会从队列中删除消息,从而导致消息丢失。

​ 那该如何避免这种情况呢?这就要用到RabbitMQ提供的ack机制,RabbitMQ默认是自动ack的,此时需要将其修改为手动ack,也即自己的程序确定消息已经处理完成后,手动提交ack,此时如果再遇到消息未处理进程就挂掉的情况,由于没有提交ack,RabbitMQ就不会删除这条消息,而是会把这条消息发送给其他消费者处理,但是消息是不会丢的。

3.2.1.1 配置文件
1
2
3
4
5
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual # 手动ack
3.2.1.2 参数介绍

acknowledge-mode: manual就表示开启手动ack,该配置项的其他两个值分别是none和auto

  • auto:消费者根据程序执行正常或者抛出异常来决定是提交ack或者nack,不要把none和auto搞混了
  • manual: 手动ack,用户必须手动提交ack或者nack
  • none: 没有ack机制

​ 默认值是auto,如果将ack的模式设置为auto,此时如果消费者执行异常的话,就相当于执行了nack方法,消息会被放置到队列头部,消息会被无限期的执行,从而导致后续的消息无法消费。

3.3.1.3 消费者实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@RabbitListener(
bindings =
{
@QueueBinding(value = @Queue(value = RabbitConfig.TAXI_DEAD_QUEUE, durable = "true"),
exchange = @Exchange(value = RabbitConfig.TAXI_DEAD_QUEUE_EXCHANGE), key = RabbitConfig.TAXI_DEAD_KEY)
})
@RabbitHandler
public void processOrder(Message massage, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
TaxiBO taxiBO = JSON.parseObject(massage.getBody(), TaxiBO.class);
try {
//开始处理订单
logger.info("处理超时订单,订单详细信息:" + taxiBO.toString());
taxiService.taxiTimeout(taxiBO);
//手动确认机制
channel.basicAck(tag, false);
} catch (Exception e) {
e.printStackTrace();
}
}

3.3 业务可靠性分析

在超时订单业务中,无需过多的考虑消息丢失以及幂等性问题

1
![延时队列实现](./../../../../../工作台/笔记/04_数据存储/消息中间件/RabbitMQ/RabbitMQ/mq32.png)
3.3.1 消息丢失

​ 在这个业务场景中,用户发起打车请求,如果用户消息丢失,对整体业务是没有任何影响的,用户可以再次发起打车操作,这个消息丢失问题概率很低,可以进行简单化设计,如果出现发送失败只需要回退redis中的操作即可。

3.3.2 幂等性校验

​ 因为使用了延时队列,对于这个业务来说是不需要进行幂等性校验的,因为第一次超时时如果存在redis用户排名的key就会被删除,下一次redis没有的值在删除一次,这种操作是幂等的,所以不需要考虑幂等性

3.3.3 数据回滚

​ 虽然无需做到消息完全不丢失以及消息的幂等性,但是需要考虑如果出现问题,需要将插入Redis的的key值回滚掉,防止影响业务正常判断

4. 排队人数

image-20230922160533111

4.1 需求

在打车的过程中如果人数较多的情况下会在派单中等待,如果想知道我的前面还有多少人呢,我们就需要一个排队人数的功能

​ 接受用户的派单数据,但因为派单处理需要一定的时间,所以只能在MQ中有序消费数据,对用户进行排队操作。当然这个排队操作,用户是不透明的,某些用户的请求可能被优先处理,但是通过MQ可以实现整体的有序。

​ 用户很关心自己派单目前的处理进度,即和我一样打车的前面还有多少人,打车APP上显示“你前面还有多少人在排队”。所以后台要能告知用户目前他的派单进度。

4.1.1 需求分析
  • 入队:可以理解为写操作,需要后端存储数据。
  • 获取进度:可以理解为读操作,而且可以预见这个读操作应该比写操作频繁。如果用户很关注她的订单进展,说不定会一直刷新查看他的订单排队情况。

4.2 实现方案

4.2.1 MySQL

​ 用户的订单数据肯定得持久化存储,MySQL是一个不错的选择。既然需求这么简单,无非一个订单数据嘛。暂且用一张表“订单表(T_Order)”来保存正在排队的订单,已经处理完毕的订单则从T_Order表迁移至“(历史订单表T_History_Order)”。这样的好处避免订单表数据量太大,提高读写性能。

4.2.1.1 入队

完成订单的入库,显然就是一个insert语句了

1
insert into T_Order(...) values(...);
4.2.1.2 获取进度

需查询自己订单的排队情况,那肯定看比自己订单时间还早的用户有多少人了。这些比自己下单时间还早的人,就是排在自己前面的人了。假设一个用户同时只能有一个订单在排队。

1
2
3
4
5
#先查出自己订单时间, 假设是1429389316
select orderTime from T_Order where uid=8888;

#再查有多少人的订单时间比自己的早
select count(orderTime) from T_Order where orderTime <= 1429389316;
4.2.1.3 遇到问题

​ 互联网的精髓就是“小步快跑,快速迭代”。用MySQL快速完成需求,面向用户服务后。初期阶段,一切ok。但是当这个业务运营得好,用户量大的时候,就会发现用户经常投诉“我查询自己的订单排队进度,经常报错”。甚至处理订单的同事,也经常抱怨从订单系统里面查看订单,非常缓慢。select count 操作基本都是全表扫描操作,看来MySQL面对这么大规模的全表查询操作,还是有点吃力。

4.2.3 Redis Zset

​ NoSQL在互联网领域的江湖地位已经很牢靠了,看来得请他老人家出来救场了。没错,使用Redis的有序集合(sorted sets)数据结构,就可以完美的解决这个问题。因为有序集合底层的实现是跳表这种数据结构,时间复杂度是logN,即使有序集合里面的订单有100万之多,耗时也基本都是纳秒级别(基本不到1毫秒)。

  1. 用户提交一个订单,我们写入redis的zset中。
  2. 用户要查询自己的订单排队情况,这时候我们只要查询redis的有序集合就可以了。命令为rank
  3. 当这个订单被处理完成后,直接一个zrem命令将订单从有序集合中删除即可

因为Redis基本都是内存操作,而且有序集合的底层实现是跳表这种效率媲美平衡树,但是实现又简单的数据结构,从而完美的释放了MySQL的读压力。

4.3 排队人数架构介绍

​ 打车如果出现排队我们需要能够对当前排队的人数进行预估,能够知道当前我们前面有多少人在排队,我们采用redis的zset来实现排队,整体架构如下。

image-20230922160151319

  1. 用户打车通过zset加入到redis的有序集合
  2. 异步将数据推送到RabbitMQ延迟以及进行正常业务处理
  3. 处理完成后在zset中删除元素
  4. 用户查询人数通过Rank命令进行查询

4.4 数据结构

4.4.2 zset结构

Redis 有序集合和集合一样也是 string 类型元素的集合,且不允许重复的成员。

​ 不同的是每个元素都会关联一个 double 类型的分数。redis 正是通过分数来为集合中的成员进行从小到大的排序。

​ 我们只需要使用rank命令统计从0-当前key对应的分数的key的数量就可以得到当前的排名了

image-20230922160156042

​ 因为Redis基本都是内存操作,而且有序集合的底层实现是跳表这种效率媲美平衡树,但是实现又简单的数据结构,从而完美的释放了MySQL的读压力。

我们如何来保证分数不重复,并且是有序递增的呢,这里就要祭出来我们的雪花算法

4.4.1 雪花算法

SnowFlake算法生成id的结果是一个64bit大小的整数,它的结构如下图:

image-20230922160200527

  1. 1bit,不用,因为二进制中最高位是符号位,1表示负数,0表示正数。生成的id一般都是用整数,所以最高位固定为0。
  2. 41bit-时间戳,用来记录时间戳,毫秒级。 - 41位可以表示2^41-1个数字, - 如果只用来表示正整数(计算机中正数包含0),可以表示的数值范围是:0 至 2^41-1,减1是因为可表示的数值范围是从0开始算的,而不是1。 - 也就是说41位可以表示2^41-1个毫秒的值,转化成单位年则是69年
  3. 10bit-工作机器id,用来记录工作机器id。 - 可以部署在2^{10} = 1024个节点,包括5位datacenterId和5位workerId - 5位(bit)可以表示的最大正整数是2^{5}-1 = 31,即可以用0、1、2、3、….31这32个数字,来表示不同的datecenterId或workerId
  4. 12bit-序列号,序列号,用来记录同毫秒内产生的不同id。 - 12位(bit)可以表示的最大正整数是2^{12}-1 = 4095,即可以用0、1、2、3、….4094这4095个数字,来表示同一机器同一时间截(毫秒)内产生的4095个ID序号。

由于在Java中64bit的整数是long类型,所以在Java中SnowFlake算法生成的id就是long来存储的。

SnowFlake可以保证
  1. 所有生成的id按时间趋势递增
  2. 整个分布式系统内不会产生重复id(因为有datacenterId和workerId来做区分)

4.5 功能实现

4.5.1 派单

使用redisTemplate操作zset将username以及workid压入zset中

1
redisTemplate.opsForZSet().add(TaxiConstant.TAXT_LINE_UP_KEY, taxiBO.getUsername(), taxiBO.getId());
4.5.2 获取排队情况

使用redisTemplate操作zset获取username对应的排名

1
redisTemplate.opsForZSet().rank(TaxiConstant.TAXT_LINE_UP_KEY, username);

4.6 演示

4.6.4 派单

image-20230922155918335

4.6.2 排队情况查询

image-20230922155922421

5. 消息推送

5.1 什么是消息推送

当我们使用http协议探知服务器上是否有内容更新,就必须频繁的从客户端到服务器端进行确认,而http一下的这些标准会成为一个瓶颈:

  • 一条连接上只可以发送一个请求
  • 请求只能从客户端开始,客户端不可以接收除了响应以外的指令。
  • 请求 / 响应首部未经过压缩就直接进行传输,首部的信息越多,那么延迟就越大。
  • 发送冗长的首部。每次互相发送相同的首部造成的浪费越多
  • 可以任意选择数据压缩格式,非强制压缩发送

5.2 方案介绍

5.2.1 ajax短轮询

image-20230922155927774

​ ajax(异步的javascript与xml技术)是一种有效利用javascript和dom的操作,以达到局部web页面的提花和加载的异步通信手段。和以前的同步通信相比,他只更新一部分页面,相应中传输饿数据量会因此的减少。

​ ajax轮询的原理是,让浏览器每隔一段时间就发送一次请求,询问服务器是否有新消息,而利用ajax实时的从服务器获取内容,有可能导致大量的请求产生。

​ 特点:实现简单、短连接、数据同步不及时、对服务器资源会造成一定压力,此模式广泛应用于:扫描登录、扫码支付、天气更新等(腾讯、京东、阿里一直都在沿用此技术并日渐成熟和稳定)

5.2.2 长轮询

image-20230922155932069

​ 原理和ajax轮询差不多,都是采用轮询的方式,不过采用的是阻塞模型。也就是说,当客户端发起连接后,如果服务器端内容没有更新,将响应至于挂起状态,一直不回复response给客户端,知道有内容更新,再返回响应。

​ 虽然可以做到实时更新,但是为了保留响应,一次连接饿持续时间也变长了。期间,为了维持连接会消费更多的资源。

​ 需要有很高的并发,也就是说同时接待客户的能力

​ 从上面两种方式中,其实可以看出是再不断的建立http连接,然后等待服务器处理,可以体现出了http的特点:被动性,即:请求只能由客户端发起。服务器端不能主动联系客户端。

​ 特点:无需浏览器或APP端任何单独插件支持、长连接,减少网络(三次)握手和四次挥手、对服务器资源要求较高等。此模式常用于实时消息轮播、金融数据即时刷新、数据图表实时刷新等。JAVA服务器端一般采用Servlet3支持的异步任务、延时结果(DeferedResult)等手段实现。

5.2.3 WebSocket

​ WebSocket 是 HTML5 开始提供的一种在单个 TCP 连接上进行全双工通讯的协议。WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

image-20230922155935904

5.3 WS实现消息推送

通过上面我们实现了延时任务处理以及派单排队,但是如果将我们的异步处理结果推送给客户端呢

我们就需要使用消息推送技术,需要完成一下功能

  • 将消息推送到指定的用户
  • 对于未上线用户需要暂存数据,上线后推送

image-20230922155940222

5.3.1 架构介绍

因为websocket是点对点的,而服务间调用是轮询的,无法实现微服务之间点对点的消息推送,我们使用定时任务来实现消息推送

image-20230922155944628

  1. 调用接口先将消息暂存到MongoDB中
  2. 轮询任务首先拉取当前在线人员列表
  3. 轮询任务通过在线人员列表到MongoDB中拉取在线用户的通知消息
  4. 将消息通过WS推送到指定的用户
5.3.2 暂存数据

通过MongoDB将我们的消息数据暂存到数据库中,可以完成对于未上线消息暂存以及对分布式websocket的数据调度

5.3.2.1 什么是MongoDB

​ MongoDB是一个介于关系数据库和非关系数据库之间的产品,是非关系数据库当中功能最丰富,最像关系数据库的。它支持的数据结构非常松散,是类似json的bson格式,因此可以存储比较复杂的数据类型。Mongo最大的特点是它支持的查询语言非常强大,其语法有点类似于面向对象的查询语言,几乎可以实现类似关系数据库单表查询的绝大部分功能,而且还支持对数据建立索引。

5.3.2.2 插入数据
1
2
3
4
@Override
public void addMessage(PushMessage message) {
mongoTemplate.save(message);
}
5.3.2.3 查询数据
1
2
3
4
5
@Override
public List<PushMessage> getMessageByUserNames(List<String> userNameList) {
Query query = new Query(Criteria.where("username").in(userNameList));
return mongoTemplate.findAllAndRemove(query, PushMessage.class);
}
5.4.1 轮询任务

轮询任务就是不断的搜索检查是否有新的消息,然后交给WS进行处理

image-20230922155959511

5.4.1.1 代码实现

使用pull方式将MongoDB中的在线用户的暂存消息取出来,推送给在线用户

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/**
* 定时任务 推送暂存消息
*/
@Component
public class ScheduledTask {

private static final Logger logger = LoggerFactory.getLogger(ScheduledTask.class);

@Autowired
private PushService pushService;

private static final ExecutorService executorService = Executors.newFixedThreadPool(10);

@Autowired
private WebSocketServer webSocketServer;

@PostConstruct
public void init() {
executorService.execute(() -> {
autoPushMessage();
});

}


/**
* 自动推送消息
*/
public void autoPushMessage() {
//轮询并发送消息
PollingRound.pollingPull(() -> {
//获取最新需要推送的消息
List<PushMessagePO> pushMessagesList = getPushMessages();
//校验消息
if (null != pushMessagesList && !pushMessagesList.isEmpty()) {
logger.debug("推送消息线程工作工作中,推送数据条数:{}", pushMessagesList.size());
//推送消息
webSocketServer.pushMessage(pushMessagesList);
return PollingRound.delayLoop(100);
}
logger.debug("推送消息线程工作工作中,推送数据条数:{}", 0);
return PollingRound.delayLoop(1000);
});
}

public List<PushMessagePO> getPushMessages() {
List<String> userNameList = webSocketServer.getInLineAccountIds();
if (null != userNameList && !userNameList.isEmpty()) {
//在MongoDB中获取当前在线用户的暂存消息
List<PushMessagePO> pushMessageList = pushService.getMessageByAccountIds(userNameList);
//返回消息
return pushMessageList;
}
return null;
}
}