Event Bus初探
Event Bus的直译称为“事件总线”,提到它就会涉及到事件驱动模型,实际上Event Bus是Vert.x的核心1:
The event bus is the nervous system of Vert.x.
通常我们提到Vert.x时都会说它是事件驱动、无阻塞、纯异步化的,那么这里的事件驱动的中枢神经就是本文中的Event Bus,关于Event Bus的内容,读者可以先看看Reactor模式——不了解Reactor模式在未来很长一段时间设计Vert.x的系统都会遇到思路上的障碍,所以我还是推荐读者在理解Event Bus之前先仔细看看什么是Reactor模式,参考后记《Reactor模式2》。
- 在传统服务器中(如Tomcat/Jetty),服务器接收到一个客户端请求,这个请求将直接进入某个Servlet组件中,该线程不仅会接收此请求,同样会去处理这个请求,一旦此处发生阻塞操作(访问数据库、网络、文件系统),整个线程就必须等待执行结果,最终生成响应发送回客户端。
- 在Vert.x中(Netty服务器),服务器接收到一个客户端请求后,这个请求会直接由Standard类型的Verticle组件来接收,当然对于简单的请求,这个Verticle组件直接执行完成就可以生成响应了,这种模式等价于上述传统服务器执行请求的模式。但若遇到了复杂的请求如阻塞操作,该Verticle组件可以将请求封装成事件发送到Event Bus,由它另一头的Worker类型的Verticle组件消费该事件、执行该事件、生成响应发送回客户端。
1.关于阻塞
上述流程中,第一种模式下的阻塞会成为一个常态,而第二种请求中也会有一部分任务是阻塞式的,好了,问题就来了:读者在前边的章节已经看到,既然Vert.x的官方提到的黄金法则,是不要阻塞Event Loop,那么遇到阻塞任务时,怎么解决这种需求?您若想要写一个程序不访问网络、数据库、文件系统几乎是不可能的事,一旦出现这种IO装置的操作,就意味着阻塞有可能开始暴走了——比如您读取了一个4G的大文件。
Vert.x中提供了两种解决阻塞任务的方案,我称为”快速方案“和”标准方案“。
1.1.快速方案
快速方案的核心就是直接调用executeBlocking
方法3,前文中我们已经分析过这个方法的源代码了,再次提到这个方法是因为它提供了处理阻塞任务的绿色通道,这种方式很快速,只需要写上下边几行代码就足够了。
vertx.executeBlocking(future -> {
// Call some blocking API that takes a significant amount of time to return
String result = someAPI.blockingMethod("hello");
future.complete(result);
}, res -> {
System.out.println("The result is: " + res.result());
});
关于这个方法的阐述,读者可以直接参考官方文档,这里主要谈谈这部分内容的经验分享。zero的前身是我的硕士论文的主体部分,最初的系统名称叫做vie
,它是一个纯数据驱动的RESTful API引擎,它的基本代码如下(有vertx-web
部分剧透):
@Override
public void start() {
Fn.safeVie(LOGGER, () -> {
/** 0.Http Server **/
final ConcurrentMap<Integer, HttpServerOptions> configMap = INTAKER.ingest();
final HttpServerOptions options = configMap.get(Integer.parseInt(VertxGrid.getPortEndpoint()));
final HttpServer server = this.vertx.createHttpServer(options);
/** 1.读取Router引用,处理Block Thread的问题 **/
this.vertx.<Router>executeBlocking(future -> {
/** 2.初始化Router,使用Kinetic初始化维持唯一Router **/
final Router router = Router.router(this.vertx);
/** 3.初始化标准Router,注入方式操作 **/
final RouterHuber routerHuber =
Fn.pool(ROUTERS, RouterHuber.class,
() -> Instance.instance(RouterHubor.class));
routerHuber.immitRouter(router);
/** 4.添加引用池 **/
// this.kinetic.addReference(Thread.currentThread().getName(), router);
/** 5.完成初始化 **/
future.complete(router);
}, result -> {
if (result.succeeded()) {
final Router router = result.result();
/** 6.Api Default 配置 **/
RouterKit.injectRouter(router);
/** 7.Server监听 **/
server.requestHandler(router::accept).listen();
// 必须使用该变量控制日志仅写一次,而且RMI的内容也只写一次
if (Constants.ONE == FLAG.getAndIncrement()) {
/** 9.成功写入过后输出最终日志 **/
final String pubApi = INCEPTOR.getString(Point.Web.Api.PUBLIC);
LOGGER.info(Info.VX_API, getClass().getSimpleName(), options.getHost(),
String.valueOf(options.getPort()), pubApi);
LOGGER.info(Info.VX_SERVER, getClass().getSimpleName());
}
}
});
});
}
上述代码是路由管理器RouterAgent中的核心代码,其中executeBlocking
方法体的内容如下:
/** 2.初始化Router,使用Kinetic初始化维持唯一Router **/
final Router router = Router.router(this.vertx);
/** 3.初始化标准Router,注入方式操作 **/
final RouterHuber routerHuber =
Fn.pool(ROUTERS, RouterHuber.class, () -> Instance.instance(RouterHubor.class));
routerHuber.immitRouter(router);
这里的immitRouter
方法会访问ZooKeeper配置中心去初始化该系统中的Router
实例,并且完成RESTful API的注入工作,这个过程中ZooKeeper配置中心的访问工作是”阻塞“方式。最开始这段代码没有写在executeBlocking
的代码块中,所以在很长一段时间,启动该Verticle组件时遇到了Block
的异常,后来修改成上述代码后,整个过程就正常了。——这也是我第一次赤裸裸地挑战了Vert.x框架中的黄金法则:还是那句话,不要阻塞Event Loop。
1.2.标准方案
有了上述的快速方案,为什么还需要标准方案呢?原因在下边这句话:
Worker verticles are designed for calling blocking code, as they won’t block any event loops.
是的,标准方案就是启用Event Bus,然后开启Vert.x中的Worker类型的Verticle组件,这样的方式就实现了完整的事件驱动模型的系统设计原型——后来几乎所有RESTful类型的API都是使用这种方式来设计的,如果读者习惯后会觉得这种方式的可扩展性非常好,那么这里用另外一个例子来看看这种方案的具体代码应该如何写。
Acceptor——AcceptorVerticle
package io.vertx.up._01.verticles;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.http.HttpServer;
import io.vertx.core.json.JsonObject;
public class AcceptorVerticle extends AbstractVerticle {
@Override
public void start() {
final HttpServer server = this.vertx.createHttpServer();
System.out.println(Thread.currentThread().getName() + ", Start Acceptor...");
server.requestHandler(request -> {
// 调用Event Bus
final EventBus event = this.vertx.eventBus();
System.out.println(Thread.currentThread().getName() + ", Accept Request...");
// 发送消息
event.<JsonObject>send("MSG://EVENT/BUS",
new JsonObject().put("message", "Event Communication"),
reply -> {
if (reply.succeeded()) {
// 发送回客户端
System.out.println(Thread.currentThread().getName() + ", Reply Message...");
System.out.println(" Message: " + reply.result().body());
request.response().end(reply.result().body().encode());
}
});
});
server.listen(8099);
}
}
Worker——WorkerVerticle
package io.vertx.up._01.verticles;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.json.JsonObject;
public class WorkerVerticle extends AbstractVerticle {
@Override
public void start() {
System.out.println(Thread.currentThread().getName() + ", Start Worker...");
final EventBus event = this.vertx.eventBus();
// 接收消息
event.<JsonObject>consumer("MSG://EVENT/BUS", reply -> {
System.out.println(Thread.currentThread().getName() + ", Consume Message...");
// 提取接收消息
final JsonObject message = reply.body();
System.out.println(" Message: " + message.encode());
// 回复消息
reply.reply(new JsonObject().put("worker", "Worker Message"));
});
}
}
Launcher
package io.vertx.up._01.event;
import io.vertx.core.DeploymentOptions;
import io.vertx.up._01.lanucher.ClusterLauncher;
import io.vertx.up._01.lanucher.Launcher;
import io.vertx.up._01.lanucher.SingleLauncher;
public class EventLauncher {
public static void main(final String[] args) {
// 哪种模式?
final boolean isClustered = false;
final Launcher launcher = isClustered ? new ClusterLauncher() :
new SingleLauncher();
System.out.println(Thread.currentThread().getName() + ","
+ Thread.currentThread().getId());
launcher.start(vertx -> {
// 发布Standard
vertx.deployVerticle("io.vertx.up._01.verticles.AcceptorVerticle",
new DeploymentOptions().setInstances(4));
// 发布Worker
vertx.deployVerticle("io.vertx.up._01.verticles.WorkerVerticle",
new DeploymentOptions().setWorker(true).setInstances(16));
});
}
}
上边的三段代码,基本上把标准方案的数据流程解释清楚了。
2.再谈生命周期
运行第一个章节中的代码,在Launcher启动过程中您将会看到如下的输出:
main,1
vert.x-worker-thread-0, Start Worker...
vert.x-worker-thread-1, Start Worker...
vert.x-worker-thread-2, Start Worker...
vert.x-worker-thread-3, Start Worker...
vert.x-worker-thread-4, Start Worker...
vert.x-worker-thread-5, Start Worker...
vert.x-worker-thread-6, Start Worker...
vert.x-worker-thread-7, Start Worker...
vert.x-worker-thread-8, Start Worker...
vert.x-worker-thread-9, Start Worker...
vert.x-worker-thread-10, Start Worker...
vert.x-worker-thread-11, Start Worker...
vert.x-worker-thread-12, Start Worker...
vert.x-worker-thread-13, Start Worker...
vert.x-worker-thread-14, Start Worker...
vert.x-worker-thread-15, Start Worker...
vert.x-eventloop-thread-3, Start Acceptor...
vert.x-eventloop-thread-2, Start Acceptor...
vert.x-eventloop-thread-0, Start Acceptor...
vert.x-eventloop-thread-1, Start Acceptor...
在解析这个过程之前,我们先等等,您可以在浏览器中打开链接地址:http://localhost:8099/,您将会看到下边的输出:
vert.x-eventloop-thread-2, Accept Request...
vert.x-worker-thread-16, Consume Message...
Message: {"message":"Event Communication"}
vert.x-eventloop-thread-2, Reply Message...
Message: {"worker":"Worker Message"}
vert.x-eventloop-thread-2, Accept Request...
vert.x-worker-thread-17, Consume Message...
Message: {"message":"Event Communication"}
vert.x-eventloop-thread-2, Reply Message...
Message: {"worker":"Worker Message"}
不要奇怪,您的确看到了两次请求的输出,这不是Vert.x的问题,而是Chrome浏览器导致的结果,由于我们在这个地方只监听了端口,没有设置路径,所以Chrome在请求过程中它会发送两次请求,分别是: http://localhost:8099/ http://localhost:8099/favicon.ico 所以您看到上边的输出一点都不奇怪,如果在后续的
vertx-web
中加入了路由匹配,那么就只有一次请求的输出了。
我不止一次在以前的文章中提到过Verticle在start
方法中有两个细粒度的生命周期:启动周期和请求周期——很多初学者由于对函数式编程不是很熟悉,不太理解lambda部分的代码究竟在什么时候执行,我想这个例子很容易说明一切。第一段输出就属于启动周期的代码部分,而第二段输出就属于请求周期部分,当然后边我们还会谈到这一点,只是刚好有这样一个例子来说明,那么也希望读者尽可能去理解这两个生命周期。
3.开发关注点
同样的,在上边的例子中,您可以看到Event Bus如何在Acceptor和Worker之间通信。这里简单谈几点:
- 发送方调用的方法为
send
,消费方调用的方法为consumer
,由于请求是从Acceptor到Worker,所以这时候Acceptor是发送方,Worker是消费方。 - Worker在执行完成过后,一定要调用
reply
方法,否则这个过程会导致Timeout
的异常(因为发送者不会接收到对应的响应信息)。 - 二者通信的地址需要保持一致:上边例子中都是
MSG://EVENT/BUS
,这个概念对应到官方文档理论部分中的Addressing
的概念,而在Vert.x中这个地址就是一个单纯的字符串——这也造成了在使用Event Bus过程中的架构设计的难度。 - 其次有一点就是关于数据类型,也就是官方提到的
Message Types
的内容,Vert.x中原生支持JsonObject
,Buffer
,以及String
这种基础类型,它并不支持复杂的数据结构,如果您有自己的数据类型要在Event Bus
中传输,怎么办?这种情况就需要您开发Codec
消息编解码器来实现。
这里看看上边代码的执行流程图:
结合上图,是不是就更容易理解前文中的代码交互信息了?在上图中,从输出可以知道:Acceptor使用的线程名为:vert.x-eventloop-thread-2
,而Worker在两次请求中使用了不同的线程:vert.x-worker-thread-11
和vert.x-worker-thread-15
,也就是说,在Vert.x中,它的Event Loop判断同一请求是基于会话的,这时候由于会话没有改变,所以这里的Standard的Verticle组件接收请求的是同一个线程,Worker则不然,两次使用的Worker线程不是同一个,这一点也印证了Reactor
模式中涉及到的非1:1
的线程对等问题(这个例子中发布了4个Acceptor,却发布了16个工作线程Worker)。
在zero中可能有一个读者容易混淆的地方,就是关于命名,通常一个Standard的Verticle组件用来接收请求时,我在定义时会给一个更符合它意义的名字,比如Router Agent,而后端的Worker类型的Verticle组件通常称为Worker。但是在zero中出现了另外的两个名字:Sender和Consumer,实际上这两个组件就是Agent和Worker中真正执行代码的Handler处理器线程的名称,如果读者熟悉了基于Event Bus的请求架构过后,理解这几个概念就不难了。它们之间可能存在如下关系:
- 一类(请原谅我用一类)Agent组件中可能不止一种Sender,因为上边发送消息给Event Bus的代码完全有可能是几次而不是一次,也可能发送的时候将不同的消息发送给不同的地址来处理;
- 一类Worker组件中也可能不止一种Consumer,道理同上。
4.总结
好吧,天下无不散之筵席,这个时候,EventBus的初探过程算是告一段落,至少读者应该从上边的例子知道在一个纯的Vert.x中(不包含vertx-web
)应该如何调用Event Bus,后边的章节我们会谈到应用级别,那个时候我们再回过头来讨论Event Bus的使用场景以及真正起到的异步事件的作用。同样这里只是针对Web应用中最常见的Request-Response的模式提供了相关代码,还没涉及到Point-To-Point和Publish-Subscribe这两种模式,毕竟不同的模式它的场景会有所区别。
1. https://vertx.io/docs/vertx-core/java/#event_bus ↩
2. 「Reactor模式」https://www.origin-x.cn/zero-up/5-vertx-land/zbr-006-reactormo-shi.html ↩
3. https://vertx.io/docs/vertx-core/java/#blocking_code ↩