RocketMQ 5.0:云原生“消息、事件、流”实时数据处理平台,覆盖云边端一体化数据处理场景。

核心特性

1. 基本概念

Apache RocketMQ 5.0 笔记

1、消息由生产者初始化并发送到Apache RocketMQ 服务端。

2、消息按照到达Apache RocketMQ 服务端的顺序存储到主题的指定队列中。

3、消费者按照指定的订阅关系从Apache RocketMQ 服务端中获取消息并消费。

1.1. 消息

消息是 Apache RocketMQ 中的最小数据传输单元。生产者将业务数据的负载和拓展属性包装成消息发送到 Apache RocketMQ 服务端,服务端按照相关语义将消息投递到消费端进行消费。

RocketMQ 消息构成非常简单,如下所示:

消息内部属性

字段名 必填 说明
主题名称

当前消息所属的主题的名称。集群内全局唯一。

消息体 消息体
消息类型

Normal:普通消息,消息本身无特殊语义,消息之间也没有任何关联。

FIFO:顺序消息,Apache RocketMQ 通过消息分组MessageGroup标记一组特定消息的先后顺序,可以保证消息的投递顺序严格按照消息发送时的顺序。

Delay:定时/延时消息,通过指定延时时间控制消息生产后不要立即投递,而是在延时间隔后才对消费者可见。

Transaction:事务消息,Apache RocketMQ 支持分布式事务消息,支持应用数据库更新和消息调用的事务一致性保障。

过滤标签Tag 方便服务器过滤使用,消费者可通过Tag对消息进行过滤,仅接收指定标签的消息。目前只支持每个消息设置一个。
索引Key 消息的索引键,可通过设置不同的Key区分消息和快速查找消息。
定时时间   定时场景下,消息触发延时投递的毫秒级时间戳。
消费重试次数 否   消息消费失败后,Apache RocketMQ 服务端重新投递的次数。每次重试后,重试次数加1。
业务自定义属性   生产者可以自定义设置的扩展信息。

系统默认的消息最大限制如下:

1.2. Tag

Topic 与 Tag 都是业务上用来归类的标识,区别在于 Topic 是一级分类,而 Tag 可以理解为是二级分类。使用 Tag 可以实现对 Topic 中的消息进行过滤。

提示:

  • Topic:消息主题,通过 Topic 对不同的业务消息进行分类。
  • Tag:消息标签,用来进一步区分某个 Topic 下的消息分类,消息从生产者发出即带上的属性。

Topic 和 Tag 的关系如下图所示:

Apache RocketMQ 5.0 笔记

什么时候该用 Topic,什么时候该用 Tag?

可以从以下几个方面进行判断:

通常情况下,不同的 Topic 之间的消息没有必然的联系,而 Tag 则用来区分同一个 Topic 下相互关联的消息,例如全集和子集的关系、流程先后的关系。

1.3. Keys

Apache RocketMQ 每个消息可以在业务层面的设置唯一标识码 keys 字段,方便将来定位消息丢失问题。 Broker 端会为每个消息创建索引(哈希索引),应用可以通过 topic、key 来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证 key 尽可能唯一,这样可以避免潜在的哈希冲突。

// 订单Id
String orderId = "20034568923546";
message.setKeys(orderId);

1.4. 队列

一个 Topic 可能有多个队列,并且可能分布在不同的 Broker 上。

队列天然具备顺序性,即消息按照进入队列的顺序写入存储,同一队列间的消息天然存在顺序关系,队列头部为最早写入的消息,队列尾部为最新写入的消息。消息在队列中的位置和消息之间的顺序通过位点(Offset)进行标记管理。

Apache RocketMQ 5.0 笔记

Apache RocketMQ 默认提供消息可靠存储机制,所有发送成功的消息都被持久化存储到队列中,配合生产者和消费者客户端的调用可实现至少投递一次的可靠性语义。

Apache RocketMQ 队列模型和Kafka的分区(Partition)模型类似。在 Apache RocketMQ 消息收发模型中,队列属于主题的一部分,虽然所有的消息资源以主题粒度管理,但实际的操作实现是面向队列。例如,生产者指定某个主题,向主题内发送消息,但实际消息发送到该主题下的某个队列中。

Apache RocketMQ 中通过修改队列数量,以此实现横向的水平扩容和缩容。

