R0002 - Rx模式的Acceptor/Worker实例
「平行章节」:1.7.EventBus初探
伴随着第一章节的结束,再引入一个EventBus相关的Rx模式的实例,其目的是让读者可以在学习Vert.x的同时去慢慢领悟rxjava2
的这种写法,通过对Rx模式的解读,给读者多一种选择。
我不是Rx模式的专家,所有的Rx模式的代码都是逐步翻译逐步摸索,所以并不能保证Rx模式的代码就是最佳实践,只是通过这些代码打开一扇窗,让代码本身的结构更加流畅。
1. 源代码
1.1. RxAcceptor
package io.vertx.rx._01.verticles;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.core.AbstractVerticle;
import io.vertx.reactivex.core.eventbus.EventBus;
import io.vertx.reactivex.core.http.HttpServer;
public class RxAcceptorVerticle extends AbstractVerticle {
@Override
public void start() {
final HttpServer server = this.vertx.createHttpServer();
System.out.println(Thread.currentThread().getName() + ", Rx Start Acceptor...");
server.requestHandler(request -> {
// 调用Event Bus
final EventBus event = this.vertx.eventBus();
System.out.println(Thread.currentThread().getName() + ", Rx Accept Request...");
// 发送消息
event.<JsonObject>rxSend("MSG://EVENT/RX/BUS",
new JsonObject().put("message", "Event Communication"))
.doOnSuccess(message -> {
// 发送消息回客户端
System.out.println(Thread.currentThread().getName() + ", Rx Reply Message...");
System.out.println(" Message: " + message.body());
request.response().end(message.body().encode());
}).subscribe();
}).rxListen(8099).subscribe();
}
}
1.2. RxWorker
package io.vertx.rx._01.verticles;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.core.AbstractVerticle;
import io.vertx.reactivex.core.eventbus.EventBus;
public class RxWorkerVerticle extends AbstractVerticle {
@Override
public void start() {
System.out.println(Thread.currentThread().getName() + ", Start RxWorker...");
final EventBus event = this.vertx.eventBus();
// 接收消息
event.<JsonObject>consumer("MSG://EVENT/RX/BUS", reply -> {
System.out.println(Thread.currentThread().getName() + ", Rx Consume Message...");
// 提取接收消息
final JsonObject message = reply.body();
System.out.println(" Message: " + message.encode());
// 回复消息
reply.rxReply(new JsonObject().put("worker", "Worker Message")).subscribe();
});
}
}
1.3. Launcher
package io.vertx.rx._01.event;
import io.vertx.core.DeploymentOptions;
import io.vertx.rx._01.launcher.ClusterLauncher;
import io.vertx.rx._01.launcher.Launcher;
import io.vertx.rx._01.launcher.SingleLauncher;
public class EventLauncher {
public static void main(final String[] args) {
// 选择哪种模式
final boolean isClustered = false;
final Launcher launcher = isClustered ? new ClusterLauncher() :
new SingleLauncher();
launcher.start(vertx -> {
// Rx模式的发布
vertx.rxDeployVerticle(
"io.vertx.rx._01.verticles.RxAcceptorVerticle",
new DeploymentOptions().setInstances(2))
.doOnSuccess(id -> {
System.out.println("Rx Acceptor: " + id + " has been deployed successfully!");
}).subscribe();
vertx.rxDeployVerticle(
"io.vertx.rx._01.verticles.RxWorkerVerticle",
new DeploymentOptions().setInstances(4).setWorker(true))
.doOnSuccess(id -> {
System.out.println("Rx Worker: " + id + " has been deployed successfully!");
}).subscribe();
});
}
}
2. Console的输出
启动过程中,您将会看到下边的输出:
vert.x-worker-thread-0, Start RxWorker...
vert.x-worker-thread-2, Start RxWorker...
vert.x-worker-thread-1, Start RxWorker...
vert.x-worker-thread-3, Start RxWorker...
Rx Worker: 374ff6b0-bddb-4d81-83be-a241a50f0660 has been deployed successfully!
vert.x-eventloop-thread-1, Rx Start Acceptor...
vert.x-eventloop-thread-0, Rx Start Acceptor...
Rx Acceptor: 904f64ad-017d-4d3f-883e-2d1b287a70c2 has been deployed successfully!
当您使用浏览器访问:http://localhost:8099 时,可以看到:
vert.x-eventloop-thread-1, Rx Accept Request...
vert.x-worker-thread-4, Rx Consume Message...
Message: {"message":"Event Communication"}
vert.x-eventloop-thread-1, Rx Reply Message...
Message: {"worker":"Worker Message"}
vert.x-eventloop-thread-1, Rx Accept Request...
vert.x-worker-thread-5, Rx Consume Message...
Message: {"message":"Event Communication"}
vert.x-eventloop-thread-1, Rx Reply Message...
Message: {"worker":"Worker Message"}
3. 总结
值得注意的是,rxjava2
的写法并不单单是上边演示的这种,上述代码只是针对平行章节的案例的直接翻译,可以简单说这些代码并没有释放出rxjava2
的威力,只是单纯地改换了一种代码风格。不过万丈高楼平地起,当读者熟悉了这些写法过后,最终要按照自己的风格写出rxjava2
的程序也就不难了。实际上我在写这部分代码时候就遇到了一些简单的风格差异,这种差异还不仅仅在正章和平行世界之间,包括rxjava2
本身都存在,比如:
vertx.rxDeployVerticle(
"io.vertx.rx._01.verticles.RxAcceptorVerticle",
new DeploymentOptions().setInstances(2))
.doOnSuccess(id -> {
System.out.println("Rx Acceptor: " + id + " has been deployed successfully!");
}).subscribe();
上边的代码如果使用rxjava2
的另外的写法也可以用:
final Single<String> result = vertx.rxDeployVerticle(
"io.vertx.rx._01.verticles.RxAcceptorVerticle",
new DeploymentOptions().setInstances(2));
result.doOnSuccess(id -> {
System.out.println("Rx Acceptor: " + id + " has been deployed successfully!");
}).subscribe();
第二种写法在中间引入了Single<String>
的引用,如果您要在这个过程中有其他的逻辑注入或者需要使用它时,就可以用第二种写法,但从代码本身上看第一种更加Fluent
一点,形成了完整的链式代码。
最后需要说明的一点也是在rxjava2
中遇到的小坑,初学者可能会经常犯错,如果了解rxjava2
的读者应该不会犯这种问题:在整个链式代码最后不要忘记调用subscribe()
方法!当然如果你使用的是Rx模式的写法,那么你的代码就会少很多类似回调代码块的内容,这种东西就仁者见仁智者见智了,因为rxjava2
的代码不论如何去组织,最后一次回调的步骤似乎不能少——如doOnSuccess
,这时你可能更怀念await/async
这两个关键字,可惜Java不支持。