RabbitMQ

整合RabbitMQ

/**
 * 使用RabbitMQ
 *  1、引入ampq场景,RabbitAutoConfiguration 就会自动生效
 *  2、给容器中自动配置了
 *      RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate
 *      所有的属性都是在
 *          @EnableConfigurationProperties(RabbitProperties.class)
 *          @ConfigurationProperties(prefix = "spring.rabbitmq")
 *          public class RabbitProperties
 *  3、给配置文件中配置 spring.rabbitmq 信息
 *  4、@EnableRabbit 开启功能
 *  5、监听消息:使用 @RabbitListener,必须有 @EnableRabbit
 *      @RabbitListener:类 + 方法上
 *      @RabbitHandler: 只能标在方法上
 */
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
# rabbit 配置文件
spring.rabbitmq.host=192.168.106.101
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/

测试

package com.atguigu.gulimall.order;
import com.atguigu.gulimall.order.entity.OrderReturnApplyEntity;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.Date;
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class GulimallOrderApplicationTests {
    @Autowired
    AmqpAdmin amqpAdmin;
    @Autowired
    RabbitTemplate rabbitTemplate;
    /**
     * 1、创建Exchange[hello.java.exchange]、Queue、Binding
     *      - 使用 AmqpAdmin 进行创建
     *
     * 2、如何收发消息 -> RabbitTemplate
     *      如果发送的消息是个对象,使用序列化机制,将对象写出去,对象实现 Serializable 接口
     *      自定义序列化添加配置
     *      @Configuration
     *      public class MyRabbitConfig {
     *          @Bean
     *          public MessageConverter messageConverter() {
     *              return new Jackson2JsonMessageConverter();
     *           }
     *      }
     */
    @Test
    public void sendMessageTest() {
        String msg = "Hello World";
        OrderReturnApplyEntity orderReturnApplyEntity = new OrderReturnApplyEntity();
        orderReturnApplyEntity.setId(1L);
        orderReturnApplyEntity.setSkuName("华为");
        orderReturnApplyEntity.setCreateTime(new Date());
        rabbitTemplate.convertAndSend("hello.java.exchange", "hello.java", orderReturnApplyEntity);
        log.info("消息发送完成:{}", orderReturnApplyEntity);
    }
    @Test
    public void createExchange() {
        //amqpAdmin
        /**
         * DirectExchange
         * public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
         */
        DirectExchange exchange = new DirectExchange("hello.java.exchange", true,false);
        amqpAdmin.declareExchange(exchange);
        log.info("Exchange[{}]创建成功", "hello.java.exchange");
    }
    @Test
    public void createQueue() {
        /**
         * public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete)
         */
        Queue queue = new Queue("hello-java-queue", true, false,true);
        amqpAdmin.declareQueue(queue);
        log.info("Queue[{}]创建成功", "hello-java-queue");
    }
    @Test
    public void createBinding() {
        /**
         * public Binding(String destination【目的地】,
         * DestinationType destinationType【目的地类型】,
         * String exchange【交换机】,
         * String routingKey【路由键】,
         * Map<String, Object> arguments)【参数】
         * 将 exchange 指定交换机和 destination目的地进行绑定,使用routingKey作为指定路由键
         */
        Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE,"hello.java.exchange","hello.java",null);
        amqpAdmin.declareBinding(binding);
        log.info("Binding == 创建成功");
    }
}

测试监听消息

/**
 * queues:声明需要监听的所欲队列
 *
 * org.springframework.amqp.core.Message;
 *
 * 参数可以写以下类型
 *  1、Message message;原生消息详细信息,头 + 体
 *  2、T<发送的消息的类型> OrderReturnApplyEntity content
 *  3、Channel channel:当前传输数据的通道
 *
 *  Queue:可以很多人都来监听,只要收到消息,队列删除消息,而且只有一个人收到此消息
 *      1、订单服务启动多个:同一个消息,只能有一个客户端收到
 *      2、只有一个消息完全处理完,方法运行结束,我们就可以接受到下一个消息
 */
//@RabbitListener(queues = {"hello-java-queue"})
@RabbitHander
public void receiveMessage(Message message, OrderReturnReasonEntity content) {
    System.out.println("接收到消息....:"+ message + "===>内容;" + content + "类型是:" + message.getClass());
    byte[] body = message.getBody();
    //消息头属性信息
    MessageProperties properties = message.getMessageProperties();
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("消息处理完成=》" + content.getName());
}

@RabbitListener

简介:

1.用于标注在监听类或监听方法上,接收消息,需要指定监听的队列(数组)
2.使用该注解之前,需要在启动类加上该注解:@EnableRabbit
3.@RabbitListener即可以标注在方法上又可以标注在类上
	标注在类上:表示该类是监听类,使得@RabbitHandler注解生效
	标注在方法上:表示该方法时监听方法,会监听指定队列获得消息
4.一般只标注在方法上,并配合@RabbitHandler使用,重载的方式接收不同消息对象

