RabbitMQ

一、MQ

1.同步调用的优缺点

同步调用的优点:

同步调用的问题:

2.异步调用

异步调用常见实现就是事件驱动模式

RabbitMQ

好处:

缺点:

3.MQ实现

MQ,中文是消息队列(MessageQueue),字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。

几种常见MQ的对比:

RabbitMQ ActiveMQ RocketMQ Kafka
公司/社区 Rabbit Apache 阿里 Apache
开发语言 Erlang Java Java Scala&Java
协议支持 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定义协议 自定义协议
可用性 一般
单机吞吐量 一般 非常高
消息延迟 微秒级 毫秒级 毫秒级 毫秒以内
消息可靠性 一般 一般

追求可用性:Kafka、 RocketMQ 、RabbitMQ

追求可靠性:RabbitMQ、RocketMQ

追求吞吐能力:RocketMQ、Kafka

追求消息低延迟:RabbitMQ、Kafka

二、Linux安装RabbitMQ

1.1.下载镜像

方式一:在线拉取

docker pull rabbitmq:3-management

方式二:从本地加载

在课前资料已经提供了镜像包:

RabbitMQ

上传到虚拟机中后,使用命令加载镜像即可:

docker load -i mq.tar

1.2.安装MQ

执行下面的命令来运行MQ容器:

docker run \
 -e RABBITMQ_DEFAULT_USER=itcast \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 --name mq \
 --hostname mq1 \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3-management

注意5672是MQ消息通信的端口,15672是ui端口

三、RabbitMQ

1.种类

大致分为两种

1.**无交换机的 **基本消息队列 和 工作消息队列

RabbitMQ

2.有交换机的发布订阅,路由,主题

RabbitMQ

2.基于SpringAMQP使用

2.1基础消息队列

RabbitMQ

  1. 先使用测试类生成队列

    public class ConsumerTest {
        public static void main(String[] args) throws IOException, TimeoutException {
            // 1.建立连接
            ConnectionFactory factory = new ConnectionFactory();
            // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
            factory.setHost("116.62.32.68");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            factory.setUsername("itcast");
            factory.setPassword("123321");
            // 1.2.建立连接
            Connection connection = factory.newConnection();
            // 2.创建通道Channel
            Channel channel = connection.createChannel();
            // 3.创建队列
            String queueName = "simple.queue";
            channel.queueDeclare(queueName, false, false, false, null);
            // 4.订阅消息
            channel.basicConsume(queueName, true, new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // 5.处理消息
                    String message = new String(body);
                    System.out.println("接收到消息:【" + message + "】");
                }
            });
            System.out.println("等待接收消息。。。。");
        }
    }
    
  2. 引入依赖

    <!--AMQP依赖,包含RabbitMQ-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
  3. 生产者编写yaml文件的MQ地址,端口号,用户名密码

    spring:
      rabbitmq:
        host: 192.168.150.101 # 主机名
        port: 5672 # 端口
        virtual-host: / # 虚拟主机
        username: itcast # 用户名
        password: 123321 # 密码
    
  4. 编写生产者代码

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class SpringAmqpTest {
        //类似于redisTemplate
        @Autowired
        private RabbitTemplate rabbitTemplate;
        @Test
        public void testSimpleQueue(){
            //队列名
            String queueName = "simple.queue";
            //消息
            String message = "hello,SpringAmqp";
            //发送消息到队列
            rabbitTemplate.convertAndSend(queueName,message);
        }
    }
    
  5. 消费者编写yaml文件的MQ地址,端口号,用户名密码

    同上

  6. 编写消费者代码

    @Component
    public class SpringRabbitListener {
        @RabbitListener(queues = "simple.queue")
        public void listenSimpleQueueMessage(String msg) throws InterruptedException {
            System.out.println("spring 消费者接收到消息:【" + msg + "】");
        }
    }
    

2.2工作消息队列

RabbitMQ

消费者:

