RxJS is a popular JavaScript library for composing asynchronous and event-based programs using observable sequences.

Vert.x integrates naturally with RxJS, allowing you to use observables wherever you can use streams or asynchronous results.

Vert.x for RxJS comes as an extension for RxJS:

var Rx = require("rx.vertx");

It provides the Rx object needed to create an Observable or other kinds of Rx objects.

If you are using Maven or Gradle, add the following dependency to the dependencies section of your build descriptor:

  • Maven (in your pom.xml):

<dependency>
  <groupId>{maven-groupId}</groupId>
  <artifactId>{maven-artifactId}</artifactId>
  <version>{maven-version}</version>
</dependency>

  • Gradle (in your build.gradle file):

compile {maven-groupId}:{maven-artifactId}:{maven-version}

Read stream support

An RxJS observable is a perfect match for a Vert.x read stream: both provide a flow of items. A read stream can be adapted to an observable with the Rx.Observable.fromReadStream function:

var Rx = require("rx.vertx");
var fs = vertx.fileSystem();
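fs.open("/data.txt", {}, function(file, err) {
  if (err) {
    console.log("Could not open file: " + err);
    return;
  }
  // The handler receives the opened file directly; it is a read stream of buffers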
  var observable = Rx.Observable.fromReadStream(file);
  observable.forEach(function(data) {
    console.log("Read data: " + data.toString("UTF-8"));
  });
});
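
To make the correspondence between the two models concrete, here is a minimal sketch of how such an adapter could work. It is not the rx.vertx implementation: fakeReadStream and adaptReadStream are hypothetical stand-ins, and the sketch only assumes that a read stream exposes handler, exceptionHandler and endHandler setters.

```javascript
// Hypothetical in-memory stand-in for a Vert.x read stream: it stores the
// three callbacks and replays a fixed list of items when emit() is called.
function fakeReadStream(items) {
    var callbacks = {};
    return {
        handler: function (fn) { callbacks.data = fn; return this; },
        exceptionHandler: function (fn) { callbacks.error = fn; return this; },
        endHandler: function (fn) { callbacks.end = fn; return this; },
        emit: function () {
            items.forEach(function (item) {
                if (callbacks.data) { callbacks.data(item); }
            });
            if (callbacks.end) { callbacks.end(); }
        }
    };
}

// Hypothetical adapter: wires the stream callbacks to observer-style callbacks
// and returns an object whose dispose() detaches them again.
function adaptReadStream(stream, onNext, onError, onCompleted) {
    stream.handler(onNext);
    stream.exceptionHandler(onError);
    stream.endHandler(onCompleted);
    return {
        dispose: function () {
            stream.handler(null);
            stream.exceptionHandler(null);
            stream.endHandler(null);
        }
    };
}

var received = [];
var stream = fakeReadStream(["a", "b", "c"]);
adaptReadStream(
    stream,
    function (item) { received.push(item); },
    function (err) { received.push("error: " + err); },
    function () { received.push("done"); }
);
stream.emit();
console.log(received.join(", "));
```

Each item goes to the first callback, a failure to the second, and the end of the stream to the third, which is the same three-way contract an Rx observer has.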

Handler support

The rx.vertx module provides an observableHandler function:

var Rx = require("rx.vertx");
var observable = Rx.observableHandler();
observable.subscribe(
    function(evt) {
        // Got event
    }
);
vertx.setTimer(1000, observable.toHandler());

Rx can also turn an existing Observer into a handler:

var Rx = require("rx.vertx");

// Create an observer via the Rx api
var observer = Rx.Observer.create(
    function(evt) {
        // Got event
    }
);

// The rx.vertx extension augmented the observer with the toHandler method
var handler = observer.toHandler();
vertx.setTimer(1000, handler);
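
The idea behind both variants is that a plain callback is bridged to Rx subscribers. The following is a minimal sketch of that bridge, runnable without Vert.x; makeObservableHandler is a hypothetical stand-in and does not reflect how rx.vertx is implemented.

```javascript
// Hypothetical stand-in for the observableHandler idea: subscribers register
// callbacks, and toHandler() returns a plain function that forwards each
// event it receives to every subscriber.
function makeObservableHandler() {
    var subscribers = [];
    return {
        subscribe: function (onNext) { subscribers.push(onNext); },
        toHandler: function () {
            return function (evt) {
                subscribers.forEach(function (onNext) { onNext(evt); });
            };
        }
    };
}

var events = [];
var observable = makeObservableHandler();
observable.subscribe(function (evt) { events.push(evt); });

// Any API that accepts a plain callback can now feed the subscribers;
// here the handler is invoked directly with a made-up timer id.
var handler = observable.toHandler();
handler(42);
console.log(events);
```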

Future support

In Vert.x, future objects are modelled as async result handlers and occur as the last parameter of asynchronous methods.

The rx.vertx module provides an observableFuture function:

var Rx = require("rx.vertx");

// Create an observable that is also a handler of async result
var observable = Rx.observableFuture();
observable.subscribe(
    function(server) {
        // Server is listening
    },
    function(err) {
        // Server could not start
    }
);

var server = vertx.createHttpServer({ "port":1234, "host":"localhost" });
server.listen(observable.toHandler());

Rx can also turn an existing Observer into a future:

var Rx = require("rx.vertx");

// Create an observer via the Rx api
var observer = Rx.Observer.create(
    function(evt) {
        // Got event
    }
);

// The rx.vertx extension augmented the observer with the toFuture method
var future = observer.toFuture();
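
How an async result maps onto observer callbacks can be sketched without Vert.x. The example below assumes the handler shape used in the file example earlier, function (result, err); toAsyncResultHandler and observeOutcome are hypothetical helpers, not part of rx.vertx.

```javascript
// Hypothetical helper: builds an async result handler of the form
// function (result, err) and routes the outcome to observer callbacks.
function toAsyncResultHandler(onNext, onError, onCompleted) {
    return function (result, err) {
        if (err) {
            onError(err);
        } else {
            onNext(result);
            onCompleted();
        }
    };
}

// Records what an observer would see for one asynchronous outcome.
function observeOutcome(result, err) {
    var seen = [];
    var handler = toAsyncResultHandler(
        function (value) { seen.push("next: " + value); },
        function (error) { seen.push("error: " + error.message); },
        function () { seen.push("completed"); }
    );
    handler(result, err);
    return seen;
}

var success = observeOutcome("server", null);
var failure = observeOutcome(null, new Error("port in use"));
console.log(success);
console.log(failure);
```

A successful result produces one item followed by completion, while a failure produces only an error, so an observable backed by a future emits at most one item.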

Scheduler support

RxJS relies on the timeout and interval functions of the default context to schedule operations. The vertx-js integration implements these functions, providing scheduler support out of the box.

Examples

Let’s now study a few examples of using Vert.x with RxJS.

EventBus message stream

The event bus message consumer naturally provides a stream of messages:

var Rx = require("rx.vertx");

var eb = vertx.eventBus();
var messageConsumer = eb.consumer("the-address");

// Create an observable from the message consumer
var observable = Rx.Observable.fromReadStream(messageConsumer);

// Subscribe to the observable
var subscription = observable.subscribe(
    function(msg) {
        // Got message
    });

// Unregisters the stream after 10 seconds
vertx.setTimer(10000, function() {
    subscription.dispose();
});

The message consumer provides a stream of messages. The MessageConsumer#bodyStream() method gives access to a new stream of message bodies if needed:

var Rx = require("rx.vertx");

var eb = vertx.eventBus();
var messageConsumer = eb.consumer("the-address");
var bodyConsumer = messageConsumer.bodyStream();

// Create an observable from the body consumer
var observable = Rx.Observable.fromReadStream(bodyConsumer);

The RxJS map/reduce composition style can then be used:

var Rx = require("rx.vertx");

var eb = vertx.eventBus();
var consumer = eb.consumer("heat-sensor").bodyStream();
var observable = Rx.Observable.fromReadStream(consumer);

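// Assuming numeric readings: skip empty windows, then average each 500 ms window
observable.
    bufferWithTime(500).
    filter(function (arr) { return arr.length > 0; }).
    map(function (arr) { return arr.reduce(function (acc, x) { return acc + x; }, 0) / arr.length; }).
    subscribe(
    function(heat) {
        console.log("Current heat is " + heat);
    });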
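
The per-window reduction can be tried in isolation, without an event bus. This is a minimal sketch, assuming the sensor publishes numeric readings and that an average per window is what is wanted; averageWindow is a hypothetical helper, and the arrays stand in for the windows that bufferWithTime would deliver.

```javascript
// Hypothetical helper: averages one window of readings (an array of numbers).
// An empty window has no meaningful average, so it yields null.
function averageWindow(readings) {
    if (readings.length === 0) { return null; }
    var sum = readings.reduce(function (acc, x) { return acc + x; }, 0);
    return sum / readings.length;
}

// Three made-up windows: a busy one, an empty one, and a single reading.
var windows = [[20, 22, 24], [], [30]];
var averages = windows.map(averageWindow);
console.log(averages);
```

The empty case matters because a time-based buffer can close a window in which no message arrived.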

Timers

A timer task can be created with Vertx#timerStream(long):

var Rx = require("rx.vertx");
var timer = Rx.Observable.fromReadStream(vertx.timerStream(1000));
timer.subscribe(function(id) {
    console.log("Callback after 1 second");
});

A periodic task can be created with Vertx#periodicStream(long), which emits an item at every interval.
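
By analogy with the timer example, a periodic observable could look like the sketch below. The vertx and Rx objects are passed in as parameters so that the function is self-contained; logEverySecond is a hypothetical name, and the sketch assumes that the periodic stream adapts to an observable in the same way as the timer stream.

```javascript
// Sketch only: `vertx` and `Rx` are parameters so the function has no hidden
// globals; in a verticle they would be the global vertx object and
// require("rx.vertx").
function logEverySecond(vertx, Rx) {
    var periodic = Rx.Observable.fromReadStream(vertx.periodicStream(1000));
    // Return the subscription so the caller can dispose it later
    return periodic.subscribe(function (id) {
        console.log("Callback every second");
    });
}
```

In a verticle this would be called as logEverySecond(vertx, require("rx.vertx")). Disposing the returned subscription should stop the callbacks, as in the event bus example.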

Http client requests

The HttpClientRequest provides a one-shot callback with the HttpClientResponse object. The observable also reports a request failure:

var Rx = require("rx.vertx");
var client = vertx.createHttpClient();
var req = client.request("GET", 8080, "localhost", "/the_uri");
var observable = Rx.Observable.fromReadStream(req);
observable.subscribe(function(resp) {
    // Process the response
}, function(err) {
    // Could not connect
});
req.end();

The response can be processed as a stream of buffers:

var Rx = require("rx.vertx");
var client = vertx.createHttpClient();
var req = client.request("GET", 8080, "localhost", "/the_uri");
Rx.Observable.fromReadStream(req).subscribe(function(resp) {
    var observable = Rx.Observable.fromReadStream(resp);
    observable.forEach(function(buffer) {
        // Process buffer
    });
});
req.end();

Http server requests

The HttpServer#requestStream() provides a callback for each incoming request:

var Rx = require("rx.vertx");
var server = vertx.createHttpServer();
var requests = Rx.Observable.fromReadStream(server.requestStream());
requests.subscribe(function(request) {
    // Process the request
});

The HttpServerRequest can then be adapted to a buffer observable with Rx.Observable.fromReadStream, like any other read stream.
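
A minimal sketch of that adaptation, modelled on the websocket server example further down, might look as follows. It assumes the request is a read stream of buffers; observeRequestBodies and its onBuffer callback are hypothetical names, and server and Rx are passed in as parameters so that the function is self-contained.

```javascript
// Sketch only: `server` and `Rx` are parameters so the function has no hidden
// globals; in a verticle they would be vertx.createHttpServer() and
// require("rx.vertx"). `onBuffer` is a hypothetical callback for body chunks.
function observeRequestBodies(server, Rx, onBuffer) {
    var requests = Rx.Observable.fromReadStream(server.requestStream());
    return requests.subscribe(function (request) {
        // Each request is itself adapted to an observable of buffers
        var observable = Rx.Observable.fromReadStream(request);
        observable.forEach(function (buffer) {
            onBuffer(request, buffer);
        });
    });
}
```

Passing the request along with each buffer lets the callback tell concurrent requests apart.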

Websocket client

The HttpClient#websocketStream provides a single callback when the websocket connects, or a failure otherwise:

var Rx = require("rx.vertx");
var client = vertx.createHttpClient();
var stream = client.websocketStream(8080, "localhost", "/the_uri");
var observable = Rx.Observable.fromReadStream(stream);
observable.subscribe(function(ws) {
    // Use the websocket
}, function(err) {
    // Could not connect
});

The WebSocket can then easily be turned into an observable of buffers:

var Rx = require("rx.vertx");
var client = vertx.createHttpClient();
var stream = client.websocketStream(8080, "localhost", "/the_uri");
Rx.Observable.fromReadStream(stream).
    subscribe(function (ws) {
        var observable = Rx.Observable.fromReadStream(ws);
        observable.forEach(function(buffer) {
            // Process message
        });
    });

Websocket server

The HttpServer#websocketStream() provides a callback for each incoming connection:

var Rx = require("rx.vertx");
var server = vertx.createHttpServer();
var sockets = Rx.Observable.fromReadStream(server.websocketStream());
sockets.subscribe(function(ws) {
    // Process the web socket
});

The ServerWebSocket can be turned into a buffer observable easily:

var Rx = require("rx.vertx");
var server = vertx.createHttpServer();
Rx.Observable.fromReadStream(server.websocketStream()).subscribe(function(ws) {
    var observable = Rx.Observable.fromReadStream(ws);
    observable.forEach(function(buffer) {
        // Process message
    });
});