发布时间:2022-12-02 文章分类:编程知识 投稿人:王小丽 字号: 默认 | | 超大 打印

1、 RocketMQ安装测试

1.1 下载解压

下载地址:https://rocketmq.apache.org/release-notes/

rocketmq-all-5.0.0-bin-release.zip

下载后上传到服务器;

解压命令# unzip rocketmq-all-5.0.0-bin-release.zip

1.2 启动 测试

RocketMQ默认配置是比较好的,这样可以直接应用于生产环境,所以如果机器内存较小,启动会因为内存不足失败,为了避免后面启动失败,选择先修改其内存大小,一般阿里云服务器是满足不了默认内存。

手动调整JVM的配置,单位从g改为m

1.2.1 启动nameserver

1.2.1.1 修改runbroker.sh和runserver.sh

SpringCloud Alibaba(五) - RocketMQ

1.2.1.2 runbroker.sh
 -server -Xms256m -Xmx256m -Xmn128m

SpringCloud Alibaba(五) - RocketMQ

1.2.1.3 runserver.sh
-server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m

SpringCloud Alibaba(五) - RocketMQ

1.2.1.4 启动nameserver
解压目录执行# nohup ./bin/mqnamesrv -n 1.117.75.57(自己的ip):9876 &

SpringCloud Alibaba(五) - RocketMQ

1.2.2 启动broker

1.2.2.1 修改broker.conf

SpringCloud Alibaba(五) - RocketMQ

添加namesrvAddr 和 brokerIP1:

SpringCloud Alibaba(五) - RocketMQ

1.2.2.3 启动 borker
解压目录执行# nohup ./bin/mqbroker -n 1.117.75.57:9876 -c ./conf/broker.conf &
1.2.2.4 查看启动情况
jps

SpringCloud Alibaba(五) - RocketMQ

1.2.3 测试

由于服务器内存可能比较小,建议先关闭其他应用,比如rabbitmq,docker的容器等;

还需要开启几个端口:9876,10909,10910,10911;

1.2.3.1 生产者
导出环境变量# export NAMESRV_ADDR=1.117.75.57:9876
发送消息# bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

SpringCloud Alibaba(五) - RocketMQ

1.2.3.2 消费者
导出环境变量# export NAMESRV_ADDR=1.117.75.57:9876
消费消息# bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

SpringCloud Alibaba(五) - RocketMQ

1.2.4 关闭命令

关闭nameserver# bin/mqshutdown namesrv
关闭broker# bin/mqshutdown broker

1.2.5 RocketMQ控制台

1.2.5.1 下载,解压,修改配置信息

SpringCloud Alibaba(五) - RocketMQ

1.2.5.2 访问控制台

localhost:9696

SpringCloud Alibaba(五) - RocketMQ

2、RocketMQ框架原理

2.1 框架

SpringCloud Alibaba(五) - RocketMQ

2.2 概念

整体可以分成4个角色,分别是:NameServer,Broker,Producer,Consumer:

3、RocketMQ整合

3.1 rocketmq模块 发送消息

3.2.1.1 依赖
<!-- rocket -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.2</version>
</dependency>
3.2.1.2 配置
# rocketmq配置
rocketmq:
  #rocketMQ服务的地址
  name-server: 1.117.75.57:9876
  # 生产者组
  producer:
    group: kh96-sendsms-group
3.2.1.3 请求
@Autowired(required = false)
private RocketMQTemplate rocketMQTemplate;
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 测试发送消息到用户中心,用户中心给手机号发信息
*/
@RequestMapping("/testRocketMQSendMsg")
public String testRocketMQSendMsg(@RequestParam String phoneNo) {
    log.info("------ 使用RocketMQ,测试给手机:{},发送消息 -------", phoneNo);
    //使用RocketMQ发送消息
    rocketMQTemplate.convertAndSend("rocketmq-send-sms-kh96", phoneNo);
    return "send msg success";
}

3.2 user模块 消费消息

1.添加加rocketmq的依赖;

2.用户服务,监听发送短信的请求发送消息:

/**
 * Created On : 30/11/2022.
 * <p>
 * Author : huayu
 * <p>
 * Description: 用户服务,监听发送短信的请求发送消息
 */
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "rocketmq-user-sms-group", //组 随便写
        topic = "rocketmq-send-sms-kh96" //消息队列,发送的时候指定的
)
public class SendSmsListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        log.info("*****  接收发送信息的请求,给手机:{},发送消息 ******", message);
    }
}