@RabbitHandler

作用:

配合@RabbitListener,使用方法重载的方法接收不同的消息类型

简介:

1.用于标注在监听方法上,接收消息,不需要指定监听的队列
2.使用该注解之前,需要在启动类加上该注解:@EnableRabbit
3.@RabbitListener只可以标注在方法,重载的方式接收不同消息对象

发送端消息确认配置

1、配置

2、定制 RabbitTemplate,设置确认回调

# rabbit 配置文件
spring.rabbitmq.host=192.168.106.101
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
# 开启发送端确认
spring.rabbitmq.publisher-confirms=true
#开启发送端消息抵达确认
spring.rabbitmq.publisher-returns=true
#只要抵达队列。以异步发送优先回调returnconfirm
spring.rabbitmq.template.mandatory=true
# 手动ack消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual
package com.atguigu.gulimall.order.config;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Configuration
public class MyRabbitConfig {
    @Autowired
    RabbitTemplate rabbitTemplate;
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    /**
     * 定制 rabbitTemplate
     * 1、服务收到消息就回调
     *      1、spring.rabbitmq.publisher-confirms=true
     *      2、设置确认回调ConfirmCallback
     * 2、消息正确地打队列进行回调
     *      1、spring.rabbitmq.publisher-returns=true
     *         spring.rabbitmq.template.mandatory=true
     *      2、设置消息抵达队列的回调
     * 3、消费端确认【保证每一个消息被正确消费,此时才可以让broker删除】
     *      1、默认是自动确认,只要消息接受到,自动确认,服务端就会移除这个消息
     *      2、手动确认默认,只要没有明确告诉MQ,货物被签收,没有ACK,消息一直是unacked状态。
     *          即使Cosumer宕机,消息也不会丢失,会重新变成Ready,等待下一次新的consumer链接发给他
     *      3、如果手动确认:Channel channel -> long deliveryTag = properties.getDeliveryTag(); -> channel.basicAck(deliveryTag, false);
     *          channel.basicAck(deliveryTag, false);           签收
     *          channel.basicNack(deliveryTag, false, true);    拒签
     */
    @PostConstruct // MyRabbitConfig 对象创建完成以后执行这个方法
    public void initRabbitTemplate(){
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * 只要抵达服务器,ack就确认为true
             * @param correlationData 当前消息的唯一关联数据(消息的唯一id)
             * @param ack 是否成功或者失败
             * @param cause 失败的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("confirm..." + correlationData + "==> ack:" + ack + "==> cause:" + cause);
            }
        });
        //设置消息抵达队列的回调
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             * 只要消息没有投递给指定的队列,就触发失败回调
             * @param message   投递失败的消息详细信息
             * @param replyCode 回复的状态码
             * @param replyText 回复的文本内容
             * @param exchange  消息发给那个交换机
             * @param routingKey 当时这个消息使用哪个路由键
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("Fail Message:" + message + "==> replyTest:" + replyText + "==>exchange" + exchange + "==>routingKey:" + routingKey);
            }
        });
    }
}
/**
 * queues:声明需要监听的所欲队列
 * <p>
 * org.springframework.amqp.core.Message;
 * <p>
 * 参数可以写以下类型
 * 1、Message message;原生消息详细信息,头 + 体
 * 2、T<发送的消息的类型> OrderReturnApplyEntity content
 * 3、Channel channel:当前传输数据的通道
 * <p>
 * Queue:可以很多人都来监听,只要收到消息,队列删除消息,而且只有一个人收到此消息
 * 1、订单服务启动多个:同一个消息,只能有一个客户端收到
 * 2、只有一个消息完全处理完,方法运行结束,我们就可以接受到下一个消息
 */
@RabbitListener(queues = {"hello-java-queue"})
public void receiveMessage(Message message, OrderReturnReasonEntity content, Channel channel) throws IOException {
    //System.out.println("接收到消息....:"+ message + "===>内容;" + content + "类型是:" + message.getClass());
    System.out.println("接收到消息....:" + content);
    byte[] body = message.getBody();
    //消息头属性信息
    MessageProperties properties = message.getMessageProperties();
    /*try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }*/
    System.out.println("消息处理完成=》" + content.getName());
    long deliveryTag = properties.getDeliveryTag();
    System.out.println("deliverTag: " + deliveryTag);
    if (deliveryTag % 2 == 0) {
        //收货
        // 签收获取,非批量模式
        channel.basicAck(deliveryTag, false);
    } else {
        //requeue 重新入队
        //basicNack(long deliveryTag, boolean multiple, boolean requeue)
        channel.basicNack(deliveryTag, false, true);
        System.out.println("没有签收的货物....." + deliveryTag);
    }
}

最终整合

