1、 RocketMQ安装测试
1.1 下载解压
下载地址:https://rocketmq.apache.org/release-notes/
rocketmq-all-5.0.0-bin-release.zip
下载后上传到服务器;
解压命令# unzip rocketmq-all-5.0.0-bin-release.zip
1.2 启动 测试
RocketMQ默认配置是比较好的,这样可以直接应用于生产环境,所以如果机器内存较小,启动会因为内存不足失败,为了避免后面启动失败,选择先修改其内存大小,一般阿里云服务器是满足不了默认内存。
手动调整JVM的配置,单位从g改为m
1.2.1 启动nameserver
1.2.1.1 修改runbroker.sh和runserver.sh
1.2.1.2 runbroker.sh
-server -Xms256m -Xmx256m -Xmn128m
1.2.1.3 runserver.sh
-server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m
1.2.1.4 启动nameserver
解压目录执行# nohup ./bin/mqnamesrv -n 1.117.75.57(自己的ip):9876 &
1.2.2 启动broker
1.2.2.1 修改broker.conf
添加namesrvAddr 和 brokerIP1:
1.2.2.3 启动 borker
解压目录执行# nohup ./bin/mqbroker -n 1.117.75.57:9876 -c ./conf/broker.conf &
1.2.2.4 查看启动情况
jps
1.2.3 测试
由于服务器内存可能比较小,建议先关闭其他应用,比如rabbitmq,docker的容器等;
还需要开启几个端口:9876,10909,10910,10911;
1.2.3.1 生产者
导出环境变量# export NAMESRV_ADDR=1.117.75.57:9876
发送消息# bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
1.2.3.2 消费者
导出环境变量# export NAMESRV_ADDR=1.117.75.57:9876
消费消息# bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
1.2.4 关闭命令
关闭nameserver# bin/mqshutdown namesrv
关闭broker# bin/mqshutdown broker
1.2.5 RocketMQ控制台
1.2.5.1 下载,解压,修改配置信息
1.2.5.2 访问控制台
localhost:9696
2、RocketMQ框架原理
2.1 框架
2.2 概念
整体可以分成4个角色,分别是:NameServer,Broker,Producer,Consumer:
- Broker(邮递员):Broker是RocketMQ的核心,负责消息的接收,存储,投递等功能;
- NameServer(邮局):消息队列的协调者,Broker向它注册路由信息,同时Producer和Consumer向其获取路由信息Producer(寄件人)消息的生产者,需要从NameServer获取Broker信息,然后与Broker建立连接,向Broker发送消息;
- Consumer(收件人) :消息的消费者,需要从NameServer获取Broker信息,然后与Broker建立连接,从Broker获取消息;
- Topic(地区):用来区分不同类型的消息,发送和接收消息前都需要先创建Topic,针对Topic来发送和接收消息Message Queue(邮件)为了提高性能和吞吐量,引入了Message Queue,一个Topic可以设置一个或多个Message Queue,这样消息就可以并行往各个Message Queue发送消息,消费者也可以并行的从多个Message Queue读取消息;
- Message:Message 是消息的载体;
- Producer Group:生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。
- Consumer Group:消费者组,消费同一类消息的多个 consumer 实例组成一个消费者组。
3、RocketMQ整合
3.1 rocketmq模块 发送消息
3.2.1.1 依赖
<!-- rocket -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
3.2.1.2 配置
# rocketmq配置
rocketmq:
#rocketMQ服务的地址
name-server: 1.117.75.57:9876
# 生产者组
producer:
group: kh96-sendsms-group
3.2.1.3 请求
@Autowired(required = false)
private RocketMQTemplate rocketMQTemplate;
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 测试发送消息到用户中心,用户中心给手机号发信息
*/
@RequestMapping("/testRocketMQSendMsg")
public String testRocketMQSendMsg(@RequestParam String phoneNo) {
log.info("------ 使用RocketMQ,测试给手机:{},发送消息 -------", phoneNo);
//使用RocketMQ发送消息
rocketMQTemplate.convertAndSend("rocketmq-send-sms-kh96", phoneNo);
return "send msg success";
}
3.2 user模块 消费消息
1.添加加rocketmq的依赖;
2.用户服务,监听发送短信的请求发送消息:
/**
* Created On : 30/11/2022.
* <p>
* Author : huayu
* <p>
* Description: 用户服务,监听发送短信的请求发送消息
*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "rocketmq-user-sms-group", //组 随便写
topic = "rocketmq-send-sms-kh96" //消息队列,发送的时候指定的
)
public class SendSmsListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("***** 接收发送信息的请求,给手机:{},发送消息 ******", message);
}
}
3.3 测试
3.3.1 发送请求
3.3.2 发送消息模块日志
3.3.3 接收消息模块日志
3.3.4 控制台查看消息详情
4、发送不同类型的消息
4.1 发送可靠同步消息
4.1.1 请求
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 测试roketmq-发送可靠同步消息
*/
@RequestMapping("/testRocketMQSendMsgSync")
public String testRocketMQSendMsgSync(@RequestParam String syncMsg) {
log.info("------ 使用RocketMQ,发送可靠同步消息{} -------", syncMsg);
//使用RocketMQ发送消息,拿到同步结果
SendResult sendResult = rocketMQTemplate.syncSend("rocketmq-sync-msg-kh96", syncMsg);
log.info("------ 使用RocketMQ,发送可靠同步消息结果:{} -------", sendResult);
return "send sync msg success";
}
4.1.2 发送请求
4.1.3 同步结果
4.2 发送可靠异步消息
4.2.1 请求
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 测试roketmq-发送可靠异步消息
*/
@RequestMapping("/testRocketMQSendMsgAsync")
public String testRocketMQSendMsgAsync(@RequestParam String asyncMsg) {
log.info("------ 使用RocketMQ,发送可靠异步消息:{} -------", asyncMsg);
//使用RocketMQ发送消息
rocketMQTemplate.asyncSend("rocketmq-sync-msg-kh96",
asyncMsg,
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("------ 可靠异步发送成功回调 ------");
}
@Override
public void onException(Throwable throwable) {
log.info("------ 可靠异步发送失败回调 ------");
}
});
return "send async msg success";
}
4.2.2 发送请求
4.2.3 回调结果
4.3 发送单项消息,只发不收结果
4.3.1 请求
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 测试roketmq-发送单向消息,只发不收结果
*/
@RequestMapping("/testRocketMQSendMsgOneWay")
public String testRocketMQSendMsgOneWay(@RequestParam String oneWayMsg) {
log.info("------ 使用RocketMQ,发送单向消息给:{} -------", oneWayMsg);
//使用RocketMQ发送消息
rocketMQTemplate.sendOneWay("rocketmq-oneWay-msg-kh96", oneWayMsg);
return "send oneWay msg success";
}
4.3.2 发送请求
4.3.3 日志查看
4.4 发送顺序消息
4.4.1 请求
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 测试roketmq-发送顺序消息
*/
@RequestMapping("/testRocketMQSendMsgOrderly")
public String testRocketMQSendMsgOrderly(@RequestParam String orderlyMsgs) {
log.info("------ 使用RocketMQ,发送顺序消息:{} -------", orderlyMsgs);
//使用RocketMQ发送顺序消息,必须要提供一个唯一的标识di,比如用户编号等
String userId = UUID.randomUUID().toString().replace("-", "");
//发送多条顺序消息,模拟iang消息分割成多个符号发送
Arrays.asList(orderlyMsgs.split("")).
forEach(orderlyMsg -> rocketMQTemplate.syncSendOrderly("rocketmq-orderly-msg-kh96", orderlyMsg, userId));
return "send orderly msg success";
}
4.4.2 监听器
/**
* Created On : 30/11/2022.
* <p>
* Author : huayu
* <p>
* Description: RocketMQ 监听顺序消息
*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "rocketmq-orderly-sms-group",
topic = "rocketmq-orderly-msg-kh96",
consumeMode = ConsumeMode.ORDERLY)
public class RocketMQOrderlyMsgListener implements RocketMQListener<String> {
@Override
public void onMessage(String orderlyMsg) {
log.info("------接收顺序消息 :{} ------", orderlyMsg);
}
}
4.2.3 发送请求
4.2.4 消费消息
4.5 发送事务消息(重点)
4.5.1 发送事务消息流程
4.5.1.1 流程图
4.5.1.2 流程解析
- 正常事务发送与提交阶段
- 生产者发送一个半消息给broker(半消息是指的暂时不能消费的消息);
- 服务端响应;
- 开始执行本地事务;
- 根据本地事务的执行情况执行Commit或者Rollback
- 事务信息的补偿流程
- 如果broker长时间没有收到本地事务的执行状态,会向生产者发起一个确认会查的操作请求;
- 生产者收到确认会查请求后,检查本地事务的执行状态;
- 根据检查后的结果执行Commit或者Rollback操作 补偿阶段主要是用于解决生产者在发送Commit或者Rollbacke操作时发生超时或失败的情况;
4.5.2 RocketMQ事务流程关键
-
事务消息在一阶段对用户不可见、
事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的,也就是说消费者不能直接消费.这里RocketMQ实现方法是原消息的主题与消息消费队列,然后把主题改成RMQ_SYS_TRANS_HALF_TOPIC.这样由于消费者没有订阅这个主题,所以不会消费;
-
如何处理第二阶段的发送消息?
在本地事务执行完成后回向Broker发送Commit或者Rollback操作,此时如果在发送消息的时候生产者出故障了,要保证这条消息最终被消费,broker就会向服务端发送回查请求,确认本地事务的执行状态.当然RocketMQ并不会无休止的发送事务状态回查请求,默认是15次,如果15次回查还是无法得知事务的状态,RocketMQ默认回滚消息(broker就会将这条半消息删除);
4.5.3 RocketMQ事务消息原理
-
设计思想
在RocketMQ事务消息的主要流程中,一阶段的消息如何对用户不可见。其中,事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的。那么,如何做到写入消息但是对用户不可见呢?RocketMQ事务消息的做法是:如果消息是half消息,将备份原消息的主题与消息消费队列,然后改变主题为RMQ_SYS_TRANS_HALF_TOPIC。由于消费组未订阅该主题,故消费端无法消费half类型的消息。
-
如何实现事务回查?
Broker会启动一个消息回查的定时任务,定时从事务消息queue中读取所有待反查的消息。针对每个需要反查的半消息,Broker会给对应的Producer发一个要求执行事务状态反查的RPC请求。然后根据RPC返回响应中的反查结果,来决定这个半消息是需要提交还是回滚,或者后续继续来反查。最后,提交或者回滚事务,将半消息标记为已处理状态【将消息存储在主题为:RMQ_SYS_TRANS_OP_HALF_TOPIC的主题中,代表这些消息已经被处理(提交或回滚)】。 如果是提交事务,就把半消息从半消息队列中复制到该消息真正的topic和queue中; 如果是回滚事务,则什么都不做。
参考博客1:https://blog.csdn.net/Weixiaohuai/article/details/123733518
参考博客2:https://blog.csdn.net/qq_42877546/article/details/125404307
4.5.4 实现代码
4.5.4.1 业务层
4.5.4.1.1 接口
/**
* Created On : 30/11/2022.
* <p>
* Author : huayu
* <p>
* Description: 事务消息的业务 接口
*/
public interface RocketMQTxService {
/**
* @param : [kgcMallOrder]
* @return : void
* @author : huayu
* @date : 30/11/2022
* @description : 发送生成订单的半事务消息
*/
void sendCreateOrderHalfTx(KgcMallOrder kgcMallOrder);
/**
* @param : [txId, kgcMallOrder]
* @return : void
* @author : huayu
* @date : 30/11/2022
* @description : 执行本地生成订单的事务操作
*/
void executeCreateOrderLocalTx(String txId, KgcMallOrder kgcMallOrder);
}
4.5.4.1.2 实现类
/**
* Created On : 30/11/2022.
* <p>
* Author : huayu
* <p>
* Description: 事务消息的业务 处理类
*/
@Service
@Slf4j
public class RocketMQTxServiceImpl implements RocketMQTxService {
@Autowired(required = false)
private RocketMQTemplate rocketMQTemplate;
@Autowired
private KgcMallOrderRepository kgcMallOrderRepository;
@Autowired
private KgcMallTxlogrepisitory kgcMallTxlogrepisitory;
@Override
@Transactional
public void sendCreateOrderHalfTx(KgcMallOrder kgcMallOrder) {
log.info("###### 1. 开始发送生成订单的半事务消息 到 rocketmq服务端 ######");
//自定义事务编号
String txId = UUID.randomUUID().toString().substring(0, 8);
//发送半事务消息,返回发送结果
TransactionSendResult transactionSendResult =
rocketMQTemplate.sendMessageInTransaction("rocketmq-tx-msg-group", //组
"rocketmq-tx-msg-kh96", //队列
MessageBuilder.withPayload(kgcMallOrder).setHeader("txId", txId).build(), // 消息体
kgcMallOrder); //发送内容
log.info("###### 2. 开始发送生成订单的半事务消息rocketmq服务端成功,响应:{} ######", transactionSendResult);
}
@Override
@Transactional
public void executeCreateOrderLocalTx(String txId, KgcMallOrder kgcMallOrder) {
log.info("###### 3.1 本地开始执行生成订单的事务操作 ######");
//开始插入订单
kgcMallOrderRepository.save(kgcMallOrder);
log.info("###### 3.2 本地执行生成订单的事务操作 成功 ######");
// 模拟本地事务处理失败
// int a = 10 / 0;
log.info("###### 3.3开始生成用于事务回查的本地事务日志 ######");
//创建事务对象
KgcMallTxlog kgcMallTxlog = KgcMallTxlog.builder()
.id(txId)
.txDetail("本地事务日志")
.txTime(new Date())
.build();
//事务日志入库
kgcMallTxlogrepisitory.save(kgcMallTxlog);
log.info("###### 3.4 生成用于事务回查的本地事务日志成功 ######");
}
}
4.5.4.2 监听器
4.5.4.2.1 RocketMQExecuteLocalTxListener
/**
* Created On : 30/11/2022.
* <p>
* Author : huayu
* <p>
* Description: 事务消息,本地执行事务监听,半事务消息发送成功后,此监听会收到本地事务处理的通知
*/
@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "rocketmq-tx-msg-group")
public class RocketMQExecuteLocalTxListener implements RocketMQLocalTransactionListener {
@Autowired
private RocketMQTxService rocketMQTxService;
@Autowired
private KgcMallTxlogrepisitory kgcMallTxlogrepisitory;
//执行本地事务
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
//调用本地事务执行的业务处理接口
log.info("###### 3 半事务消息发送成功, 执行本地事务 ######");
rocketMQTxService.executeCreateOrderLocalTx((String) msg.getHeaders().get("txId"), (KgcMallOrder) arg);
//响应本地事务执行成功结果给服务端,服务端接收到此提交结果,会投递消息
log.info("###### 4 本地事务处理成功,响应事务处理结果给服务端 ######");
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("###### 本地事务执行异常:{} ######", e.getMessage());
}
//响应本地事务执行失败结果给服务端,服务端接收到此回滚结果,不会投递消息(缓存,并定期删除)
log.info("###### 4 本地事务处理失败,响应事务处理结果给服务端 #######");
return RocketMQLocalTransactionState.ROLLBACK;
}
//回查本地事务
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
log.info("###### 5 未收到第4步本地事务处理结果,回查事务状态 ######");
//在网络闪断或者服务重启,没有及时通知服务断事务处理结果,进行会查操作
//如果回查本地事务执行成功(看事物日志是否存在,如果存在就是处理成功如果不存在就是处理失败),通知服务端投递消息,否则不能投递
log.info("###### 6 检查本地事务处理结果,响应事务处理结果给服务端 ######");
if (kgcMallTxlogrepisitory.findById((String) msg.getHeaders().get("txId")).orElse(null) == null) {
//本地事务入库失败,代表本地事务没有处理成功,步投递消息(缓存,并定期删除)
log.info("###### 7 检查本地事务处理结果失败 ######");
return RocketMQLocalTransactionState.ROLLBACK;
}
//本地事务入库成功,代表本地事务处理成功,投递消息
log.info("###### 7 检查本地事务处理结果成功 ######");
return RocketMQLocalTransactionState.COMMIT;
}
}
4.5.4.2.2 RocketMQConsumerTxMsgListener
/**
* Created On : 30/11/2022.
* <p>
* Author : huayu
* <p>
* Description: 事务消息,消费监听,如果本地事务处理成功,会收到投递的消息,如果失败,收不到消息
*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "rocket-tx-msg-consumer-group",
topic = "rocket-tx-msg-kh96"
)
public class RocketMQConsumerTxMsgListener implements RocketMQListener<Object> {
@Override
public void onMessage(Object message) {
log.info("###### 8 消费端,收到生成订单成功的事务消息:{} ###### ", message);
}
}
4.5.4.3 控制器
/**
* Created On : 30/11/2022.
* <p>
* Author : huayu
* <p>
* Description: RocketMQ 消息队列 测试消息入口
*/
@Slf4j
@RestController
public class RocketMQController {
@Autowired(required = false)
private RocketMQTemplate rocketMQTemplate;
@Autowired
private RocketMQTxService rocketMQTxService;
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 测试发送消息到用户中心,用户中心给手机号发信息
*/
@RequestMapping("/testRocketMQSendMsg")
public String testRocketMQSendMsg(@RequestParam String phoneNo) {
log.info("------ 使用RocketMQ,测试给手机:{},发送消息 -------", phoneNo);
//使用RocketMQ发送消息
rocketMQTemplate.convertAndSend("rocketmq-send-sms-kh96", phoneNo);
return "send msg success";
}
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 测试roketmq-发送可靠同步消息
*/
@RequestMapping("/testRocketMQSendMsgSync")
public String testRocketMQSendMsgSync(@RequestParam String syncMsg) {
log.info("------ 使用RocketMQ,发送可靠同步消息{} -------", syncMsg);
//使用RocketMQ发送消息,拿到同步结果
SendResult sendResult = rocketMQTemplate.syncSend("rocketmq-sync-msg-kh96", syncMsg);
log.info("------ 使用RocketMQ,发送可靠同步消息结果:{} -------", sendResult);
return "send sync msg success";
}
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 测试roketmq-发送可靠异步消息
*/
@RequestMapping("/testRocketMQSendMsgAsync")
public String testRocketMQSendMsgAsync(@RequestParam String asyncMsg) {
log.info("------ 使用RocketMQ,发送可靠异步消息:{} -------", asyncMsg);
//使用RocketMQ发送消息
rocketMQTemplate.asyncSend("rocketmq-sync-msg-kh96",
asyncMsg,
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("------ 可靠异步发送成功回调 ------");
}
@Override
public void onException(Throwable throwable) {
log.info("------ 可靠异步发送失败回调 ------");
}
});
return "send async msg success";
}
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 测试roketmq-发送单向消息,只发不收结果
*/
@RequestMapping("/testRocketMQSendMsgOneWay")
public String testRocketMQSendMsgOneWay(@RequestParam String oneWayMsg) {
log.info("------ 使用RocketMQ,发送单向消息:{} -------", oneWayMsg);
//使用RocketMQ发送消息
rocketMQTemplate.sendOneWay("rocketmq-oneWay-msg-kh96", oneWayMsg);
return "send oneWay msg success";
}
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 测试roketmq-发送顺序消息
*/
@RequestMapping("/testRocketMQSendMsgOrderly")
public String testRocketMQSendMsgOrderly(@RequestParam String orderlyMsgs) {
log.info("------ 使用RocketMQ,发送顺序消息:{} -------", orderlyMsgs);
//使用RocketMQ发送顺序消息,必须要提供一个唯一的标识di,比如用户编号等
String userId = UUID.randomUUID().toString().replace("-", "");
//发送多条顺序消息,模拟iang消息分割成多个符号发送
Arrays.asList(orderlyMsgs.split("")).
forEach(orderlyMsg -> rocketMQTemplate.syncSendOrderly("rocketmq-orderly-msg-kh96", orderlyMsg, userId));
return "send orderly msg success";
}
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 测试roketmq-发送事务消息
*/
@RequestMapping("/testRocketMQSendMsgTx")
public String testRocketMQSendMsgTx(@RequestParam String txmsg) {
log.info("------ 使用RocketMQ,发送事务消息:{} -------", txmsg);
//使用RocketMQ发送事务消息,模拟生成一笔订单
KgcMallOrder kgcMallOrder = KgcMallOrder.builder()
.userId(2)
.userName("RocketMQ_tx")
.prodId(2)
.prodName(txmsg)
.totalPrice(96.0)
.build();
//发送事务消息
rocketMQTxService.sendCreateOrderHalfTx(kgcMallOrder);
return "send tx msg success";
}
}