RxJS is a popular library for composing asynchronous and event-based programs using observable sequences in JavaScript.
Vert.x integrates naturally with RxJS, allowing you to use observables wherever you can use streams or asynchronous results.
Vert.x for RxJS comes as an extension for RxJS:
var Rx = require("rx.vertx");
It provides the Rx object we need for creating an Observable, or other kinds of Rx objects.
If you are using Maven or Gradle, add the following dependency to the dependencies section of your build descriptor:
Maven (in your pom.xml):
<dependency>
<groupId>{maven-groupId}</groupId>
<artifactId>{maven-artifactId}</artifactId>
<version>{maven-version}</version>
</dependency>
Gradle (in your build.gradle file):
compile {maven-groupId}:{maven-artifactId}:{maven-version}
An RxJS observable is a perfect match for a Vert.x read stream: both provide a flow of items.
A read stream can be adapted to an observable with the Rx.Observable.fromReadStream function:
var Rx = require("rx.vertx");
var fs = vertx.fileSystem();
fs.open("/data.txt", {}, function(result, err) {
var file = result.result();
var observable = Rx.Observable.fromReadStream(file);
observable.forEach(function(data) {
console.log("Read data: " + data.toString("UTF-8"));
});
});
The rx.vertx module provides an observableHandler function:
var Rx = require("rx.vertx");
var observable = Rx.observableHandler();
observable.subscribe(
function(evt) {
// Got event
}
);
vertx.setTimer(1000, observable.toHandler());
Rx can also turn an existing Observer into a handler:
var Rx = require("rx.vertx");
// Create an observer via the Rx api
var observer = Rx.Observer.create(
function(evt) {
// Got event
}
);
// The rx.vertx extension augmented the observer with the toHandler method
var handler = observer.toHandler();
vertx.setTimer(1000, handler);
In Vert.x, future objects are modelled as async result handlers and occur as the last parameter of asynchronous methods.
The rx.vertx module provides an observableFuture function:
var Rx = require("rx.vertx");
// Create an observable that is also a handler of async result
var observable = Rx.observableFuture();
observable.subscribe(
function(server) {
// Server is listening
},
function(err) {
// Server could not start
}
);
var server = vertx.createHttpServer({ "port":1234, "host":"localhost" });
server.listen(observable.toHandler());
Rx can also turn an existing Observer into a future:
var Rx = require("rx.vertx");
// Create an observer via the Rx api
var observer = Rx.Observer.create(
function(evt) {
// Got event
}
);
// The rx.vertx extension augmented the observer with the toFuture method
var future = observer.toFuture();
RxJS relies on the default context's timeout and interval functions to schedule operations. The vertx-js integration implements these functions, providing out-of-the-box scheduler support.
Let’s now study a few examples of using Vert.x with RxJS.
The event bus message consumer naturally provides a stream of messages:
var Rx = require("rx.vertx");
var eb = vertx.eventBus();
var messageConsumer = eb.consumer("the-address");
// Create an observable from the message consumer
var observable = Rx.Observable.fromReadStream(messageConsumer);
// Subscribe to the observable
var subscription = observable.subscribe(
function(msg) {
// Got message
});
// Unregisters the stream after 10 seconds
vertx.setTimer(10000, function() {
subscription.dispose();
});
The message consumer provides a stream of messages. The MessageConsumer#bodyStream() method gives access to a new stream of message bodies if needed:
var Rx = require("rx.vertx");
var eb = vertx.eventBus();
var messageConsumer = eb.consumer("the-address");
var bodyConsumer = messageConsumer.bodyStream();
// Create an observable from the body consumer
var observable = Rx.Observable.fromReadStream(bodyConsumer);
The RxJS map/reduce composition style can then be used:
var Rx = require("rx.vertx");
var eb = vertx.eventBus();
var consumer = eb.consumer("heat-sensor").bodyStream();
var observable = Rx.Observable.fromReadStream(consumer);
observable.
bufferWithTime(500).
map(function (arr) { return arr.reduce(function (acc, x) { return acc + x; }, "") }).
subscribe(
function(heat) {
console.log("Current heat is " + heat);
});
A timer task can be created with Vertx#timerStream(long):
var Rx = require("rx.vertx");
var timer = Rx.Observable.fromReadStream(vertx.timerStream(1000));
timer.subscribe(function(id) {
console.log("Callback after 1 second");
});
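A periodic task can be created with Vertx#periodicStream(long):
var Rx = require("rx.vertx");
var timer = Rx.Observable.fromReadStream(vertx.periodicStream(1000));
timer.subscribe(function(id) {
console.log("Callback every second");
});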
The HttpClientRequest provides a one-shot callback with the http.HttpClientResponse object. The observable also reports a request failure:
var Rx = require("rx.vertx");
var client = vertx.createHttpClient();
var req = client.request("GET", 8080, "localhost", "/the_uri");
var observable = Rx.Observable.fromReadStream(req);
observable.subscribe(function(resp) {
// Process the response
}, function(err) {
// Could not connect
});
req.end();
The response can be processed as a stream of buffers:
var Rx = require("rx.vertx");
var client = vertx.createHttpClient();
var req = client.request("GET", 8080, "localhost", "/the_uri");
Rx.Observable.fromReadStream(req).subscribe(function(resp) {
var observable = Rx.Observable.fromReadStream(resp);
observable.forEach(function(buffer) {
// Process buffer
});
});
req.end();
The HttpServer#requestStream() provides a callback for each incoming request:
var Rx = require("rx.vertx");
var server = vertx.createHttpServer();
var requests = Rx.Observable.fromReadStream(server.requestStream());
requests.subscribe(function(request) {
// Process the request
});
The HttpServerRequest can then be adapted to a buffer observable:
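var Rx = require("rx.vertx");
var server = vertx.createHttpServer();
var requests = Rx.Observable.fromReadStream(server.requestStream());
requests.subscribe(function(request) {
var observable = Rx.Observable.fromReadStream(request);
observable.forEach(function(buffer) {
// Process buffer
});
});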
The HttpClient#websocketStream provides a single callback when the websocket connects, or a failure otherwise:
var Rx = require("rx.vertx");
var client = vertx.createHttpClient();
var stream = client.websocketStream(8080, "localhost", "/the_uri");
Rx.Observable.fromReadStream(stream).
subscribe(function (ws) {
var observable = Rx.Observable.fromReadStream(ws);
observable.forEach(function(buffer) {
// Process message
});
});
The WebSocket can then easily be turned into an observable of buffers:
var Rx = require("rx.vertx");
var client = vertx.createHttpClient();
var stream = client.websocketStream(8080, "localhost", "/the_uri");
var observable = Rx.Observable.fromReadStream(stream);
observable.subscribe(function(ws) {
// Use the websocket
}, function(err) {
// Could not connect
});
The HttpServer#websocketStream() provides a callback for each incoming connection:
var Rx = require("rx.vertx");
var server = vertx.createHttpServer();
var sockets = Rx.Observable.fromReadStream(server.websocketStream());
sockets.subscribe(function(ws) {
// Process the web socket
});
The ServerWebSocket can easily be turned into a buffer observable:
var Rx = require("rx.vertx");
var server = vertx.createHttpServer();
Rx.Observable.fromReadStream(server.websocketStream()).subscribe(function(ws) {
var observable = Rx.Observable.fromReadStream(ws);
observable.forEach(function(buffer) {
// Process message
});
});