EventBus

/ 技术 / 无站内评论 / 325浏览

前言

EventBus 处理的事情类似观察者模式,基于事件驱动,观察者们监听自己感兴趣的特定事件,进行相应的处理。

本文想要介绍的内容是,在 Spring 环境中优雅地使用 Guava 包中的 EventBus,对我们的代码进行一定程度的解耦。当然,本文不介绍 EventBus 的原理,我所说的优雅也只是我觉得优雅,也许读者有更漂亮的代码,欢迎在评论区留言。

EventBus 类似消息队列,一方生产,一方消费。

步骤

1.依赖

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>23.0</version>
</dependency>

作为 java 程序员,如果你还没有使用过 Google Guava,请从现在开始将它加到你的每一个项目中。

2.自定义注解标记Listener

注解传送门:java注解(四种元注解:@Retention @Target @Document @Inherited)

/**
* 自定义注解 用于注册事件总线
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface EventBusListener {
}

3.定义注册中心

/**
* 管理所有异步/同步事件总线
*/
@Component
public class EventBusCenter {

/**
* 管理同步事件
*/
private EventBus syncEventBus = new EventBus();

/**
* 管理异步事件
*/
private AsyncEventBus asyncEventBus = new AsyncEventBus(Executors.newCachedThreadPool());

/**
* 发送一个同步消息
*
* @param o
*/
public void postSync(Object o) {
syncEventBus.post(o);
}

/**
* 发送一个异步消息
*
* @param o
*/
public void postAsync(Object o) {
asyncEventBus.post(o);
}

/**
* 获取所有带有 自定义注解@EventBusListenerbean,将他们注册为监听者
*/
@PostConstruct
public void init() {
List<Object> listeners = SpringContextUtils.getBeansWithAnnotation(EventBusListener.class);
listeners.forEach(o -> {
syncEventBus.register(o);
asyncEventBus.register(o);
});
}

4.定义事件bean

用于往EventBus中push的对象

举例:我们要搜索一些航班数据。

/**
* 搜索报价实体
*/
@Data
@Builder
public class PriceEvent {

/**
* 起飞机场
*/
private String depCity;

/**
* 到达机场
*/
private String arrCity;

/**
* 起飞时间
*/
private Date depDate;
}
/**
* 搜索数据实体
*/
@Data
@Builder
public class PriceInfoEvent {

/**
* 航班号
*/
private String flightNum;

/**
* 票面价
*/
private double price;

/**
* 税费
*/
private double tax;

//...
}

5.定义事件监听器 

当EventBus中push了数据,监听器会收到消息,进行处理。

首先,类上面需要加我们之前定义的注解:@EventBusListener,然后监听方法需要加注解 @Subscribe,方法参数为具体事件。

注意:@Subscribe只能含有一个参数,不同方法可以指定不同的类型形参。

/**
* 监听事件消息
*/
@Service
@EventBusListener
public class PriceListener {

@Autowired
private EventBusCenter eventBusCenter;

/**
* 监听报价消息
*
* @param event
*/
@Subscribe
public void search(PriceEvent event) {
//进行报价搜索等操作
System.out.printf("收到的报价消息:%s", event.toString() + "\n");
//报价信息
eventBusCenter.postAsync(PriceInfoEvent.builder().flightNum("HU7233").price(1200).tax(50).build());
}

/**
* 监听查询结果报价
*
* @param event
*/
@Subscribe
public void savePriceInfo(PriceInfoEvent event) {
//进行报价入库等操作
System.out.printf("得到的报价数据:%s", event.toString() + "\n");
}
}


6.发送事件

这里使用了接口来模拟

/**
* 测试发布消息
*/
@RestController
@RequestMapping("/event")
public class EventBusControler {

@Autowired
private EventBusCenter eventBusCenter;

/**
* 发布同步消息
*
* @return
*/
@GetMapping("/postSync")
public String postSync(@RequestParam String depCity, @RequestParam String arrCity,@RequestParam(name = "count",defaultValue = "1",required = false) int count) {
while (count>0){
eventBusCenter.postSync(PriceEvent.builder().depCity(depCity).arrCity(arrCity).depDate(new Date()).build());
count--;
}
return "SUCCESS";
}

/**
* 发布异步消息
*
* @return
*/
@GetMapping("/postAsync")
public String postAsync(@RequestParam String depCity, @RequestParam String arrCity,@RequestParam(name = "count",defaultValue = "1",required = false) int count) {
while (count>0){
eventBusCenter.postAsync(PriceEvent.builder().depCity(depCity).arrCity(arrCity).depDate(new Date()).build());
count--;
}
return "SUCCESS";
}


}


7.测试

http://127.0.0.1:8080/event/postSync?depCity=WUH&arrCity=HAN&count=100

模拟往EventBus中push 100条消息


总结

EventBus 的好处在于,它将发生事件的代码和事件处理的代码进行了解耦。

比如系统中很多地方都会修改订单,用户可以自己修改、客服也可以修改、甚至可能是团购没成团系统进行的订单修改,所有这些触发订单修改的地方都要发短信、发邮件,假设以后还要增加其他操作,那么需要修改的地方就比较多。

而如果采用事件驱动的话,只要这些地方抛出事件就可以了,后续的维护是比较简单的。

而且,EventBus 支持同步事件和异步事件,可以满足我们不同场景下的需求。比如发短信,系统完全没必要等在那边,完全是可以异步做的。

参考资料

单机环境下优雅地使用事件驱动进行代码解耦

附录:SpringContextUtils

上面的代码使用到了 SpringContextUtils,我想大部分的 Spring 应用都会写这么一个工具类来从 Spring 容器中获取 Bean,用于一些不方便采用注入的地方。

@Component
public class SpringContextUtils implements BeanFactoryPostProcessor {

private static ConfigurableListableBeanFactory beanFactory;

@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {
SpringContextUtils.beanFactory = configurableListableBeanFactory;
}

public static <T> T getBean(String name) throws BeansException {
return (T) beanFactory.getBean(name);
}

public static <T> T getBean(Class<T> clz) throws BeansException {
T result = beanFactory.getBean(clz);
return result;
}

public static <T> List<T> getBeansOfType(Class<T> type) {
return beanFactory.getBeansOfType(type).entrySet().stream().map(entry->entry.getValue()).collect(Collectors.toList());
}

// 上面的例子用到了这个
public static List<Object> getBeansWithAnnotation(Class<? extends Annotation> annotationType) {
Map<String, Object> beansWithAnnotation = beanFactory.getBeansWithAnnotation(annotationType);

// java 8 的写法,将 map value 收集起来到一个 list
return beansWithAnnotation.entrySet().stream().map(entry->entry.getValue()).collect(Collectors.toList());

// java 7
/* List<Object> result = new ArrayList<>();
for (Map.Entry<String, Object> entry : beansWithAnnotation.entrySet()) {
result.add(entry.getValue());
}
return result;*/
}
}

召唤蕾姆
琼ICP备18000156号

鄂公网安备 42011502000211号