EventBus源码解析

EventBus源码解析

EventBus 是一个开源框架,可用于 java 和 Android 平台。由于用法简单、解耦发送者接收者、体积小等优势,EventBus 被广泛使用。大致用法如下:

第一步,定义事件类型:
public static class MessageEvent { / Additional fields if needed / }
第二步,定义接收这个事件的方法,该方法必须指定注解@Subscribe,threadMode 为可选参数:
@Subscribe(threadMode = ThreadMode.MAIN)
public void onMessageEvent(MessageEvent event) {/ Do something /};
同时,在订阅者注册和解绑订阅。对于 Android 平台的 Activity 和 Fragment 而言,注册、解绑与对象的生命周期保持一致。类似如下:
@Override
public void onStart() {
super.onStart();
EventBus.getDefault().register(this);
}

@Override
public void onStop() {
super.onStop();
EventBus.getDefault().unregister(this);
}
最后一步就是发送通知消息了,可以在代码里的任何地方发送消息,这种特性真是太方便了:
EventBus.getDefault().post(new MessageEvent());

EventBus 的用法如此简单,它到底是什么原理,下面就开始分析一下源码:

EventBus.getDefault() 使用了单例模式,返回了一个 EventBus 对象。register(subscriber) 方法实现如下:
Class<?> subscriberClass = subscriber.getClass();
List subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod); }
}

通过 subscriberMethodFinder 的 findSubscriberMethods 方法找到订阅类所对应的订阅方法集合,接着对每个订阅方法实施订阅操作。
接着看一下 findSubscriberMethods 方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
if (ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass);
} else {
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe " +
"annotation");
} else {
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}

首先查询 METHOD_CACHE 看是否存在缓存,如果已经缓存该类对应的订阅方法则直接返回,避免当同一类型的对象多次注册时每次都要重新查询订阅方法。如果没有缓存则查询订阅方法。有两种方式,findUsingReflection 和 findUsingInfo 方法,通过 ignoreGeneratedIndex 变量决定使用哪种方式。该变量是指是否使用编译期的缓存信息。先看一下 findUsingReflection 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// This is faster than getMethods, especially when subscribers are fat classes like Activities
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
for (Method method : methods) {
int modifiers = method.getModifiers();
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1) {
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
...
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
...
}
}
}

findUsingReflection 方法使用 java 的反射技术获取到订阅者所有的 注解@Subscribe 方法.
findUsingInfo 方法使用 Eventbus 3.0 新引入的方法,后面再进行分析。

接着看一下 subscribe 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
...
}
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
...
}

订阅方法会把订阅信息放到全局变量集合 subscriptionsByEventType 里,subscriptionsByEventType 是每个事件类型对应的订阅信息,key 是事件类型,value 是该类型对应的订阅信息集合。后面在发送消息的会用到这个变量。typesBySubscriber 也是个全局变量,key 是订阅者,value 是订阅类型集合,后面解绑消息的时候会用到这个集合。
接着看下发信息的过程: EventBus.getDefault().post(new MessageEvent());
post(Object event)-> postSingleEvent(eventQueue.remove(0), postingState)-> postSingleEventForEventType(event, postingState, clazz),post 方法调用链条如上,看一下最重要的 postSingleEventForEventType 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
}
...
}
return true;
}
return false;
}

这里用到了上文中提到的全局变量 subscriptionsByEventType。此处根据事件类型找到该类型所对应的订阅信息集合,对集合里的每个订阅信息进行处理 postToSubscription(subscription, event, postingState.isMainThread):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}

此处理方法根据注解参数 threadMode 的值在指定的线程执行操作。通过调用 invokeSubscriber 方法来通知消息。invokeSubscriber 的实现是通过 java 的反射技术,代码如下:

1
2
3
4
5
6
7
void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
...
}
}
至此整个流程大概走完了一遍。

EventBus 3.0 新增了注解处理器,此特性的优势是在编译期处理注解,缓存订阅信息,这样就不必在运行时通过反射获取订阅信息,减少了性能损耗、程序运行更快,据框架作者测试,注册时间快了 30-40%。注解处理器的具体实现是使用 java 的 apt 技术,apt 是 javac 的一个工具,在编译期可处理注解并根据注解信息生成新的 java 文件。