1.导入mq依赖
<!--amqp高级消息队列协议,rabbitmq实现-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.ware模块导入配置
spring:
  rabbitmq:
    host: 192.168.56.10
    port: 5672
    # 虚拟主机
    virtual-host: /
    # 开启发送端发送确认,无论是否到达broker都会触发回调【发送端确认机制+本地事务表】
    publisher-confirm-type: correlated
    # 开启发送端抵达队列确认,消息未被队列接收时触发回调【发送端确认机制+本地事务表】
    publisher-returns: true
    # 消息在没有被队列接收时是否强行退回
    template:
      mandatory: true
    # 消费者手动确认模式,关闭自动确认,否则会消息丢失
    listener:
      simple:
        acknowledge-mode: manual
3.添加注解
// 开启rabbit
@EnableRabbit
4.创建配置类
/**
 * @Author: wanzenghui
 * @Date: 2021/12/15 0:04
 */
@Configuration
public class MyRabbitConfig {
    @Autowired
    RabbitTemplate rabbitTemplate;
    @Bean
    public MessageConverter messageConverter() {
        // 使用json序列化器来序列化消息,发送消息时,消息对象会被序列化成json格式
        return new Jackson2JsonMessageConverter();
    }
    /**
     * 定制RabbitTemplate
     * 1、服务收到消息就会回调
     * 1、spring.rabbitmq.publisher-confirms: true
     * 2、设置确认回调
     * 2、消息正确抵达队列就会进行回调
     * 1、spring.rabbitmq.publisher-returns: true
     * spring.rabbitmq.template.mandatory: true
     * 2、设置确认回调ReturnCallback
     * <p>
     * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
     */
    @PostConstruct   // (MyRabbitConfig对象创建完成以后,执行这个方法)
    public void initRabbitTemplate() {
        /**
         * 发送消息触发confirmCallback回调
         * @param correlationData:当前消息的唯一关联数据(如果发送消息时未指定此值,则回调时返回null)
         * @param ack:消息是否成功收到(ack=true,消息抵达Broker)
         * @param cause:失败的原因
         */
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            System.out.println("发送消息触发confirmCallback回调" +
                    "\ncorrelationData ===> " + correlationData +
                    "\nack ===> " + ack + "" +
                    "\ncause ===> " + cause);
            System.out.println("=================================================");
        });
        /**
         * 消息未到达队列触发returnCallback回调
         * 只要消息没有投递给指定的队列,就触发这个失败回调
         * @param message:投递失败的消息详细信息
         * @param replyCode:回复的状态码
         * @param replyText:回复的文本内容
         * @param exchange:接收消息的交换机
         * @param routingKey:接收消息的路由键
         */
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 需要修改数据库 消息的状态【后期定期重发消息】
            System.out.println("消息未到达队列触发returnCallback回调" +
                    "\nmessage ===> " + message +
                    "\nreplyCode ===> " + replyCode +
                    "\nreplyText ===> " + replyText +
                    "\nexchange ===> " + exchange +
                    "\nroutingKey ===> " + routingKey);
            System.out.println("==================================================");
        });
    }
}
5.创建ware解锁库存的延时队列、死信队列、交换机、绑定关系
/**
 * 创建队列,交换机,延时队列,绑定关系 的configuration
 * 1.Broker中的Queue、Exchange、Binding不存在的情况下,会自动创建(在RabbitMQ),不会重复创建覆盖
 * 2.懒加载,只有第一次使用的时候才会创建(例如监听队列)
 */
@Configuration
public class MyRabbitMQConfig {
    /**
     * 用于首次创建队列、交换机、绑定关系的监听
     * @param message
     */
    @RabbitListener(queues = "stock.release.stock.queue")
    public void handle(Message message) {
    }
    /**
     * 交换机
     * Topic,可以绑定多个队列
     */
    @Bean
    public Exchange stockEventExchange() {
        //String name, boolean durable, boolean autoDelete, Map<String, Object> arguments
        return new TopicExchange("stock-event-exchange", true, false);
    }
    /**
     * 死信队列
     */
    @Bean
    public Queue stockReleaseStockQueue() {
        //String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        return new Queue("stock.release.stock.queue", true, false, false);
    }
    /**
     * 延时队列
     */
    @Bean
    public Queue stockDelay() {
        HashMap<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "stock-event-exchange");
        arguments.put("x-dead-letter-routing-key", "stock.release");
        // 消息过期时间 2分钟
        arguments.put("x-message-ttl", 120000);
        return new Queue("stock.delay.queue", true, false, false,arguments);
    }
    /**
     * 绑定:交换机与死信队列
     */
    @Bean
    public Binding stockLocked() {
        //String destination, DestinationType destinationType, String exchange, String routingKey,
        // 			Map<String, Object> arguments
        return new Binding("stock.release.stock.queue",
                Binding.DestinationType.QUEUE,
                "stock-event-exchange",
                "stock.release.#",
                null);
    }
    /**
     * 绑定:交换机与延时队列
     */
    @Bean
    public Binding stockLockedBinding() {
        return new Binding("stock.delay.queue",
                Binding.DestinationType.QUEUE,
                "stock-event-exchange",
                "stock.locked",
                null);
    }
}

发表回复