一般来说一条消息,如果没有重复发送(比如因为服务端没有响应而进行重试),则只会存在在 Topic 的其中一个队列中,消息在队列中按照先进先出的原则存储,每条消息会有自己的位点,每个队列会统计当前消息的总条数,这个称为最大位点 MaxOffset;队列的起始位置对应的位置叫做起始位点 MinOffset。队列可以提升消息发送和消费的并发度。

注意:按照实际业务消耗设置队列数,队列数量的设置应遵循少用够用原则,避免随意增加队列数量。

1.5. 生产者

生产者(Producer)就是消息的发送者,Apache RocketMQ 拥有丰富的消息类型,可以支持不同的应用场景,在不同的场景中,需要使用不同的消息进行发送。比如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单。如已完成支付则忽略,此时就需要用到延迟消息;电商场景中,业务上要求同一订单的消息保持严格顺序,此时就要用到顺序消息。在日志处理场景中,可以接受的比较大的发送延迟,但对吞吐量的要求很高,希望每秒能处理百万条日志,此时可以使用批量消息。在银行扣款的场景中,要保持上游的扣款操作和下游的短信通知保持一致,此时就要使用事务消息。

注意:不要在同一个主题内使用多种消息类型

生产者通常被集成在业务系统中,将业务消息按照要求封装成 Apache RocketMQ 的消息(Message)并发送至服务端。

生产者和主题的关系为多对多关系,即同一个生产者可以向多个主题发送消息,同一个主题也可以接收多个生产者的消息。

Apache RocketMQ 5.0 笔记

注意:不建议频繁创建和销毁生产者

Producer p = ProducerBuilder.build();
for (int i =0;i<n;i++){
Message m= MessageBuilder.build();
p.send(m);
}
p.shutdown();

1.6. 消费者与消费者组

如果多个消费者设置了相同的Consumer Group,我们认为这些消费者在同一个消费组内。同一个消费组的多个消费者必须保持消费逻辑和配置一致,共同分担该消费组订阅的消息,实现消费能力的水平扩展。

在 Apache RocketMQ 有两种消费模式,分别是:

Apache RocketMQ 5.0 笔记

Apache RocketMQ 5.0 笔记

负载均衡

RocketMQ的负载均衡策略与Kafka极其类似,几乎一毛一样

集群模式下,同一个消费组内的消费者会分担收到的全量消息,这里的分配策略是怎样的?如果扩容消费者是否一定能提升消费能力?

Apache RocketMQ 提供了多种集群模式下的分配策略,包括平均分配策略、机房优先分配策略、一致性hash分配策略等,可以通过如下代码进行设置相应负载均衡策略。

consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());

默认的分配策略是平均分配,这也是最常见的策略。平均分配策略下消费组内的消费者会按照类似分页的策略均摊消费。

在平均分配的算法下,可以通过增加消费者的数量来提高消费的并行度。比如下图中,通过增加消费者来提高消费能力。

Apache RocketMQ 5.0 笔记

Apache RocketMQ 5.0 笔记

但也不是一味地增加消费者就能提升消费能力的,比如下图中Topic的总队列数小于消费者的数量时,消费者将分配不到队列,即使消费者再多也无法提升消费能力。

Apache RocketMQ 5.0 笔记

1.7. 消费者分类

Apache RocketMQ 5.0 笔记

如上图所示, Apache RocketMQ 的消费者处理消息时主要经过以下阶段:消息获取--->消息处理--->消费状态提交。

针对以上几个阶段,Apache RocketMQ 提供了不同的消费者类型: PushConsumer 、SimpleConsumer 和 PullConsumer。这几种类型的消费者通过不同的实现方式和接口可满足您在不同业务场景下的消费需求。具体差异如下:

注:在实际使用场景中,PullConsumer 仅推荐在流处理框架中集成使用,大多数消息收发场景使用 PushConsumer 和 SimpleConsumer 就可以满足需求。

Apache RocketMQ 5.0 笔记

PushConsumer

PushConsumers是一种高度封装的消费者类型,消费消息仅通过消费监听器处理业务并返回消费结果。消息的获取、消费状态提交以及消费重试都通过 Apache RocketMQ 的客户端SDK完成。

SimpleConsumer

SimpleConsumer 是一种接口原子型的消费者类型,消息的获取、消费状态提交以及消费重试都是通过消费者业务逻辑主动发起调用完成。

补充:

rocketmq-client中定义的:

  • DefaultMQProducer
  • DefaultMQPushConsumer
  • DefaultLitePullConsumer