3.3 测试

3.3.1 发送请求

SpringCloud Alibaba(五) - RocketMQ

3.3.2 发送消息模块日志

SpringCloud Alibaba(五) - RocketMQ

3.3.3 接收消息模块日志

SpringCloud Alibaba(五) - RocketMQ

3.3.4 控制台查看消息详情

SpringCloud Alibaba(五) - RocketMQ

SpringCloud Alibaba(五) - RocketMQ

4、发送不同类型的消息

4.1 发送可靠同步消息

4.1.1 请求

/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 测试roketmq-发送可靠同步消息
*/
@RequestMapping("/testRocketMQSendMsgSync")
public String testRocketMQSendMsgSync(@RequestParam String syncMsg) {
    log.info("------ 使用RocketMQ,发送可靠同步消息{} -------", syncMsg);
    //使用RocketMQ发送消息,拿到同步结果
    SendResult sendResult = rocketMQTemplate.syncSend("rocketmq-sync-msg-kh96", syncMsg);
    log.info("------ 使用RocketMQ,发送可靠同步消息结果:{} -------", sendResult);
    return "send sync msg success";
}

4.1.2 发送请求

SpringCloud Alibaba(五) - RocketMQ

4.1.3 同步结果

SpringCloud Alibaba(五) - RocketMQ

4.2 发送可靠异步消息

4.2.1 请求

/**
 * @param : [phoneNo]
 * @return : java.lang.String
 * @author : huayu
 * @date : 30/11/2022
 * @description : 测试roketmq-发送可靠异步消息
 */
@RequestMapping("/testRocketMQSendMsgAsync")
public String testRocketMQSendMsgAsync(@RequestParam String asyncMsg) {
    log.info("------ 使用RocketMQ,发送可靠异步消息:{} -------", asyncMsg);
    //使用RocketMQ发送消息
    rocketMQTemplate.asyncSend("rocketmq-sync-msg-kh96",
            asyncMsg,
            new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    log.info("------  可靠异步发送成功回调  ------");
                }
                @Override
                public void onException(Throwable throwable) {
                    log.info("------  可靠异步发送失败回调  ------");
                }
            });
    return "send async msg success";
}

4.2.2 发送请求

SpringCloud Alibaba(五) - RocketMQ

4.2.3 回调结果

SpringCloud Alibaba(五) - RocketMQ

4.3 发送单项消息,只发不收结果

4.3.1 请求

/**
 * @param : [phoneNo]
 * @return : java.lang.String
 * @author : huayu
 * @date : 30/11/2022
 * @description : 测试roketmq-发送单向消息,只发不收结果
 */
@RequestMapping("/testRocketMQSendMsgOneWay")
public String testRocketMQSendMsgOneWay(@RequestParam String oneWayMsg) {
    log.info("------ 使用RocketMQ,发送单向消息给:{} -------", oneWayMsg);
    //使用RocketMQ发送消息
    rocketMQTemplate.sendOneWay("rocketmq-oneWay-msg-kh96", oneWayMsg);
    return "send oneWay msg success";
}

4.3.2 发送请求

SpringCloud Alibaba(五) - RocketMQ

4.3.3 日志查看

SpringCloud Alibaba(五) - RocketMQ

4.4 发送顺序消息

4.4.1 请求

/**
 * @param : [phoneNo]
 * @return : java.lang.String
 * @author : huayu
 * @date : 30/11/2022
 * @description : 测试roketmq-发送顺序消息
 */
@RequestMapping("/testRocketMQSendMsgOrderly")
public String testRocketMQSendMsgOrderly(@RequestParam String orderlyMsgs) {
    log.info("------ 使用RocketMQ,发送顺序消息:{} -------", orderlyMsgs);
    //使用RocketMQ发送顺序消息,必须要提供一个唯一的标识di,比如用户编号等
    String userId = UUID.randomUUID().toString().replace("-", "");
    //发送多条顺序消息,模拟iang消息分割成多个符号发送
    Arrays.asList(orderlyMsgs.split("")).
            forEach(orderlyMsg -> rocketMQTemplate.syncSendOrderly("rocketmq-orderly-msg-kh96", orderlyMsg, userId));
    return "send orderly msg success";
}

4.4.2 监听器

/**
 * Created On : 30/11/2022.
 * <p>
 * Author : huayu
 * <p>
 * Description: RocketMQ 监听顺序消息
 */
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "rocketmq-orderly-sms-group",
        topic = "rocketmq-orderly-msg-kh96",
        consumeMode = ConsumeMode.ORDERLY)
public class RocketMQOrderlyMsgListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String orderlyMsg) {
        log.info("------接收顺序消息 :{} ------", orderlyMsg);
    }
}

4.2.3 发送请求

SpringCloud Alibaba(五) - RocketMQ

4.2.4 消费消息

SpringCloud Alibaba(五) - RocketMQ

4.5 发送事务消息(重点)

4.5.1 发送事务消息流程

4.5.1.1 流程图

SpringCloud Alibaba(五) - RocketMQ

4.5.1.2 流程解析
  1. 生产者发送一个半消息给broker(半消息是指的暂时不能消费的消息);
  2. 服务端响应;
  3. 开始执行本地事务;
  4. 根据本地事务的执行情况执行Commit或者Rollback
  1. 如果broker长时间没有收到本地事务的执行状态,会向生产者发起一个确认会查的操作请求;
  2. 生产者收到确认会查请求后,检查本地事务的执行状态;
  3. 根据检查后的结果执行Commit或者Rollback操作 补偿阶段主要是用于解决生产者在发送Commit或者Rollbacke操作时发生超时或失败的情况;

4.5.2 RocketMQ事务流程关键

4.5.3 RocketMQ事务消息原理

参考博客1:https://blog.csdn.net/Weixiaohuai/article/details/123733518

参考博客2:https://blog.csdn.net/qq_42877546/article/details/125404307

4.5.4 实现代码

SpringCloud Alibaba(五) - RocketMQ

4.5.4.1 业务层
4.5.4.1.1 接口
/**
 * Created On : 30/11/2022.
 * <p>
 * Author : huayu
 * <p>
 * Description: 事务消息的业务 接口
 */
public interface RocketMQTxService {
    /**
     * @param : [kgcMallOrder]
     * @return : void
     * @author : huayu
     * @date : 30/11/2022
     * @description : 发送生成订单的半事务消息
     */
    void sendCreateOrderHalfTx(KgcMallOrder kgcMallOrder);
    /**
     * @param : [txId, kgcMallOrder]
     * @return : void
     * @author : huayu
     * @date : 30/11/2022
     * @description : 执行本地生成订单的事务操作
     */
    void executeCreateOrderLocalTx(String txId, KgcMallOrder kgcMallOrder);
}
4.5.4.1.2 实现类
/**
 * Created On : 30/11/2022.
 * <p>
 * Author : huayu
 * <p>
 * Description: 事务消息的业务 处理类
 */
@Service
@Slf4j
public class RocketMQTxServiceImpl implements RocketMQTxService {
    @Autowired(required = false)
    private RocketMQTemplate rocketMQTemplate;
    @Autowired
    private KgcMallOrderRepository kgcMallOrderRepository;
    @Autowired
    private KgcMallTxlogrepisitory kgcMallTxlogrepisitory;
    @Override
    @Transactional
    public void sendCreateOrderHalfTx(KgcMallOrder kgcMallOrder) {
        log.info("###### 1. 开始发送生成订单的半事务消息 到 rocketmq服务端   ######");
        //自定义事务编号
        String txId = UUID.randomUUID().toString().substring(0, 8);
        //发送半事务消息,返回发送结果
        TransactionSendResult transactionSendResult =
                rocketMQTemplate.sendMessageInTransaction("rocketmq-tx-msg-group", //组
                        "rocketmq-tx-msg-kh96",  //队列
                        MessageBuilder.withPayload(kgcMallOrder).setHeader("txId", txId).build(), // 消息体
                        kgcMallOrder); //发送内容
        log.info("###### 2. 开始发送生成订单的半事务消息rocketmq服务端成功,响应:{}   ######", transactionSendResult);
    }
    @Override
    @Transactional
    public void executeCreateOrderLocalTx(String txId, KgcMallOrder kgcMallOrder) {
        log.info("###### 3.1 本地开始执行生成订单的事务操作  ######");
        //开始插入订单
        kgcMallOrderRepository.save(kgcMallOrder);
        log.info("###### 3.2 本地执行生成订单的事务操作 成功  ######");
        // 模拟本地事务处理失败
//        int a = 10 / 0;
        log.info("###### 3.3开始生成用于事务回查的本地事务日志  ######");
        //创建事务对象
        KgcMallTxlog kgcMallTxlog = KgcMallTxlog.builder()
                .id(txId)
                .txDetail("本地事务日志")
                .txTime(new Date())
                .build();
        //事务日志入库
        kgcMallTxlogrepisitory.save(kgcMallTxlog);
        log.info("###### 3.4 生成用于事务回查的本地事务日志成功 ######");
    }
}
4.5.4.2 监听器
4.5.4.2.1 RocketMQExecuteLocalTxListener
/**
 * Created On : 30/11/2022.
 * <p>
 * Author : huayu
 * <p>
 * Description: 事务消息,本地执行事务监听,半事务消息发送成功后,此监听会收到本地事务处理的通知
 */
