在我们日常开发中经常有这种类型的场景:
- 新建用户后,需要进行一些操作,如发送优惠券等(和创建用户本身无关的操作)
- 数据变更时,对应的展示表格等信息需要进行对应的更新
即当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并被自动更新
观察者模式(发布-订阅)就是这种情况下的一种解决方案,使用这种方式可以让解耦发布者和订阅者,互相不需要知道对方,之前一篇文档中简单介绍过Spring中的事件使用,这次介绍一种非Spring环境下Guava提供的EventBus的使用
使用
下面简单介绍一下EventBus的使用,使用起来其实是比较简单的,代码如下
创建事件类
1 2 3 4
| public class UserCreateEvent { }
|
定义监听者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public class SendSmsService { @Subscribe public void listen(UserCreateEvent event) { System.out.println(this.getClass().getSimpleName() + "-" + event); } }
public class SendCouponService { @Subscribe public void listen(UserCreateEvent event) { System.out.println(this.getClass().getSimpleName() + "-" + event); } }
|
初始化EventBus
1 2 3 4
| EventBus eventBus = new EventBus(); eventBus.register(new SendCouponService()); eventBus.register(new SendSmsService());
|
事件触发
1 2
| eventBus.post(new UserCreateEvent());
|
原理
使用其实挺简单的,下面我们可以想下它是怎么实现的
解析:首先在register注册监听者时解析对应监听类,获取@Subscribe注解方法及对应参数中的监听类信息
注册:需要维护一个注册表,记录被监听事件及对应的处理方法集合
派发执行:当调用EventBus发送事件时,可能根据同步或异步方式进行消息分发
解析注册
Guava的解析注册的类为SubscriberRegistry
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers = Maps.newConcurrentMap();
void register(Object listener) { Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener); for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) { Class<?> eventType = entry.getKey(); Collection<Subscriber> eventMethodsInListener = entry.getValue(); CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType); if (eventSubscribers == null) { CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>(); eventSubscribers = MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet); } eventSubscribers.addAll(eventMethodsInListener); } }
private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) { Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create(); Class<?> clazz = listener.getClass(); for (Method method : getAnnotatedMethods(clazz)) { Class<?>[] parameterTypes = method.getParameterTypes(); Class<?> eventType = parameterTypes[0]; methodsInListener.put(eventType, Subscriber.create(bus, listener, method)); } return methodsInListener; }
|
消息派发
1 2 3 4 5 6 7 8 9 10 11
| public void post(Object event) { Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event); if (eventSubscribers.hasNext()) { dispatcher.dispatch(event, eventSubscribers); } else if (!(event instanceof DeadEvent)) { post(new DeadEvent(this, event)); } }
|
消息派发目前提供了三种实现
- PerThreadQueuedDispatcher 每个线程一个队列派发
- LegacyAsyncDispatcher 异步派发使用
- ImmediateDispatcher 立即派发(无队列)
下面依次看下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
|
private static final class PerThreadQueuedDispatcher extends Dispatcher { private final ThreadLocal<Queue<Event>> queue = new ThreadLocal<Queue<Event>>() { @Override protected Queue<Event> initialValue() { return Queues.newArrayDeque(); } };
private final ThreadLocal<Boolean> dispatching = new ThreadLocal<Boolean>() { @Override protected Boolean initialValue() { return false; } };
@Override void dispatch(Object event, Iterator<Subscriber> subscribers) { Queue<Event> queueForThread = queue.get(); queueForThread.offer(new Event(event, subscribers));
if (!dispatching.get()) { dispatching.set(true); try { Event nextEvent; while ((nextEvent = queueForThread.poll()) != null) { while (nextEvent.subscribers.hasNext()) { nextEvent.subscribers.next().dispatchEvent(nextEvent.event); } } } finally { dispatching.remove(); queue.remove(); } } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| private static final class LegacyAsyncDispatcher extends Dispatcher { private final ConcurrentLinkedQueue<Dispatcher.LegacyAsyncDispatcher.EventWithSubscriber> queue = Queues.newConcurrentLinkedQueue();
@Override void dispatch(Object event, Iterator<Subscriber> subscribers) { while (subscribers.hasNext()) { queue.add(new Dispatcher.LegacyAsyncDispatcher.EventWithSubscriber(event, subscribers.next())); }
Dispatcher.LegacyAsyncDispatcher.EventWithSubscriber e; while ((e = queue.poll()) != null) { e.subscriber.dispatchEvent(e.event); } } }
|
1 2 3 4 5 6 7 8 9 10 11 12
| private static final class ImmediateDispatcher extends Dispatcher { private static final ImmediateDispatcher INSTANCE = new ImmediateDispatcher();
@Override void dispatch(Object event, Iterator<Subscriber> subscribers) { checkNotNull(event); while (subscribers.hasNext()) { subscribers.next().dispatchEvent(event); } } }
|
最后我们看下dispatchEvent方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| final void dispatchEvent(final Object event) { executor.execute( new Runnable() { @Override public void run() { try { invokeSubscriberMethod(event); } catch (InvocationTargetException e) { bus.handleSubscriberException(e.getCause(), context(event)); } } }); }
enum DirectExecutor implements Executor { INSTANCE;
@Override public void execute(Runnable command) { command.run(); } }
|