rocketmq-client-java中定义的:

  • Producer
  • PushConsumer
  • SimpleConsumer

1.8. 消费位点

消息是按到达Apache RocketMQ 服务端的先后顺序存储在指定主题的多个队列中,每条消息在队列中都有一个唯一的Long类型坐标,这个坐标被定义为消息位点。一条消息被某个消费者消费完成后不会立即从队列中删除,Apache RocketMQ 会基于每个消费者分组记录消费过的最新一条消息的位点,即消费位点

Apache RocketMQ 5.0 笔记

如上图所示,在Apache RocketMQ中每个队列都会记录自己的最小位点、最大位点。针对于消费组,还有消费位点的概念,在集群模式下,消费位点是由客户端提给交服务端保存的,在广播模式下,消费位点是由客户端自己保存的。一般情况下消费位点正常更新,不会出现消息重复,但如果消费者发生崩溃或有新的消费者加入群组,就会触发重平衡,重平衡完成后,每个消费者可能会分配到新的队列,而不是之前处理的队列。为了能继续之前的工作,消费者需要读取每个队列最后一次的提交的消费位点,然后从消费位点处继续拉取消息。但在实际执行过程中,由于客户端提交给服务端的消费位点并不是实时的,所以重平衡就可能会导致消息少量重复。

1.9. 订阅关系

一个订阅关系指的是指定某个消费者分组对于某个主题的订阅。

不同消费者分组对于同一个主题的订阅相互独立如下图所示,消费者分组Group A和消费者分组Group B分别以不同的订阅关系订阅了同一个主题Topic A,这两个订阅关系互相独立,可以各自定义,不受影响。

Apache RocketMQ 5.0 笔记

同一个消费者分组对于不同主题的订阅也相互独立如下图所示,消费者分组Group A订阅了两个主题Topic A和Topic B,对于Group A中的消费者来说,订阅的Topic A为一个订阅关系,订阅的Topic B为另外一个订阅关系,且这两个订阅关系互相独立,可以各自定义,不受影响。

Apache RocketMQ 5.0 笔记

2. 消息类型

1、顺序消息(FIFO):这类消息必须设置 message group,这种类型的消息需要与FIFO消费者组一起使用

2、延迟消息(DELAY):消息被发送后不会立即对消费者可见,这种类型的消息必须设置delivery timestamp以决定对消费者可见的时间;

3、事务消息(TRANSACTIONAL):将一个或多个消息的发布包装到一个事务中,提供提交/回滚方法来决定消息的可见性;

4、普通消息(NORMAL):默认类型

不同的类型是互斥的,当意味着要发布的消息不能同时是FIFO类型和DELAY类型。实际上,主题的类型决定了消息的类型。例如,FIFO主题不允许发布其他类型的消息。

Apache RocketMQ 5.0 笔记

2.1. 普通消息

普通消息一般应用于微服务解耦、事件驱动、数据集成等场景,这些场景大多数要求数据传输通道具有可靠传输的能力,且对消息的处理时机、处理顺序没有特别要求。

典型场景一:微服务异步解耦

Apache RocketMQ 5.0 笔记

如上图所示,以在线的电商交易场景为例,上游订单系统将用户下单支付这一业务事件封装成独立的普通消息并发送至Apache RocketMQ服务端,下游按需从服务端订阅消息并按照本地消费逻辑处理下游任务。每个消息之间都是相互独立的,且不需要产生关联。

典型场景二:数据集成传输

Apache RocketMQ 5.0 笔记

如上图所示,以离线的日志收集场景为例,通过埋点组件收集前端应用的相关操作日志,并转发到 Apache RocketMQ 。每条消息都是一段日志数据,Apache RocketMQ 不做任何处理,只需要将日志数据可靠投递到下游的存储系统和分析系统即可,后续功能由后端应用完成。

2.2. 顺序消息

应用场景

在有序事件处理、撮合交易、数据实时增量同步等场景下,异构系统间需要维持强一致的状态同步,上游的事件变更需要按照顺序传递到下游进行处理。在这类场景下使用 Apache RocketMQ 的顺序消息可以有效保证数据传输的顺序性。

典型场景一:撮合交易

Apache RocketMQ 5.0 笔记

以证券、股票交易撮合场景为例,对于出价相同的交易单,坚持按照先出价先交易的原则,下游处理订单的系统需要严格按照出价顺序来处理订单。

