Event Bus初探

  Event Bus的直译称为“事件总线”,提到它就会涉及到事件驱动模型,实际上Event Bus是Vert.x的核心1

The event bus is the nervous system of Vert.x.

  通常我们提到Vert.x时都会说它是事件驱动、无阻塞、纯异步化的,那么这里的事件驱动的中枢神经就是本文中的Event Bus,关于Event Bus的内容,读者可以先看看Reactor模式——不了解Reactor模式在未来很长一段时间设计Vert.x的系统都会遇到思路上的障碍,所以我还是推荐读者在理解Event Bus之前先仔细看看什么是Reactor模式,参考后记《Reactor模式2》。

  • 在传统服务器中(如Tomcat/Jetty),服务器接收到一个客户端请求,这个请求将直接进入某个Servlet组件中,该线程不仅会接收此请求,同样会去处理这个请求,一旦此处发生阻塞操作(访问数据库、网络、文件系统),整个线程就必须等待执行结果,最终生成响应发送回客户端。
  • 在Vert.x中(Netty服务器),服务器接收到一个客户端请求后,这个请求会直接由Standard类型的Verticle组件来接收,当然对于简单的请求,这个Verticle组件直接执行完成就可以生成响应了,这种模式等价于上述传统服务器执行请求的模式。但若遇到了复杂的请求如阻塞操作,该Verticle组件可以将请求封装成事件发送到Event Bus,由它另一头的Worker类型的Verticle组件消费该事件、执行该事件、生成响应发送回客户端。

1.关于阻塞

  上述流程中,第一种模式下的阻塞会成为一个常态,而第二种请求中也会有一部分任务是阻塞式的,好了,问题就来了:读者在前边的章节已经看到,既然Vert.x的官方提到的黄金法则,是不要阻塞Event Loop,那么遇到阻塞任务时,怎么解决这种需求?您若想要写一个程序不访问网络、数据库、文件系统几乎是不可能的事,一旦出现这种IO装置的操作,就意味着阻塞有可能开始暴走了——比如您读取了一个4G的大文件。

  Vert.x中提供了两种解决阻塞任务的方案,我称为”快速方案“和”标准方案“。

1.1.快速方案

  快速方案的核心就是直接调用executeBlocking方法3,前文中我们已经分析过这个方法的源代码了,再次提到这个方法是因为它提供了处理阻塞任务的绿色通道,这种方式很快速,只需要写上下边几行代码就足够了。

vertx.executeBlocking(future -> {
    // Call some blocking API that takes a significant amount of time to return
    String result = someAPI.blockingMethod("hello");
    future.complete(result);
}, res -> {
    System.out.println("The result is: " + res.result());
});

  关于这个方法的阐述,读者可以直接参考官方文档,这里主要谈谈这部分内容的经验分享。zero的前身是我的硕士论文的主体部分,最初的系统名称叫做vie,它是一个纯数据驱动的RESTful API引擎,它的基本代码如下(有vertx-web部分剧透):

    @Override
    public void start() {
        Fn.safeVie(LOGGER, () -> {
            /** 0.Http Server **/
            final ConcurrentMap<Integer, HttpServerOptions> configMap = INTAKER.ingest();
            final HttpServerOptions options = configMap.get(Integer.parseInt(VertxGrid.getPortEndpoint()));
            final HttpServer server = this.vertx.createHttpServer(options);
            /** 1.读取Router引用,处理Block Thread的问题 **/
            this.vertx.<Router>executeBlocking(future -> {
                /** 2.初始化Router,使用Kinetic初始化维持唯一Router **/
                final Router router = Router.router(this.vertx);
                /** 3.初始化标准Router,注入方式操作 **/
                final RouterHuber routerHuber =
                        Fn.pool(ROUTERS, RouterHuber.class,
                                () -> Instance.instance(RouterHubor.class));
                routerHuber.immitRouter(router);
                /** 4.添加引用池 **/
                // this.kinetic.addReference(Thread.currentThread().getName(), router);
                /** 5.完成初始化 **/
                future.complete(router);
            }, result -> {
                if (result.succeeded()) {
                    final Router router = result.result();
                    /** 6.Api Default 配置 **/
                    RouterKit.injectRouter(router);
                    /** 7.Server监听 **/
                    server.requestHandler(router::accept).listen();
                    // 必须使用该变量控制日志仅写一次,而且RMI的内容也只写一次
                    if (Constants.ONE == FLAG.getAndIncrement()) {
                        /** 9.成功写入过后输出最终日志 **/
                        final String pubApi = INCEPTOR.getString(Point.Web.Api.PUBLIC);
                        LOGGER.info(Info.VX_API, getClass().getSimpleName(), options.getHost(),
                                String.valueOf(options.getPort()), pubApi);
                        LOGGER.info(Info.VX_SERVER, getClass().getSimpleName());
                    }
                }
            });
        });
    }

  上述代码是路由管理器RouterAgent中的核心代码,其中executeBlocking方法体的内容如下:

