R0002 - Rx模式的Acceptor/Worker实例

「平行章节」1.7.EventBus初探

伴随着第一章节的结束,再引入一个EventBus相关的Rx模式的实例,其目的是让读者可以在学习Vert.x的同时去慢慢领悟rxjava2的这种写法,通过对Rx模式的解读,给读者多一种选择。

我不是Rx模式的专家,所有的Rx模式的代码都是逐步翻译逐步摸索,所以并不能保证Rx模式的代码就是最佳实践,只是通过这些代码打开一扇窗,让代码本身的结构更加流畅。

1. 源代码

1.1. RxAcceptor

package io.vertx.rx._01.verticles;

import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.core.AbstractVerticle;
import io.vertx.reactivex.core.eventbus.EventBus;
import io.vertx.reactivex.core.http.HttpServer;

public class RxAcceptorVerticle extends AbstractVerticle {

    @Override
    public void start() {
        final HttpServer server = this.vertx.createHttpServer();
        System.out.println(Thread.currentThread().getName() + ", Rx Start Acceptor...");
        server.requestHandler(request -> {
            // 调用Event Bus
            final EventBus event = this.vertx.eventBus();
            System.out.println(Thread.currentThread().getName() + ", Rx Accept Request...");
            // 发送消息
            event.<JsonObject>rxSend("MSG://EVENT/RX/BUS",
                    new JsonObject().put("message", "Event Communication"))
                    .doOnSuccess(message -> {
                        // 发送消息回客户端
                        System.out.println(Thread.currentThread().getName() + ", Rx Reply Message...");
                        System.out.println(" Message: " + message.body());
                        request.response().end(message.body().encode());
                    }).subscribe();
        }).rxListen(8099).subscribe();
    }
}

1.2. RxWorker

package io.vertx.rx._01.verticles;

import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.core.AbstractVerticle;
import io.vertx.reactivex.core.eventbus.EventBus;

public class RxWorkerVerticle extends AbstractVerticle {

    @Override
    public void start() {
        System.out.println(Thread.currentThread().getName() + ", Start RxWorker...");
        final EventBus event = this.vertx.eventBus();
        // 接收消息
        event.<JsonObject>consumer("MSG://EVENT/RX/BUS", reply -> {
            System.out.println(Thread.currentThread().getName() + ", Rx Consume Message...");
            // 提取接收消息
            final JsonObject message = reply.body();
            System.out.println(" Message: " + message.encode());
            // 回复消息
            reply.rxReply(new JsonObject().put("worker", "Worker Message")).subscribe();
        });
    }
}

1.3. Launcher

package io.vertx.rx._01.event;

import io.vertx.core.DeploymentOptions;
import io.vertx.rx._01.launcher.ClusterLauncher;
import io.vertx.rx._01.launcher.Launcher;
import io.vertx.rx._01.launcher.SingleLauncher;

public class EventLauncher {

    public static void main(final String[] args) {
        // 选择哪种模式
        final boolean isClustered = false;
        final Launcher launcher = isClustered ? new ClusterLauncher() :
                new SingleLauncher();
        launcher.start(vertx -> {
            // Rx模式的发布
            vertx.rxDeployVerticle(
                    "io.vertx.rx._01.verticles.RxAcceptorVerticle",
                    new DeploymentOptions().setInstances(2))
                    .doOnSuccess(id -> {
                        System.out.println("Rx Acceptor: " + id + " has been deployed successfully!");
                    }).subscribe();
            vertx.rxDeployVerticle(
                    "io.vertx.rx._01.verticles.RxWorkerVerticle",
                    new DeploymentOptions().setInstances(4).setWorker(true))
                    .doOnSuccess(id -> {
                        System.out.println("Rx Worker: " + id + " has been deployed successfully!");
                    }).subscribe();
        });
    }
}

2. Console的输出

启动过程中,您将会看到下边的输出:

vert.x-worker-thread-0, Start RxWorker...
vert.x-worker-thread-2, Start RxWorker...
vert.x-worker-thread-1, Start RxWorker...
vert.x-worker-thread-3, Start RxWorker...
Rx Worker: 374ff6b0-bddb-4d81-83be-a241a50f0660 has been deployed successfully!
vert.x-eventloop-thread-1, Rx Start Acceptor...
vert.x-eventloop-thread-0, Rx Start Acceptor...
Rx Acceptor: 904f64ad-017d-4d3f-883e-2d1b287a70c2 has been deployed successfully!

当您使用浏览器访问:http://localhost:8099 时,可以看到:

vert.x-eventloop-thread-1, Rx Accept Request...
vert.x-worker-thread-4, Rx Consume Message...
 Message: {"message":"Event Communication"}
vert.x-eventloop-thread-1, Rx Reply Message...
 Message: {"worker":"Worker Message"}

vert.x-eventloop-thread-1, Rx Accept Request...
vert.x-worker-thread-5, Rx Consume Message...
 Message: {"message":"Event Communication"}
vert.x-eventloop-thread-1, Rx Reply Message...
 Message: {"worker":"Worker Message"}

3. 总结

值得注意的是,rxjava2的写法并不单单是上边演示的这种,上述代码只是针对平行章节的案例的直接翻译,可以简单说这些代码并没有释放出rxjava2的威力,只是单纯地改换了一种代码风格。不过万丈高楼平地起,当读者熟悉了这些写法过后,最终要按照自己的风格写出rxjava2的程序也就不难了。实际上我在写这部分代码时候就遇到了一些简单的风格差异,这种差异还不仅仅在正章和平行世界之间,包括rxjava2本身都存在,比如:

    vertx.rxDeployVerticle(
            "io.vertx.rx._01.verticles.RxAcceptorVerticle",
            new DeploymentOptions().setInstances(2))
            .doOnSuccess(id -> {
                System.out.println("Rx Acceptor: " + id + " has been deployed successfully!");
            }).subscribe();

上边的代码如果使用rxjava2的另外的写法也可以用:

final Single<String> result = vertx.rxDeployVerticle(
            "io.vertx.rx._01.verticles.RxAcceptorVerticle",
            new DeploymentOptions().setInstances(2));
result.doOnSuccess(id -> {
    System.out.println("Rx Acceptor: " + id + " has been deployed successfully!");
}).subscribe();

第二种写法在中间引入了Single<String>的引用,如果您要在这个过程中有其他的逻辑注入或者需要使用它时,就可以用第二种写法,但从代码本身上看第一种更加Fluent一点,形成了完整的链式代码。

最后需要说明的一点也是在rxjava2中遇到的小坑,初学者可能会经常犯错,如果了解rxjava2的读者应该不会犯这种问题:在整个链式代码最后不要忘记调用subscribe()方法!当然如果你使用的是Rx模式的写法,那么你的代码就会少很多类似回调代码块的内容,这种东西就仁者见仁智者见智了,因为rxjava2的代码不论如何去组织,最后一次回调的步骤似乎不能少——如doOnSuccess,这时你可能更怀念await/async这两个关键字,可惜Java不支持。

results matching ""

    No results matching ""