典型场景二:数据实时增量同步

Apache RocketMQ 5.0 笔记

以数据库变更增量同步场景为例,上游源端数据库按需执行增删改操作,将二进制操作日志作为消息,通过 Apache RocketMQ 传输到下游搜索系统,下游系统按顺序还原消息数据,实现状态数据按序刷新。如果是普通消息则可能会导致状态混乱,和预期操作结果不符,基于顺序消息可以实现下游状态和上游操作结果一致。

功能原理

顺序消息是 Apache RocketMQ 提供的一种高级消息类型,支持消费者按照发送消息的先后顺序获取消息,从而实现业务场景中的顺序处理。 相比其他类型消息,顺序消息在发送、存储和投递的处理过程中,更多强调多条消息间的先后顺序关系。

Apache RocketMQ 顺序消息的顺序关系通过消息组(MessageGroup)判定和识别,发送顺序消息时需要为每条消息设置归属的消息组,相同消息组的多条消息之间遵循先进先出的顺序关系,不同消息组、无消息组的消息之间不涉及顺序性。

基于消息组的顺序判定逻辑,支持按照业务逻辑做细粒度拆分,可以在满足业务局部顺序的前提下提高系统的并行度和吞吐能力。

如何保证消息的顺序性?

Apache RocketMQ 的消息的顺序性分为两部分,生产顺序性和消费顺序性。

1、生产顺序性

如需保证消息生产的顺序性,则必须满足以下条件:

满足以上条件的生产者,将顺序消息发送至 Apache RocketMQ 后,会保证设置了同一消息组的消息,按照发送顺序存储在同一队列中。服务端顺序存储逻辑如下:

Apache RocketMQ 5.0 笔记

2、消费顺序性

如需保证消息消费的顺序性,则必须满足以下条件:

生产顺序性和消费顺序性组合

如果消息需要严格按照先进先出(FIFO)的原则处理,即先发送的先消费、后发送的后消费,则必须要同时满足生产顺序性和消费顺序性。

一般业务场景下,同一个生产者可能对接多个下游消费者,不一定所有的消费者业务都需要顺序消费,您可以将生产顺序性和消费顺序性进行差异化组合,应用于不同的业务场景。例如发送顺序消息,但使用非顺序的并发消费方式来提高吞吐能力。更多组合方式如下表所示:

生产顺序 消费顺序 顺序性效果
设置消息组,保证消息顺序发送。 顺序消费 按照消息组粒度,严格保证消息顺序。 同一消息组内的消息的消费顺序和发送顺序完全一致。
设置消息组,保证消息顺序发送。 并发消费 并发消费,尽可能按时间顺序处理。
未设置消息组,消息乱序发送。 顺序消费 按队列存储粒度,严格顺序。 基于 Apache RocketMQ 本身队列的属性,消费顺序和队列存储的顺序一致,但不保证和发送顺序一致。
未设置消息组,消息乱序发送。 并发消费 并发消费,尽可能按照时间顺序处理。

2.3. 定时/延时消息

注:定时消息和延时消息本质相同,都是服务端根据消息设置的定时时间在某一固定时刻将消息投递给消费者消费。

应用场景

在分布式定时调度触发、任务超时处理等场景,需要实现精准、可靠的定时事件触发。使用 Apache RocketMQ 的定时消息可以简化定时调度任务的开发逻

辑,实现高性能、可扩展、高可靠的定时触发能力。

典型场景一:分布式定时调度

Apache RocketMQ 5.0 笔记

在分布式定时调度场景下,需要实现各类精度的定时任务,例如每天5点执行文件清理,每隔2分钟触发一次消息推送等需求。基于 Apache RocketMQ 的定时消息可以封装出多种类型的定时触发器。

典型场景二:任务超时处理

以电商交易场景为例,订单下单后暂未支付,此时不可以直接关闭订单,而是需要等待一段时间后才能关闭订单。使用 Apache RocketMQ 定时消息可以实现超时任务的检查触发。

基于定时消息的超时任务处理具备如下优势:

功能原理

定时时间设置原则

Apache RocketMQ 定时消息设置的定时时间是一个预期触发的系统时间戳,延时时间也需要转换成当前系统时间后的某一个时间戳,而不是一段延时时长。

投递等级

Apache RocketMQ 一共支持18个等级的延迟投递,具体时间如下:

Apache RocketMQ 5.0 笔记

2.4. 事务消息