/** 2.初始化Router,使用Kinetic初始化维持唯一Router **/
final Router router = Router.router(this.vertx);
/** 3.初始化标准Router,注入方式操作 **/
final RouterHuber routerHuber =
    Fn.pool(ROUTERS, RouterHuber.class, () -> Instance.instance(RouterHubor.class));
routerHuber.immitRouter(router);

  这里的immitRouter方法会访问ZooKeeper配置中心去初始化该系统中的Router实例,并且完成RESTful API的注入工作,这个过程中ZooKeeper配置中心的访问工作是”阻塞“方式。最开始这段代码没有写在executeBlocking的代码块中,所以在很长一段时间,启动该Verticle组件时遇到了Block的异常,后来修改成上述代码后,整个过程就正常了。——这也是我第一次赤裸裸地挑战了Vert.x框架中的黄金法则:还是那句话,不要阻塞Event Loop。

1.2.标准方案

  有了上述的快速方案,为什么还需要标准方案呢?原因在下边这句话:

Worker verticles are designed for calling blocking code, as they won’t block any event loops.

  是的,标准方案就是启用Event Bus,然后开启Vert.x中的Worker类型的Verticle组件,这样的方式就实现了完整的事件驱动模型的系统设计原型——后来几乎所有RESTful类型的API都是使用这种方式来设计的,如果读者习惯后会觉得这种方式的可扩展性非常好,那么这里用另外一个例子来看看这种方案的具体代码应该如何写。

Acceptor——AcceptorVerticle

package io.vertx.up._01.verticles;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.http.HttpServer;
import io.vertx.core.json.JsonObject;

public class AcceptorVerticle extends AbstractVerticle {

    @Override
    public void start() {

        final HttpServer server = this.vertx.createHttpServer();
        System.out.println(Thread.currentThread().getName() + ", Start Acceptor...");
        server.requestHandler(request -> {
            // 调用Event Bus
            final EventBus event = this.vertx.eventBus();
            System.out.println(Thread.currentThread().getName() + ", Accept Request...");
            // 发送消息
            event.<JsonObject>send("MSG://EVENT/BUS",
                    new JsonObject().put("message", "Event Communication"),
                    reply -> {
                        if (reply.succeeded()) {
                            // 发送回客户端
                            System.out.println(Thread.currentThread().getName() + ", Reply Message...");
                            System.out.println(" Message: " + reply.result().body());
                            request.response().end(reply.result().body().encode());
                        }
                    });
        });

        server.listen(8099);
    }
}

Worker——WorkerVerticle

package io.vertx.up._01.verticles;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.json.JsonObject;

public class WorkerVerticle extends AbstractVerticle {

