Vert.x RxJava
Warning
|
RxJava1 is end-of-life as of March 31, 2018. Bindings for new Vert.x modules will not be generated. Consider migrating to Vert.x RxJava3. |
Vert.x API for RxJava
RxJava is a popular library for composing asynchronous and event based programs using observable sequences for the Java VM. Vert.x integrates naturally with RxJava, allowing using observable wherever you can use streams or asynchronous results.
Using Vert.x API for RxJava1
To use Vert.x API for RxJava1, add the following dependency to the dependencies section of your build descriptor:
-
Maven (in your
pom.xml
):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-rx-java</artifactId>
<version>4.2.7</version>
</dependency>
-
Gradle (in your
build.gradle
file):
compile 'io.vertx:vertx-rx-java:4.2.7'
There are two ways for using the RxJava API with Vert.x:
-
via the original Vert.x API with the
RxHelper
helper class that provides static methods for converting objects between Vert.x core API and RxJava API. -
via the Rxified Vert.x API enhancing the core Vert.x API.
Read stream support
RxJava Observable
is a perfect match for Vert.x ReadStream
class : both provide a flow of items.
The RxHelper.toObservable
static methods convert
a Vert.x read stream to an rx.Observable
:
FileSystem fileSystem = vertx.fileSystem();
fileSystem.open("/data.txt", new OpenOptions(), result -> {
AsyncFile file = result.result();
Observable<Buffer> observable = RxHelper.toObservable(file);
observable.forEach(data -> System.out.println("Read data: " + data.toString("UTF-8")));
});
The Rxified Vert.x API provides a toObservable
method on
ReadStream
:
FileSystem fs = vertx.fileSystem();
fs.open("/data.txt", new OpenOptions(), result -> {
AsyncFile file = result.result();
Observable<Buffer> observable = file.toObservable();
observable.forEach(data -> System.out.println("Read data: " + data.toString("UTF-8")));
});
Such observables are hot observables, i.e. they will produce notifications regardless of subscriptions because
a ReadStream
can potentially emit items spontaneously or not, depending on the implementation:
At subscription time, the adapter calls handler
to set its own handler.
Some ReadStream
implementations can start to emit events after this call, others will emit events whether an
handler is set or not:
-
AsyncFile
produces buffer events after the handler is set -
HttpServerRequest
produces events independantly of the handler (i.e buffer may be lost if no handler is set)
In both cases, subscribing to the Observable
in the same call is safe because the event loop or the worker
verticles cannot be called concurrently, so the subscription will always happens before the handler starts emitting
data.
When you need to delay the subscription, you need to pause
the ReadStream
and then resume
it, which is what
you would do with a ReadStream
.
server.requestHandler(request -> {
if (request.method() == HttpMethod.POST) {
// Stop receiving buffers
request.pause();
checkAuth(res -> {
// Now we can receive buffers again
request.resume();
if (res.succeeded()) {
Observable<Buffer> observable = request.toObservable();
observable.subscribe(buff -> {
// Get buffers
});
}
});
}
});
Likewise it is possible to turn an existing Observable
into a Vert.x ReadStream
.
The RxHelper.toReadStream
static methods convert
an rx.Observable
to a Vert.x read stream:
Observable<Buffer> observable = getObservable();
ReadStream<Buffer> readStream = RxHelper.toReadStream(observable);
Pump pump = Pump.pump(readStream, response);
pump.start();
Write stream support
A WriteStream
, like a rx.Subscriber
, consumes items, and, when it can’t keep-up, collaborates with the producer to avoid an ever-growing backlog.
Vert.x provides the WriteStreamSubscriber
adapter that you can use to send Observable
items to any WriteStream
:
response.setChunked(true);
WriteStreamSubscriber<io.vertx.core.buffer.Buffer> subscriber = io.vertx.rx.java.RxHelper.toSubscriber(response);
observable.subscribe(subscriber);
If you are progamming with the Rxified Vert.x API, the WriteStream
implementations provide a toSubscriber
method.
The previous example then becomes even more straightforward:
response.setChunked(true);
observable.subscribe(response.toSubscriber());
Note
|
When the Observable terminates successfully, the adapter invokes the end method.
|
Caution
|
The adapter sets the WriteStream drain and exception handlers, so don’t use them after subscribing.
|
The WriteStreamSubscriber
adapter is able to invoke callbacks when:
-
the
Observable
terminates with an error, or -
the
WriteStream
fails (e.g. HTTP connection is closed or filesystem is full), or -
the
WriteStream
ends (i.e. all writes done and file is closed), or -
the
WriteStream
ends with an error (i.e. all writes done and an error occured when closing the file)
This allows for a more robust program design, as well as scheduling other tasks after the stream has been handled:
response.setChunked(true);
WriteStreamSubscriber<Buffer> subscriber = response.toSubscriber();
subscriber.onError(throwable -> {
if (!response.headWritten() && response.closed()) {
response.setStatusCode(500).end("oops");
} else {
// log error
}
});
subscriber.onWriteStreamError(throwable -> {
// log error
});
subscriber.onWriteStreamEnd(() -> {
// log end of transaction to audit system...
});
observable.subscribe(subscriber);
Note
|
If the WriteStream fails, the adapter unsubscribes from the Observable .
|
Handler support
The RxHelper
can create an ObservableHandler
: an Observable
with a
toHandler
method returning an Handler<T>
implementation:
ObservableHandler<Long> observable = RxHelper.observableHandler();
observable.subscribe(id -> {
// Fired
});
vertx.setTimer(1000, observable.toHandler());
The Rxified Vert.x API does not provide a specific API for handler.
Async result support
You can create an RxJava Subscriber
from an existing Vert.x Handler<AsyncResult<T>>
and subscribe
it to an Observable
or a Single
:
observable.subscribe(RxHelper.toSubscriber(handler1));
// Subscribe to a Single
single.subscribe(RxHelper.toSubscriber(handler2));
The Vert.x Handler<AsyncResult<T>>
construct occuring as last parameter of an asynchronous method can
be mapped to an observable of a single element:
-
when the callback is a success, the observer
onNext
method is called with the item and theonComplete
method is immediately invoked after -
when the callback is a failure, the observer
onError
method is called
The RxHelper.observableFuture
method creates an ObservableFuture
:
an Observable
with a toHandler
method returning a Handler<AsyncResult<T>>
implementation:
ObservableFuture<HttpServer> observable = RxHelper.observableFuture();
observable.subscribe(
server -> {
// Server is listening
},
failure -> {
// Server could not start
}
);
vertx.createHttpServer(new HttpServerOptions().
setPort(1234).
setHost("localhost")
).listen(observable.toHandler());
The ObservableFuture<Server>
will get a single HttpServer
object, if the listen
operation fails,
the subscriber will be notified with the failure.
The RxHelper.toHandler
method adapts an existing Observer
into an handler:
Observer<HttpServer> observer = new Observer<HttpServer>() {
@Override
public void onNext(HttpServer o) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onCompleted() {
}
};
Handler<AsyncResult<HttpServer>> handler = RxHelper.toFuture(observer);
It also works with just actions:
Action1<HttpServer> onNext = httpServer -> {};
Action1<Throwable> onError = httpServer -> {};
Action0 onComplete = () -> {};
Handler<AsyncResult<HttpServer>> handler1 = RxHelper.toFuture(onNext);
Handler<AsyncResult<HttpServer>> handler2 = RxHelper.toFuture(onNext, onError);
Handler<AsyncResult<HttpServer>> handler3 = RxHelper.toFuture(onNext, onError, onComplete);
The Rxified Vert.x API duplicates each such method with the rx
prefix that returns an RxJava Single
:
Single<HttpServer> single = vertx
.createHttpServer()
.rxListen(1234, "localhost");
// Subscribe to bind the server
single.
subscribe(
server -> {
// Server is listening
},
failure -> {
// Server could not start
}
);
Such single are cold singles, and the corresponding API method is called on subscribe.
Note
|
the rx* methods replace the *Observable of the previous Rxified versions with a semantic
change to be more in line with RxJava.
|
Scheduler support
The reactive extension sometimes needs to schedule actions, for instance Observable#timer
creates and returns
a timer that emit periodic events. By default, scheduled actions are managed by RxJava, it means that the
timer threads are not Vert.x threads and therefore not executing in a Vert.x event loop.
When an RxJava method deals with a scheduler, it accepts an overloaded method accepting an extra rx.Scheduler
,
the RxHelper.scheduler
method will return a scheduler that can be used
in such places.
Scheduler scheduler = RxHelper.scheduler(vertx);
Observable<Long> timer = Observable.timer(100, 100, TimeUnit.MILLISECONDS, scheduler);
For blocking scheduled actions, a scheduler can be created with the RxHelper.blockingScheduler
method:
Scheduler scheduler = RxHelper.blockingScheduler(vertx);
Observable<Integer> obs = blockingObservable.observeOn(scheduler);
RxJava can also be reconfigured to use the Vert.x scheduler, thanks to the scheduler hook created with
RxHelper.schedulerHook
, the returned scheduler hook
uses a blocking scheduler for IO actions:
RxJavaSchedulersHook hook = RxHelper.schedulerHook(vertx);
RxJavaHooks.setOnIOScheduler(f -> hook.getIOScheduler());
RxJavaHooks.setOnNewThreadScheduler(f -> hook.getNewThreadScheduler());
RxJavaHooks.setOnComputationScheduler(f -> hook.getComputationScheduler());
The Rxified Vert.x API provides also similar method on the RxHelper
class:
Scheduler scheduler = io.vertx.rxjava.core.RxHelper.scheduler(vertx);
Observable<Long> timer = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);
RxJavaSchedulersHook hook = io.vertx.rxjava.core.RxHelper.schedulerHook(vertx);
RxJavaHooks.setOnIOScheduler(f -> hook.getIOScheduler());
RxJavaHooks.setOnNewThreadScheduler(f -> hook.getNewThreadScheduler());
RxJavaHooks.setOnComputationScheduler(f -> hook.getComputationScheduler());
It is also possible to create a scheduler backed by a named worker pool. This can be useful if you want to re-use the specific thread pool for scheduling blocking actions:
Scheduler scheduler = io.vertx.rxjava.core.RxHelper.scheduler(workerExecutor);
Observable<Long> timer = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);
Json unmarshalling
The RxHelper.unmarshaller
creates an rx.Observable.Operator
that
transforms an Observable<Buffer>
in json format into an object observable:
fileSystem.open("/data.txt", new OpenOptions(), result -> {
AsyncFile file = result.result();
Observable<Buffer> observable = RxHelper.toObservable(file);
observable.lift(RxHelper.unmarshaller(MyPojo.class)).subscribe(
mypojo -> {
// Process the object
}
);
});
The same can be done with the Rxified helper:
fileSystem.open("/data.txt", new OpenOptions(), result -> {
AsyncFile file = result.result();
Observable<Buffer> observable = file.toObservable();
observable.lift(io.vertx.rxjava.core.RxHelper.unmarshaller(MyPojo.class)).subscribe(
mypojo -> {
// Process the object
}
);
});
Deploying a Verticle
The Rxified API cannot deploy an existing Verticle instance, the helper RxHelper.observableFuture
method
provides a solution to that.
The RxHelper.deployVerticle
does it automatically
for you, it deploys a Verticle
and returns an Observable<String>
of the deployment ID.
Observable<String> deployment = RxHelper.deployVerticle(vertx, verticle);
deployment.subscribe(id -> {
// Deployed
}, err -> {
// Could not deploy
});
Rxified API
The Rxified API is a code generated version of the Vert.x API, just like the JavaScript or Groovy
language. The API uses the io.vertx.rxjava
prefix, for instance the io.vertx.core.Vertx
class is
translated to the Vertx
class.
Embedding Rxfified Vert.x
Just use the Vertx.vertx
methods:
Vertx vertx = io.vertx.rxjava.core.Vertx.vertx();
As a Verticle
Extend the AbstractVerticle
class, it will wrap it for you:
class MyVerticle extends io.vertx.rxjava.core.AbstractVerticle {
public void start() {
// Use Rxified Vertx here
}
}
Deploying an RxJava verticle is still performed by the Java deployer and does not need a specified deployer.
Verticles having an asynchronous start can override instead the rxStart
method and return a Completable
:
class MyVerticle extends io.vertx.rxjava.core.AbstractVerticle {
public Completable rxStart() {
return vertx.createHttpServer()
.requestHandler(req -> req.response().end("Hello World"))
.rxListen()
.toCompletable();
}
}
Api examples
Let’s study now a few examples of using Vert.x with RxJava.
EventBus message stream
The event bus MessageConsumer
provides naturally an Observable<Message<T>>
:
EventBus eb = vertx.eventBus();
MessageConsumer<String> consumer = eb.<String>consumer("the-address");
Observable<Message<String>> observable = consumer.toObservable();
Subscription sub = observable.subscribe(msg -> {
// Got message
});
// Unregisters the stream after 10 seconds
vertx.setTimer(10000, id -> {
sub.unsubscribe();
});
The MessageConsumer
provides a stream of Message
.
The body
gives access to a new stream of message bodies if needed:
EventBus eb = vertx.eventBus();
MessageConsumer<String> consumer = eb.<String>consumer("the-address");
Observable<String> observable = consumer.bodyStream().toObservable();
RxJava map/reduce composition style can then be used:
Observable<Double> observable = vertx.eventBus().
<Double>consumer("heat-sensor").
bodyStream().
toObservable();
observable.
buffer(1, TimeUnit.SECONDS).
map(samples -> samples.
stream().
collect(Collectors.averagingDouble(d -> d))).
subscribe(heat -> {
vertx.eventBus().send("news-feed", "Current heat is " + heat);
});
Timers
Timer task can be created with timerStream
:
vertx.timerStream(1000).
toObservable().
subscribe(
id -> {
System.out.println("Callback after 1 second");
}
);
Periodic task can be created with periodicStream
:
vertx.periodicStream(1000).
toObservable().
subscribe(
id -> {
System.out.println("Callback every second");
}
);
The observable can be cancelled with an unsubscription:
vertx.periodicStream(1000).
toObservable().
subscribe(new Subscriber<Long>() {
public void onNext(Long aLong) {
// Callback
unsubscribe();
}
public void onError(Throwable e) {}
public void onCompleted() {}
});
Http client requests
rxRequest
provides a single of an
HttpClientRequest
. The single reports a request failure.
Once you have a request, calling rxSend
will
send the request and gives a response back.
HttpClient client = vertx.createHttpClient(new HttpClientOptions());
Single<HttpClientResponse> request = client
.rxRequest(HttpMethod.GET, 8080, "localhost", "/the_uri")
.flatMap(HttpClientRequest::rxSend);
request.subscribe(
response -> {
// Process the response
},
error -> {
// Could not connect
}
);
The response can be processed as an Observable<Buffer>
with the
toObservable
method:
Single<HttpClientResponse> request = client
.rxRequest(HttpMethod.GET, 8080, "localhost", "/the_uri")
.flatMap(HttpClientRequest::rxSend);
request.toObservable().
subscribe(
response -> {
Observable<Buffer> observable = response.toObservable();
observable.forEach(
buffer -> {
// Process buffer
}
);
}
);
The same flow can be achieved with the flatMap
operation:
Single<HttpClientResponse> request = client
.rxRequest(HttpMethod.GET, 8080, "localhost", "/the_uri")
.flatMap(HttpClientRequest::rxSend);
request.toObservable().
flatMap(HttpClientResponse::toObservable).
forEach(
buffer -> {
// Process buffer
}
);
We can also unmarshall the Observable<Buffer>
into an object using the RxHelper.unmarshaller
static method. This method creates an Rx.Observable.Operator
unmarshalling buffers to an object:
Single<HttpClientResponse> request = client
.rxRequest(HttpMethod.GET, 8080, "localhost", "/the_uri")
.flatMap(HttpClientRequest::rxSend);
request.toObservable().
flatMap(HttpClientResponse::toObservable).
lift(io.vertx.rxjava.core.RxHelper.unmarshaller(MyPojo.class)).
forEach(
pojo -> {
// Process pojo
}
);
Http server requests
The requestStream
provides a callback for each incoming
request:
Observable<HttpServerRequest> requestObservable = server.requestStream().toObservable();
requestObservable.subscribe(request -> {
// Process request
});
The HttpServerRequest
can then be adapted to an Observable<Buffer>
:
Observable<HttpServerRequest> requestObservable = server.requestStream().toObservable();
requestObservable.subscribe(request -> {
Observable<Buffer> observable = request.toObservable();
});
The RxHelper.unmarshaller
can be used to parse and map
a json request to an object:
Observable<HttpServerRequest> requestObservable = server.requestStream().toObservable();
requestObservable.subscribe(request -> {
Observable<MyPojo> observable = request.
toObservable().
lift(io.vertx.rxjava.core.RxHelper.unmarshaller(MyPojo.class));
});
Websocket client
The rxWebSocket
provides a single callback when the websocket
connects, otherwise a failure:
HttpClient client = vertx.createHttpClient(new HttpClientOptions());
client.rxWebSocket(8080, "localhost", "/the_uri").subscribe(
ws -> {
// Use the websocket
},
error -> {
// Could not connect
}
);
The WebSocket
can then be turned into an Observable<Buffer>
easily:
socketObservable.subscribe(
socket -> {
Observable<Buffer> dataObs = socket.toObservable();
dataObs.subscribe(buffer -> {
System.out.println("Got message " + buffer.toString("UTF-8"));
});
}
);
Websocket server
The webSocketStream
provides a callback for each incoming
connection:
Observable<ServerWebSocket> socketObservable = server.webSocketStream().toObservable();
socketObservable.subscribe(
socket -> System.out.println("Web socket connect"),
failure -> System.out.println("Should never be called"),
() -> {
System.out.println("Subscription ended or server closed");
}
);
The ServerWebSocket
can be turned into an Observable<Buffer>
easily:
socketObservable.subscribe(
socket -> {
Observable<Buffer> dataObs = socket.toObservable();
dataObs.subscribe(buffer -> {
System.out.println("Got message " + buffer.toString("UTF-8"));
});
}
);