Apache RocketMQ 5.0 笔记

以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。当前业务的处理分支包括:

使用普通消息和订单事务无法保证一致的原因,本质上是由于普通消息无法像单机数据库事务一样,具备提交、回滚和统一协调的能力。而基于 RocketMQ 的分布式事务消息功能,在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。

Apache RocketMQ 5.0 笔记

事务消息发送分为两个阶段。第一阶段会发送一个半事务消息,半事务消息是指暂不能投递的消息,生产者已经成功地将消息发送到了 Broker,但是Broker 未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,如果发送成功则执行本地事务,并根据本地事务执行成功与否,向 Broker 半事务消息状态(commit或者rollback),半事务消息只有 commit 状态才会真正向下游投递。如果由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,Broker 端会通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback)。这样最终保证了本地事务执行成功,下游就能收到消息,本地事务执行失败,下游就收不到消息。总而保证了上下游数据的一致性。(PS:重点是两阶段提交

事务消息处理流程

Apache RocketMQ 5.0 笔记

1、生产者将消息发送至Apache RocketMQ服务端。

2、Apache RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。

3、生产者开始执行本地事务逻辑。

4、生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:

5、在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。

6、生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

7、生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

3. 机制

3.1. 消息发送重试机制

Apache RocketMQ 客户端连接服务端发起消息发送请求时,可能会因为网络故障、服务异常等原因导致调用失败。为保证消息的可靠性, Apache RocketMQ 在客户端SDK中内置请求重试逻辑,尝试通过重试发送达到最终调用成功的效果。

同步发送和异步发送模式均支持消息发送重试。

重试触发条件:

重试流程:

生产者在初始化时设置消息发送最大重试次数,当出现上述触发条件的场景时,生产者客户端会按照设置的重试次数一直重试发送消息,直到消息发送成功或达到最大重试次数重试结束,并在最后一次重试失败后返回调用错误响应。

重试间隔

3.2. 消息流控机制

消息流控指的是系统容量或水位过高, Apache RocketMQ 服务端会通过快速失败返回流控错误来避免底层资源承受过高压力。

触发条件

流控行为

当系统触发消息发送流控时,客户端会收到系统限流错误和异常,错误码信息如下:

3.3. 消费重试

消费者出现异常,消费某条消息失败时, Apache RocketMQ 会根据消费重试策略重新投递该消息。消费重试主要解决的是业务处理逻辑失败导致的消费完整性问题,是一种为业务兜底的策略,不应该被用做业务流程控制。

推荐使用消息重试场景如下:

消费重试策略

消费重试指的是,消费者在消费某条消息失败后,Apache RocketMQ 服务端会根据重试策略重新消费该消息,超过一次定数后若还未消费成功,则该消息将不再继续重试,直接被发送到死信队列中。

消息重试的触发条件

重试策略差异

Apache RocketMQ 5.0 笔记

3.4. 消费进度

消息位点(Offset)

消息是按到达服务端的先后顺序存储在指定主题的多个队列中,每条消息在队列中都有一个唯一的Long类型坐标,这个坐标被定义为消息位点。

任意一个消息队列在逻辑上都是无限存储,即消息位点会从0到Long.MAX无限增加。通过主题、队列和位点就可以定位任意一条消息的位置,具体关系如下图所示:

Apache RocketMQ 5.0 笔记

Apache RocketMQ 定义队列中最早一条消息的位点为最小消息位点(MinOffset);最新一条消息的位点为最大消息位点(MaxOffset)。虽然消息队列逻辑上是无限存储,但由于服务端物理节点的存储空间有限, Apache RocketMQ 会滚动删除队列中存储最早的消息。因此,消息的最小消费位点和最大消费位点会一直递增变化。

Apache RocketMQ 5.0 笔记

消费位点(ConsumerOffset)

Apache RocketMQ 领域模型为发布订阅模式,每个主题的队列都可以被多个消费者分组订阅。若某条消息被某个消费者消费后直接被删除,则其他订阅了该主题的消费者将无法消费该消息。

因此,Apache RocketMQ 通过消费位点管理消息的消费进度。每条消息被某个消费者消费完成后不会立即在队列中删除,Apache RocketMQ 会基于每个消费者分组维护一份消费记录,该记录指定消费者分组消费某一个队列时,消费过的最新一条消息的位点,即消费位点。

当消费者客户端离线,又再次重新上线时,会严格按照服务端保存的消费进度继续处理消息。如果服务端保存的历史位点信息已过期被删除,此时消费位点向前移动至服务端存储的最小位点。

注:消费位点的保存和恢复是基于 Apache RocketMQ 服务端的存储实现,和任何消费者无关。

队列中消息位点MinOffset、MaxOffset和每个消费者分组的消费位点ConsumerOffset的关系如下:

Apache RocketMQ 5.0 笔记

ConsumerOffset≤MaxOffset:

ConsumerOffset≥MinOffset:

消费位点初始值

消费位点初始值指的是消费者分组首次启动消费者消费消息时服务端保存的消费位点的初始值。Apache RocketMQ 定义消费位点的初始值为消费者首次获取消息时,该时刻队列中的最大消息位点。相当于消费者将从队列中最新的消息开始消费。

3.5. 消息存储机制

Apache RocketMQ 使用存储时长作为消息存储的依据,即每个节点对外承诺消息的存储时长。在存储时长范围内的消息都会被保留,无论消息是否被消费;超过时长限制的消息则会被清理掉。

Apache RocketMQ 5.0 笔记

4. 架构

4.1. 技术架构

Apache RocketMQ 5.0 笔记

RocketMQ架构上主要分为四部分,如上图所示:

4.2. 部署架构

Apache RocketMQ 5.0 笔记

RocketMQ 网络部署特点:

结合部署架构图,描述集群工作流程:

5. 客户端

在编写客户端代码时,首先准备一个简单的环境,可以选用Local模式。这里不多介绍,只说一句,启动broker的时候可以-c指定配置文件,启动完以后通过jps查看进程

Apache RocketMQ 5.0 笔记

通过mqadmin命令创建并查看主题

mqadmin updateTopic -n localhost:9876 -b 172.16.52.116:10911 -t TEST_TOPIC
mqadmin topicList -n localhost:9876

具体命令参数,参见 https://rocketmq.apache.org/zh/docs/deploymentOperations/16admintool/

也可以通过RocketMQ Dashboard创建主题

Apache RocketMQ 5.0 笔记

Apache RocketMQ 5.0 笔记

Apache RocketMQ 5.0 笔记

5.1. rocketmq-client

引入依赖

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.0.0</version>
</dependency>

代码片段

public class AppTest extends TestCase {
private String producerGroupName = "MyProducerGroup";
private String consumerGroupName = "MyConsumerGroup";
/**
* 发送同步消息
*/
@Test
public void testSyncProducer() throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer(producerGroupName);
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
// 发送消息
Message message = new Message("TEST_TOPIC", "A", "UserID12345", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(message);
System.out.println(sendResult);
// 关闭Producer实例
producer.shutdown();
}
/**
* 发送异步消息
*/
@Test
public void testAsyncProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(producerGroupName);
producer.setNamesrvAddr("localhost:9876");
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
Message msg = new Message("TEST_TOPIC", "B", "OrderID12346", "Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallback接收异步返回结果的回调
producer.send(msg, new SendCallback() {
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
public void onException(Throwable e) {
e.printStackTrace();
}
});
//  等待5秒
TimeUnit.SECONDS.sleep(5);
}
/**
* 单向发送消息
* 这种方式主要用在不特别关心发送结果的场景,例如日志发送。
*/
@Test
public void testOnewayProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(producerGroupName);
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TEST_TOPIC", "C", "OrderID12348", "Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送单向消息,没有任何返回结果
producer.sendOneway(msg);
}
/**
* 消费消息
*/
@Test
public void testConsumer() throws Exception {
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(consumerGroupName);
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TEST_TOPIC", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.start();
while (true) {
List<MessageExt> messageExts = consumer.poll();
if (messageExts.isEmpty()) {
continue;
}
messageExts.forEach(msg -> {
System.out.println(String.format("MsgId: %s, MsgBody: %s", msg.getMsgId(), new String(msg.getBody())));
});
consumer.commitSync();
}
}
}