@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "rocketmq-tx-msg-group")
public class RocketMQExecuteLocalTxListener implements RocketMQLocalTransactionListener {
    @Autowired
    private RocketMQTxService rocketMQTxService;
    @Autowired
    private KgcMallTxlogrepisitory kgcMallTxlogrepisitory;
    //执行本地事务
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            //调用本地事务执行的业务处理接口
            log.info("###### 3 半事务消息发送成功, 执行本地事务  ######");
            rocketMQTxService.executeCreateOrderLocalTx((String) msg.getHeaders().get("txId"), (KgcMallOrder) arg);
            //响应本地事务执行成功结果给服务端,服务端接收到此提交结果,会投递消息
            log.info("###### 4 本地事务处理成功,响应事务处理结果给服务端 ######");
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            log.error("###### 本地事务执行异常:{} ######", e.getMessage());
        }
        //响应本地事务执行失败结果给服务端,服务端接收到此回滚结果,不会投递消息(缓存,并定期删除)
        log.info("###### 4 本地事务处理失败,响应事务处理结果给服务端 #######");
        return RocketMQLocalTransactionState.ROLLBACK;
    }
    //回查本地事务
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        log.info("###### 5 未收到第4步本地事务处理结果,回查事务状态 ######");
        //在网络闪断或者服务重启,没有及时通知服务断事务处理结果,进行会查操作
        //如果回查本地事务执行成功(看事物日志是否存在,如果存在就是处理成功如果不存在就是处理失败),通知服务端投递消息,否则不能投递
        log.info("###### 6 检查本地事务处理结果,响应事务处理结果给服务端 ######");
        if (kgcMallTxlogrepisitory.findById((String) msg.getHeaders().get("txId")).orElse(null) == null) {
            //本地事务入库失败,代表本地事务没有处理成功,步投递消息(缓存,并定期删除)
            log.info("###### 7 检查本地事务处理结果失败 ######");
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        //本地事务入库成功,代表本地事务处理成功,投递消息
        log.info("###### 7 检查本地事务处理结果成功 ######");
        return RocketMQLocalTransactionState.COMMIT;
    }
}
4.5.4.2.2 RocketMQConsumerTxMsgListener
/**
 * Created On : 30/11/2022.
 * <p>
 * Author : huayu
 * <p>
 * Description: 事务消息,消费监听,如果本地事务处理成功,会收到投递的消息,如果失败,收不到消息
 */
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "rocket-tx-msg-consumer-group",
        topic = "rocket-tx-msg-kh96"
)
public class RocketMQConsumerTxMsgListener implements RocketMQListener<Object> {
    @Override
    public void onMessage(Object message) {
        log.info("######  8 消费端,收到生成订单成功的事务消息:{} ###### ", message);
    }
}
4.5.4.3 控制器
/**
 * Created On : 30/11/2022.
 * <p>
 * Author : huayu
 * <p>
 * Description: RocketMQ 消息队列 测试消息入口
 */
