Google Guava 工具类(一)—— EventBus(观察者模式的实现)

❗❗❗ 未解决的问题:

  1. AsyncEventBus 的并发执行

  EventBus 是设计模式中的观察者模式(生产者/消费者编程模型)的实现。

  在学习 EventBus 之前,先了解一下其涉及到的相关术语

EvenBus 中的相关术语

EventBus 术语 解释 备注
事件(消息) 可以向事件总线(EventBus)发布的对象 通常是一个类,不同的消息事件用不同的类来代替,消息内容就是类里面的属性
订阅 向事件总线注册监听者,以接受事件的行为 EventBus.register(Object),参数就是监听者
监听者 提供一个处理方法,希望接受和处理事件的对象 通常也是一个类,里面有消息的处理方法
处理方法 监听者提供的公共方法,事件总线使用该方法向监听者发送事件;该方法应使用 Subscribe 注解 监听者里面添加一个 Subscribe 注解的方法,就可以认为是消息的处理方法
发布消息 通过事件总线向所有匹配的监听者提供事件 EventBus.post(Object)

EvenBus 的简单使用

  添加依赖

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>31.1-jre</version>
        </dependency>

  EventBus 的使用很简单,笼统来说可分为以下几个步骤。

  1. 创建 EventBus 对象。通常全局或模块内通过单例模式只用一个 EventBus 对象
  2. 创建消息类
  3. 创建监听者类
  4. 注册监听者类。如果有多个 EventBus 对象,监听者类注册在哪个 EventBus 对象下,消息就需要发布到对应的 EventBus 中
  5. 发布消息

  EventBusDemo.java

package cn.jkingtools.demo.guava.eventbus;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
public class EventBusDemo {
    // 1. 创建消息类
    private static class Message {
        private String message;
        public Message(String message) {
            this.message = message;
        }
        public String getMessage() {
            return message;
        }
        public void setMessage(String message) {
            this.message = message;
        }
    }
    // 2. 创建监听者类,即事件处理函数,需要使用 @Subscribe 进行注解
    private static class EventListener {
        @Subscribe
        public void dealWithEvent(Message msg) {
            System.out.println("接收消息" + msg.getMessage());
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("处理消息" + msg.getMessage());
        }
    }
    public static void main(String[] args) {
        // 3. 创建 EventBus 对象
        EventBus eventBus = new EventBus("Test");
        // 4. 注册监听者类
        eventBus.register(new EventListener());
        // 5. 发布消息
        for (int i=0; i<5; i++) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("---------");
            eventBus.post(new Message("tttt" + i));
        }
    }
}

  执行输出信息:

---------
接收消息tttt0
处理消息tttt0
---------
接收消息tttt1
处理消息tttt1
---------
接收消息tttt2
处理消息tttt2
---------
接收消息tttt3
处理消息tttt3
---------
接收消息tttt4
处理消息tttt4

  在上面的代码中,添加了几个 sleep,在实际运行的时候注意输出,所有的事件消息都是顺序输出的。这是因为事件的发送方和事件消费方都在一个线程中,事件发送方只有在发送的事件处理完毕后才会继续执行自己后面的代码。这里可使用 AsyncEventBus 类实现事件的异步处理,也就是将事件处理放到一个线程池里面去执行。

AsyncEventBus(EventBus 的异步实现)

  com.google.common.eventbus.AsyncEventBus 是 EventBus 的异步实现,即将事件放到一个单独的线程池中去执行,只需要将实例化的 EventBus 对象换成 AsyncEventBus 即可,并传入一个线程池对象。

AsyncEventBus asyncEventBus = new AsyncEventBus("", Executors.newCachedThreadPool());

❗❗❗ 我的测试源码如下,但未实现事件的并行执行。

package cn.jkingtools.demo.guava.eventbus;
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import java.util.concurrent.Executors;
public class AsyncEventBusDemo {
    // 1. 创建消息类
    private static class Message {
        private String message;
        public Message(String message) {
            this.message = message;
        }
        public String getMessage() {
            return message;
        }
        public void setMessage(String message) {
            this.message = message;
        }
    }
    // 2. 创建监听者类,即事件处理函数,需要使用 @Subscribe 进行注解
    private static class EventListener {
        @Subscribe
        public void dealWithEvent(Message msg) {
            System.out.println("接收消息" + msg.getMessage());
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("处理消息" + msg.getMessage());
        }
    }
    public static void main(String[] args) {
        // 3. 创建 EventBus 对象
        AsyncEventBus eventBus = new AsyncEventBus("Test", Executors.newCachedThreadPool());
        // 4. 注册监听者类
        eventBus.register(new EventListener());
        // 5. 发布消息
        for (int i=0; i<10; i++) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("---------");
            eventBus.post(new Message("tttt" + i));
        }
    }
}

输出信息如下:

---------
接收消息tttt0
---------
---------
---------
---------
处理消息tttt0
接收消息tttt4
处理消息tttt4
接收消息tttt3
处理消息tttt3
接收消息tttt2
处理消息tttt2
接收消息tttt1
处理消息tttt1

通过观察输出,AsyncEventBus 虽然并未在主线程阻塞,但事件却是被顺序执行的,并未实现并发。

发表回复