5.2. rocketmq-spring-boot-starter

依赖

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>

application.yml

配置项详见 org.apache.rocketmq.spring.autoconfigure.RocketMQProperties

rocketmq:
name-server: localhost:9876
producer:
group: MyProducerGroup
send-message-timeout: 10000
consumer:
group: MyConsumerGroup

发送消息

import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeTypeUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @Author: ChengJianSheng
* @Date: 2023/1/18
*/
@RestController
@RequestMapping("/message")
public class MessageController {
private String springTopic = "SPRING_TOPIC";
private String userTopic = "USER_TOPIC";
private String orderTopic = "ORDER_TOPIC";
private String extTopic = "EXT_TOPIC";
private String reqTopic = "REQ_TOPIC";
private String objTopic = "OBJECT_TOPIC";
@Autowired
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/send")
public String send() {
SendResult sendResult = rocketMQTemplate.syncSend(springTopic, "Hello World");
Message message = MessageBuilder.withPayload("Hello World!2222".getBytes()).build();
sendResult = rocketMQTemplate.syncSend(springTopic, message);
message = MessageBuilder.withPayload("Hello, World! I'm from spring message").build();
sendResult = rocketMQTemplate.syncSend(springTopic, message);
sendResult = rocketMQTemplate.syncSend(userTopic, new User("zhangsan", 20));
message = MessageBuilder.withPayload(new User("lisi", 21))
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE)
.build();
sendResult = rocketMQTemplate.syncSend(userTopic, message);
rocketMQTemplate.asyncSend(orderTopic, new Order("oid1234", "4.56"), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("async onSucess SendResult=%s %n", sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.printf("async onException Throwable=%s %n", throwable);
}
});
rocketMQTemplate.convertAndSend(extTopic + ":tag0", "I'm from tag0");
rocketMQTemplate.convertAndSend(extTopic + ":tag1", "I'm from tag1");
String replyString = rocketMQTemplate.sendAndReceive(reqTopic, "request string", String.class);
System.out.printf("receive %s %n", replyString);
User replyUser = rocketMQTemplate.sendAndReceive(objTopic, new User("wangwu", 21), User.class);
System.out.printf("receive %s %n", replyUser);
return "ok";
}
}

