前言
EventBus 处理的事情类似观察者模式,基于事件驱动,观察者们监听自己感兴趣的特定事件,进行相应的处理。
本文想要介绍的内容是,在 Spring 环境中优雅地使用 Guava 包中的 EventBus,对我们的代码进行一定程度的解耦。当然,本文不介绍 EventBus 的原理,我所说的优雅也只是我觉得优雅,也许读者有更漂亮的代码,欢迎在评论区留言。
EventBus 类似消息队列,一方生产,一方消费。
步骤
1.依赖
<dependency> <groupId>com.google.guavagroupId> <artifactId>guavaartifactId> <version>23.0version> dependency>
作为 java 程序员,如果你还没有使用过 Google Guava,请从现在开始将它加到你的每一个项目中。
2.自定义注解标记Listener
注解传送门:java注解(四种元注解:@Retention @Target @Document @Inherited)
/** * 自定义注解 用于注册事件总线 */ @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface EventBusListener { }
3.定义注册中心
/** * 管理所有异步/同步事件总线 */ @Component public class EventBusCenter { /** * 管理同步事件 */ private EventBus syncEventBus = new EventBus(); /** * 管理异步事件 */ private AsyncEventBus asyncEventBus = new AsyncEventBus(Executors.newCachedThreadPool()); /** * 发送一个同步消息 * * @param o */ public void postSync(Object o) { syncEventBus.post(o); } /** * 发送一个异步消息 * * @param o */ public void postAsync(Object o) { asyncEventBus.post(o); } /** * 获取所有带有 自定义注解@EventBusListener的bean,将他们注册为监听者 */ @PostConstruct public void init() { List<Object> listeners = SpringContextUtils.getBeansWithAnnotation(EventBusListener.class); listeners.forEach(o -> { syncEventBus.register(o); asyncEventBus.register(o); }); }
4.定义事件bean
用于往EventBus中push的对象
举例:我们要搜索一些航班数据。
/** * 搜索报价实体 */ @Data @Builder public class PriceEvent { /** * 起飞机场 */ private String depCity; /** * 到达机场 */ private String arrCity; /** * 起飞时间 */ private Date depDate; }
/** * 搜索数据实体 */ @Data @Builder public class PriceInfoEvent { /** * 航班号 */ private String flightNum; /** * 票面价 */ private double price; /** * 税费 */ private double tax; //... }
5.定义事件监听器
当EventBus中push了数据,监听器会收到消息,进行处理。
首先,类上面需要加我们之前定义的注解:@EventBusListener,然后监听方法需要加注解 @Subscribe,方法参数为具体事件。
注意:@Subscribe只能含有一个参数,不同方法可以指定不同的类型形参。
/** * 监听事件消息 */ @Service @EventBusListener public class PriceListener { @Autowired private EventBusCenter eventBusCenter; /** * 监听报价消息 * * @param event */ @Subscribe public void search(PriceEvent event) { //进行报价搜索等操作 System.out.printf("收到的报价消息:%s", event.toString() + "\n"); //报价信息 eventBusCenter.postAsync(PriceInfoEvent.builder().flightNum("HU7233").price(1200).tax(50).build()); } /** * 监听查询结果报价 * * @param event */ @Subscribe public void savePriceInfo(PriceInfoEvent event) { //进行报价入库等操作 System.out.printf("得到的报价数据:%s", event.toString() + "\n"); } }
6.发送事件
这里使用了接口来模拟
/** * 测试发布消息 */ @RestController @RequestMapping("/event") public class EventBusControler { @Autowired private EventBusCenter eventBusCenter; /** * 发布同步消息 * * @return */ @GetMapping("/postSync") public String postSync(@RequestParam String depCity, @RequestParam String arrCity,@RequestParam(name = "count",defaultValue = "1",required = false) int count) { while (count>0){ eventBusCenter.postSync(PriceEvent.builder().depCity(depCity).arrCity(arrCity).depDate(new Date()).build()); count--; } return "SUCCESS"; } /** * 发布异步消息 * * @return */ @GetMapping("/postAsync") public String postAsync(@RequestParam String depCity, @RequestParam String arrCity,@RequestParam(name = "count",defaultValue = "1",required = false) int count) { while (count>0){ eventBusCenter.postAsync(PriceEvent.builder().depCity(depCity).arrCity(arrCity).depDate(new Date()).build()); count--; } return "SUCCESS"; } }
7.测试
http://127.0.0.1:8080/event/postSync?depCity=WUH&arrCity=HAN&count=100
模拟往EventBus中push 100条消息
总结
EventBus 的好处在于,它将发生事件的代码和事件处理的代码进行了解耦。
比如系统中很多地方都会修改订单,用户可以自己修改、客服也可以修改、甚至可能是团购没成团系统进行的订单修改,所有这些触发订单修改的地方都要发短信、发邮件,假设以后还要增加其他操作,那么需要修改的地方就比较多。
而如果采用事件驱动的话,只要这些地方抛出事件就可以了,后续的维护是比较简单的。
而且,EventBus 支持同步事件和异步事件,可以满足我们不同场景下的需求。比如发短信,系统完全没必要等在那边,完全是可以异步做的。
参考资料
附录:SpringContextUtils
上面的代码使用到了 SpringContextUtils,我想大部分的 Spring 应用都会写这么一个工具类来从 Spring 容器中获取 Bean,用于一些不方便采用注入的地方。
@Component public class SpringContextUtils implements BeanFactoryPostProcessor { private static ConfigurableListableBeanFactory beanFactory; @Override public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException { SpringContextUtils.beanFactory = configurableListableBeanFactory; } public staticT getBean(String name) throws BeansException { return (T) beanFactory.getBean(name); } public staticT getBean(Classclz) throws BeansException { T result = beanFactory.getBean(clz); return result; } public staticList getBeansOfType(Class type) { return beanFactory.getBeansOfType(type).entrySet().stream().map(entry->entry.getValue()).collect(Collectors.toList()); } // 上面的例子用到了这个 public static List<Object> getBeansWithAnnotation(Class<? extends Annotation> annotationType) { Map<String, Object> beansWithAnnotation = beanFactory.getBeansWithAnnotation(annotationType); // java 8 的写法,将 map 的 value 收集起来到一个 list 中 return beansWithAnnotation.entrySet().stream().map(entry->entry.getValue()).collect(Collectors.toList()); // java 7 /* List for (Map.Entryentry : beansWithAnnotation.entrySet()) { result.add(entry.getValue()); } return result;*/ } }
本文由 SAn 创作,采用 知识共享署名4.0 国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
最后编辑时间为:
2020/05/14 21:13