@Slf4j
@RestController
public class RocketMQController {
    @Autowired(required = false)
    private RocketMQTemplate rocketMQTemplate;
    @Autowired
    private RocketMQTxService rocketMQTxService;
    /**
     * @param : [phoneNo]
     * @return : java.lang.String
     * @author : huayu
     * @date : 30/11/2022
     * @description : 测试发送消息到用户中心,用户中心给手机号发信息
     */
    @RequestMapping("/testRocketMQSendMsg")
    public String testRocketMQSendMsg(@RequestParam String phoneNo) {
        log.info("------ 使用RocketMQ,测试给手机:{},发送消息 -------", phoneNo);
        //使用RocketMQ发送消息
        rocketMQTemplate.convertAndSend("rocketmq-send-sms-kh96", phoneNo);
        return "send msg success";
    }
    /**
     * @param : [phoneNo]
     * @return : java.lang.String
     * @author : huayu
     * @date : 30/11/2022
     * @description : 测试roketmq-发送可靠同步消息
     */
    @RequestMapping("/testRocketMQSendMsgSync")
    public String testRocketMQSendMsgSync(@RequestParam String syncMsg) {
        log.info("------ 使用RocketMQ,发送可靠同步消息{} -------", syncMsg);
        //使用RocketMQ发送消息,拿到同步结果
        SendResult sendResult = rocketMQTemplate.syncSend("rocketmq-sync-msg-kh96", syncMsg);
        log.info("------ 使用RocketMQ,发送可靠同步消息结果:{} -------", sendResult);
        return "send sync msg success";
    }
    /**
     * @param : [phoneNo]
     * @return : java.lang.String
     * @author : huayu
     * @date : 30/11/2022
     * @description : 测试roketmq-发送可靠异步消息
     */
    @RequestMapping("/testRocketMQSendMsgAsync")
    public String testRocketMQSendMsgAsync(@RequestParam String asyncMsg) {
        log.info("------ 使用RocketMQ,发送可靠异步消息:{} -------", asyncMsg);
        //使用RocketMQ发送消息
        rocketMQTemplate.asyncSend("rocketmq-sync-msg-kh96",
                asyncMsg,
                new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        log.info("------  可靠异步发送成功回调  ------");
                    }
                    @Override
                    public void onException(Throwable throwable) {
                        log.info("------  可靠异步发送失败回调  ------");
                    }
                });
        return "send async msg success";
    }
    /**
     * @param : [phoneNo]
     * @return : java.lang.String
     * @author : huayu
     * @date : 30/11/2022
     * @description : 测试roketmq-发送单向消息,只发不收结果
     */
    @RequestMapping("/testRocketMQSendMsgOneWay")
    public String testRocketMQSendMsgOneWay(@RequestParam String oneWayMsg) {
        log.info("------ 使用RocketMQ,发送单向消息:{} -------", oneWayMsg);
        //使用RocketMQ发送消息
        rocketMQTemplate.sendOneWay("rocketmq-oneWay-msg-kh96", oneWayMsg);
        return "send oneWay msg success";
    }
    /**
     * @param : [phoneNo]
     * @return : java.lang.String
     * @author : huayu
     * @date : 30/11/2022
     * @description : 测试roketmq-发送顺序消息
     */
    @RequestMapping("/testRocketMQSendMsgOrderly")
    public String testRocketMQSendMsgOrderly(@RequestParam String orderlyMsgs) {
        log.info("------ 使用RocketMQ,发送顺序消息:{} -------", orderlyMsgs);
        //使用RocketMQ发送顺序消息,必须要提供一个唯一的标识di,比如用户编号等
        String userId = UUID.randomUUID().toString().replace("-", "");
        //发送多条顺序消息,模拟iang消息分割成多个符号发送
        Arrays.asList(orderlyMsgs.split("")).
                forEach(orderlyMsg -> rocketMQTemplate.syncSendOrderly("rocketmq-orderly-msg-kh96", orderlyMsg, userId));
        return "send orderly msg success";
    }
    /**
     * @param : [phoneNo]
     * @return : java.lang.String
     * @author : huayu
     * @date : 30/11/2022
     * @description : 测试roketmq-发送事务消息
     */
    @RequestMapping("/testRocketMQSendMsgTx")
    public String testRocketMQSendMsgTx(@RequestParam String txmsg) {
        log.info("------ 使用RocketMQ,发送事务消息:{} -------", txmsg);
        //使用RocketMQ发送事务消息,模拟生成一笔订单
        KgcMallOrder kgcMallOrder = KgcMallOrder.builder()
                .userId(2)
                .userName("RocketMQ_tx")
                .prodId(2)
                .prodName(txmsg)
                .totalPrice(96.0)
                .build();
        //发送事务消息
        rocketMQTxService.sendCreateOrderHalfTx(kgcMallOrder);
        return "send tx msg success";
    }
}

4.5.5 测试

4.5.5.1 发送事务消息成功
4.5.5.1.1 发送请求

SpringCloud Alibaba(五) - RocketMQ

4.5.5.1.2 日志查看

SpringCloud Alibaba(五) - RocketMQ

4.5.5.2 发送事务消息失败(模拟异常)
4.5.5.2.1 发送请求

SpringCloud Alibaba(五) - RocketMQ

4.5.5.2.2 日志查看

SpringCloud Alibaba(五) - RocketMQ