    @Override
    public void start() {
        System.out.println(Thread.currentThread().getName() + ", Start Worker...");
        final EventBus event = this.vertx.eventBus();
        // 接收消息
        event.<JsonObject>consumer("MSG://EVENT/BUS", reply -> {
            System.out.println(Thread.currentThread().getName() + ", Consume Message...");
            // 提取接收消息
            final JsonObject message = reply.body();
            System.out.println(" Message: " + message.encode());
            // 回复消息
            reply.reply(new JsonObject().put("worker", "Worker Message"));
        });
    }
}

Launcher

package io.vertx.up._01.event;

import io.vertx.core.DeploymentOptions;
import io.vertx.up._01.lanucher.ClusterLauncher;
import io.vertx.up._01.lanucher.Launcher;
import io.vertx.up._01.lanucher.SingleLauncher;

public class EventLauncher {
    public static void main(final String[] args) {
        // 哪种模式?
        final boolean isClustered = false;
        final Launcher launcher = isClustered ? new ClusterLauncher() :
                new SingleLauncher();
        System.out.println(Thread.currentThread().getName() + ","
                + Thread.currentThread().getId());
        launcher.start(vertx -> {
            // 发布Standard
            vertx.deployVerticle("io.vertx.up._01.verticles.AcceptorVerticle",
                    new DeploymentOptions().setInstances(4));
            // 发布Worker
            vertx.deployVerticle("io.vertx.up._01.verticles.WorkerVerticle",
                    new DeploymentOptions().setWorker(true).setInstances(16));
        });
    }
}

  上边的三段代码,基本上把标准方案的数据流程解释清楚了。

2.再谈生命周期

  运行第一个章节中的代码,在Launcher启动过程中您将会看到如下的输出:

main,1
vert.x-worker-thread-0, Start Worker...
vert.x-worker-thread-1, Start Worker...
vert.x-worker-thread-2, Start Worker...
vert.x-worker-thread-3, Start Worker...
vert.x-worker-thread-4, Start Worker...
vert.x-worker-thread-5, Start Worker...
vert.x-worker-thread-6, Start Worker...
vert.x-worker-thread-7, Start Worker...
vert.x-worker-thread-8, Start Worker...
vert.x-worker-thread-9, Start Worker...
vert.x-worker-thread-10, Start Worker...
vert.x-worker-thread-11, Start Worker...
vert.x-worker-thread-12, Start Worker...
vert.x-worker-thread-13, Start Worker...
vert.x-worker-thread-14, Start Worker...
vert.x-worker-thread-15, Start Worker...
vert.x-eventloop-thread-3, Start Acceptor...
vert.x-eventloop-thread-2, Start Acceptor...
vert.x-eventloop-thread-0, Start Acceptor...
vert.x-eventloop-thread-1, Start Acceptor...

  在解析这个过程之前,我们先等等,您可以在浏览器中打开链接地址:http://localhost:8099/,您将会看到下边的输出:

vert.x-eventloop-thread-2, Accept Request...
vert.x-worker-thread-16, Consume Message...
 Message: {"message":"Event Communication"}
vert.x-eventloop-thread-2, Reply Message...
 Message: {"worker":"Worker Message"}

vert.x-eventloop-thread-2, Accept Request...
vert.x-worker-thread-17, Consume Message...
 Message: {"message":"Event Communication"}
vert.x-eventloop-thread-2, Reply Message...
 Message: {"worker":"Worker Message"}

不要奇怪,您的确看到了两次请求的输出,这不是Vert.x的问题,而是Chrome浏览器导致的结果,由于我们在这个地方只监听了端口,没有设置路径,所以Chrome在请求过程中它会发送两次请求,分别是: http://localhost:8099/ http://localhost:8099/favicon.ico 所以您看到上边的输出一点都不奇怪,如果在后续的vertx-web中加入了路由匹配,那么就只有一次请求的输出了。

  我不止一次在以前的文章中提到过Verticle在start方法中有两个细粒度的生命周期:启动周期请求周期——很多初学者由于对函数式编程不是很熟悉,不太理解lambda部分的代码究竟在什么时候执行,我想这个例子很容易说明一切。第一段输出就属于启动周期的代码部分,而第二段输出就属于请求周期部分,当然后边我们还会谈到这一点,只是刚好有这样一个例子来说明,那么也希望读者尽可能去理解这两个生命周期。