//工作队列
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1Message(String msg) throws InterruptedException {
    System.out.println("spring 消费者1接收到消息:【" + msg + "】");
}
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue2Message(String msg) throws InterruptedException {
    System.out.println("spring 消费者2接收到消息:【" + msg + "】");
}
logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS
spring:
  rabbitmq:
    host:  # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: itcast # 用户名
    password: 123321 # 密码
    listener:
      direct:
        prefetch: 1 # 消息预期,之前把所有消息预期导致平分消息,现在是处理一个取一个

2.3广播Fanout

RabbitMQ

多了一个交换机,交换机决定将生产者的消息发送给哪个队列,这决定就是交换机种类不同规则不同,因此分为三种,广播、路由和主题。

广播:只要队列绑定了交换机,就发送消息到队列

1.编写消费者:使用注解声明队列和交换机(同时绑定消费者的队列和交换机)

//3.广播(绑定队列,同时绑定交换机)
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "fanout.queue1"),
        exchange = @Exchange(name = "fanout.exange",type = ExchangeTypes.FANOUT)
))
public void listenFanoutQueue1(String message){
    System.out.println("广播方式队列1接受消息:"+message);
}
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "fanout.queue2"),
        exchange = @Exchange(name = "fanout.exange",type = ExchangeTypes.FANOUT)
))
public void listenFanoutQueue2(String message){
    System.out.println("广播方式队列2接受消息:"+message);
}

2.生产者

//广播类型交换机
@Test
public void testFanoutExchangeQueue(){
    //交换机名称
    String exchangeName = "fanout.exange";
    //消息
    String message = "广播消息";
    //发送,第二个参数是队列的key,在路由类型中可以使用到
    rabbitTemplate.convertAndSend(exchangeName,"",message);
}

2.4路由Route

RabbitMQ

和广播相比,在队列中多了一个key属性。

交换机通过消息的key来对应路由到相同key的队列上。

1.消费者:

/**
    4.路由
 */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue1"),
        exchange = @Exchange(name = "direct.exchange",type = ExchangeTypes.DIRECT),
        key = {"red","blue"}
))
public void listenDirectQueue1(String message){
    System.out.println("路由红、蓝队列接受消息:"+message);
}
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue2"),
        exchange = @Exchange(name = "direct.exchange",type = ExchangeTypes.DIRECT),
        key = {"red","green"}
))
public void listenDirectQueue2(String message){
    System.out.println("路由红、绿队列接受消息:"+message);
}

2.生产者

//路由类型交换机
@Test
public void testDirectExchangeQueue(){
    //交换机名称
    String exchangeName = "direct.exchange";
    //消息
    String message = "路由消息";
    //发送,第二个参数是队列的key,在路由类型中可以使用到
    rabbitTemplate.convertAndSend(exchangeName,"red",message);
}

2.5主题Topic

RabbitMQ

key不再是一组单词,而是一个规则

通配符规则:

#:匹配一个或多个词

*:匹配不多不少恰好1个词

举例:

item.#:能够匹配item.spu.insert 或者 item.spu

item.*:只能匹配item.spu

1.消费者

/**
 * 主题
 */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic.queue1"),
        exchange = @Exchange(name = "topic.exchange",type = ExchangeTypes.TOPIC),
        key = "china.#"
))
public void listenTopicQueue1(String message){
    System.out.println("中国队列接受消息:"+message);
}
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic.queue2"),
        exchange = @Exchange(name = "topic.exchange",type = ExchangeTypes.TOPIC),
        key = "#.news"
))
public void listenTopicQueue2(String message){
    System.out.println("新闻队列接受消息:"+message);
}

2.生产者

//主题类型交换机
@Test
public void testTopicExchangeQueue(){
    //交换机名称
    String exchangeName = "topic.exchange";
    //消息
    String message = "china.lala消息";
    //发送,第二个参数是队列的key,在路由类型中可以使用到
    rabbitTemplate.convertAndSend(exchangeName,"china.news",message);
}

3.消息转换器

Spring会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。

默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

因此需要配置一个Json转换器

步骤

  1. 在publisher和consumer两个服务中都引入依赖:
<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>
  1. 配置消息转换器。在启动类中添加一个Bean即可:
@Bean
public MessageConverter jsonMessageConverter(){
    return new Jackson2JsonMessageConverter();
}

发表回复