接收消息

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "SPRING_TOPIC", consumerGroup = "${rocketmq.consumer.group}")
public class StringConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.printf("------- StringConsumer received: %s \n", message);
}
}
@Component
@RocketMQMessageListener(topic = "ORDER_TOPIC", consumerGroup = "${rocketmq.consumer.group}")
public class OrderConsumer implements RocketMQListener<Order> {
@Override
public void onMessage(Order message) {
System.out.printf("------- OrderConsumer received: %s [orderId : %s]\n", message, message.getOrderNo());
}
}
@Component
@RocketMQMessageListener(topic = "USER_TOPIC", consumerGroup = "${rocketmq.consumer.group}")
public class UserConsumer implements RocketMQListener<User>, RocketMQPushConsumerLifecycleListener {
@Override
public void onMessage(User message) {
System.out.printf("------ UserConsumer received: %s ; age: %s ; name: %s \n", message, message.getAge(), message.getName());
}
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
}
}
@Component
@RocketMQMessageListener(topic = "REQ_TOPIC", consumerGroup = "${rocketmq.consumer.group}")
public class StringConsumerWithReplyString implements RocketMQReplyListener<String, String> {
@Override
public String onMessage(String message) {
System.out.printf("------- StringConsumerWithReplyString received: %s \n", message);
return "reply string";
}
}
@Component
@RocketMQMessageListener(topic = "OBJECT_TOPIC", consumerGroup = "${rocketmq.consumer.group}")
public class ObjectConsumerWithReplyUser implements RocketMQReplyListener<User, User> {
@Override
public User onMessage(User message) {
System.out.printf("------- ObjectConsumerWithReplyUser received: %s \n", message);
return new User("tom", 8);
}
}
@Component
@RocketMQMessageListener(topic = "EXT_TOPIC", selectorExpression = "tag0||tag1", consumerGroup = "${rocketmq.consumer.group}")
public class MessageExtConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
System.out.printf("------- MessageExtConsumer received message, msgId: %s, body:%s \n", message.getMsgId(), new String(message.getBody()));
}
}

6. 文档

https://rocketmq.apache.org/zh/

https://rocketmq.apache.org/zh/docs/deploymentOperations/15deploy/

https://github.com/apache/rocketmq/tree/rocketmq-all-5.0.0/docs/cn

https://github.com/apache/rocketmq/blob/rocketmq-all-5.0.0/docs/cn/architecture.md

https://github.com/apache/rocketmq/blob/rocketmq-all-5.0.0/docs/cn/RocketMQ_Example.md

https://github.com/apache/rocketmq-dashboard

https://github.com/apache/rocketmq-spring

https://github.com/apache/rocketmq

发表回复