3.开发关注点

  同样的,在上边的例子中,您可以看到Event Bus如何在Acceptor和Worker之间通信。这里简单谈几点:

  • 发送方调用的方法为send,消费方调用的方法为consumer,由于请求是从Acceptor到Worker,所以这时候Acceptor是发送方,Worker是消费方。
  • Worker在执行完成过后,一定要调用reply方法,否则这个过程会导致Timeout的异常(因为发送者不会接收到对应的响应信息)。
  • 二者通信的地址需要保持一致:上边例子中都是MSG://EVENT/BUS,这个概念对应到官方文档理论部分中的Addressing的概念,而在Vert.x中这个地址就是一个单纯的字符串——这也造成了在使用Event Bus过程中的架构设计的难度。
  • 其次有一点就是关于数据类型,也就是官方提到的Message Types的内容,Vert.x中原生支持JsonObjectBuffer,以及String这种基础类型,它并不支持复杂的数据结构,如果您有自己的数据类型要在Event Bus中传输,怎么办?这种情况就需要您开发Codec消息编解码器来实现。

  这里看看上边代码的执行流程图:

  结合上图,是不是就更容易理解前文中的代码交互信息了?在上图中,从输出可以知道:Acceptor使用的线程名为:vert.x-eventloop-thread-2,而Worker在两次请求中使用了不同的线程:vert.x-worker-thread-11vert.x-worker-thread-15,也就是说,在Vert.x中,它的Event Loop判断同一请求是基于会话的,这时候由于会话没有改变,所以这里的Standard的Verticle组件接收请求的是同一个线程,Worker则不然,两次使用的Worker线程不是同一个,这一点也印证了Reactor模式中涉及到的非1:1的线程对等问题(这个例子中发布了4个Acceptor,却发布了16个工作线程Worker)。

  在zero中可能有一个读者容易混淆的地方,就是关于命名,通常一个Standard的Verticle组件用来接收请求时,我在定义时会给一个更符合它意义的名字,比如Router Agent,而后端的Worker类型的Verticle组件通常称为Worker。但是在zero中出现了另外的两个名字:Sender和Consumer,实际上这两个组件就是Agent和Worker中真正执行代码的Handler处理器线程的名称,如果读者熟悉了基于Event Bus的请求架构过后,理解这几个概念就不难了。它们之间可能存在如下关系:

  • 一类(请原谅我用一类)Agent组件中可能不止一种Sender,因为上边发送消息给Event Bus的代码完全有可能是几次而不是一次,也可能发送的时候将不同的消息发送给不同的地址来处理;
  • 一类Worker组件中也可能不止一种Consumer,道理同上。

4.总结

  好吧,天下无不散之筵席,这个时候,EventBus的初探过程算是告一段落,至少读者应该从上边的例子知道在一个纯的Vert.x中(不包含vertx-web)应该如何调用Event Bus,后边的章节我们会谈到应用级别,那个时候我们再回过头来讨论Event Bus的使用场景以及真正起到的异步事件的作用。同样这里只是针对Web应用中最常见的Request-Response的模式提供了相关代码,还没涉及到Point-To-PointPublish-Subscribe这两种模式,毕竟不同的模式它的场景会有所区别。

1. https://vertx.io/docs/vertx-core/java/#event_bus
2. 「Reactor模式」https://www.origin-x.cn/zero-up/5-vertx-land/zbr-006-reactormo-shi.html
3. https://vertx.io/docs/vertx-core/java/#blocking_code

results matching ""

    No results matching ""