Skip to main content

RxJava is a popular library for composing asynchronous and event based programs using observable sequences for the Java VM. RxGroovy is the Reactive Extensions for Groovy. This adaptor allows groovy.lang.Closure functions to be used and RxJava will know how to invoke them.

Vert.x integrates naturally with RxGroovy, allowing to use observable wherever you can use streams or asynchronous results.

To use vert.x API for RxGroovy, add the following dependency to the dependencies section of your build descriptor:

  • Maven (in your pom.xml):

<dependency>
  <groupId>{maven-groupId}</groupId>
  <artifactId>{maven-artifactId}</artifactId>
  <version>{maven-version}</version>
</dependency>
  • Gradle (in your build.gradle file):

compile {maven-groupId}:{maven-artifactId}:{maven-version}

Read stream support

RxJava observable is a perfect match for Vert.x ReadStream class : both provide a flow of items.

Vert.x API for Groovy provides io.vertx.groovy.core.stream.ReadStream objects, the RxGroovy provides a Groovy extension module that adds the toObservable method to the read stream class.

def fs = vertx.fileSystem()
fs.open("/data.txt", [:], { result ->
  def file = result.result()
  def observable = file.toObservable()
  observable.forEach({ data -> println("Read data: ${data.toString("UTF-8")}") })
})

Handler support

The RxJava io.vertx.ext.rx.java.RxHelper should be used to: - create an io.vertx.ext.rx.java.ObservableHandler, - transform actions to an handler

The RxGroovy extension module adds the toHandler method on the rx.Observer class:

Observer<Long> observer = Observers.create({ item -> println("Timer fired!") })
Handler<Long> handler = observer.toHandler()
vertx.setTimer(1000, handler)

Async result support

In Vert.x future objects are modelled as async result handlers and occur as last parameter of asynchronous methods.

The RxGroovy extension module adds the toFuture method on the rx.Observer class:

Observer<HttpServer> observer = Observers.create({ server -> println("Server started") })
Handler<AsyncResult<HttpServer>> handler = observer.toFuture()
vertx.createHttpServer([port:1234,host:"localhost"]).listen(handler)

Scheduler support

The reactive extension sometimes needs to schedule actions, for instance Observable#timer create and returns a timer that emit periodic events. By default, scheduled actions are managed by RxJava, it means that the timer thread are not Vert.x threads and therefore not executing in a Vert.x event loop.

When an RxJava method deals with a scheduler, it accepts an overloaded method accepting an extra Rx.Scheduler, the RxGroovy extension module adds to the Vertx class the scheduler() method will return a scheduler that can be used in such places.

Observable<Long> timer = Observable.timer(100, 100, TimeUnit.MILLISECONDS, vertx.scheduler())

For blocking scheduled actions, a scheduler can be created with the blockingScheduler method:

Scheduler scheduler = vertx.blockingScheduler();
Observable<Integer> obs = blockingObservable.observeOn(scheduler);

RxJava can also be configured to use a scheduler by default, the returned scheduler hook uses a blocking scheduler for IO actions:

RxJavaSchedulersHook hook = RxHelper.schedulerHook(vertx)
RxJavaPlugins.getInstance().registerSchedulersHook(hook)

Json unmarshalling

The io.vertx.rx.groovy.RxHelper#unmarshaller(java.lang.Class)} creates an rx.Observable.Operator that transforms an Observable<Buffer> in json format into an object observable:

def fileSystem = vertx.fileSystem()
fileSystem.open("/data.txt", [:], { result ->
  AsyncFile file = result.result()
  Observable<Buffer> observable = file.toObservable()
  observable.lift(RxHelper.unmarshaller(MyPojo.class)).subscribe({ mypojo ->
      // Process the object
  })
})

Api examples

Let’s study now a few examples of using Vert.x with RxJava.

EventBus message stream

The event bus MessageConsumer provides naturally an Observable<Message<T>>:

EventBus eb = vertx.eventBus()
MessageConsumer<String> consumer = eb.<String>consumer("the-address")
Observable<Message<String>> observable = consumer.toObservable()
Subscription sub = observable.subscribe({ msg ->
  // Got message
});

// Unregisters the stream after 10 seconds
vertx.setTimer(10000, { id ->
  sub.unsubscribe()
});

The MessageConsumer provides a stream of Message. The Message#body() gives access to a new stream of message bodies if needed:

EventBus eb = vertx.eventBus()
MessageConsumer<String> consumer = eb.<String>consumer("the-address")
Observable<String> observable = consumer.bodyStream().toObservable()

RxJava map/reduce composition style can be then be used:

Observable<Double> observable = vertx.eventBus().
<Double>consumer("heat-sensor").
    bodyStream().
    toObservable()

observable.
    buffer(1, TimeUnit.SECONDS).
    map({ samples -> samples.sum() }).
    subscribe({ heat ->
      vertx.eventBus().send("news-feed", "Current heat is " + heat)
    });

Timers

Timer task can be created with Vertx#timerStream(long):

vertx.timerStream(1000).
    toObservable().
    subscribe({ id ->
          println("Callback after 1 second")
        }
    )

Periodic task can be created with Vertx#periodicStream(long):

vertx.periodicStream(1000).
    toObservable().
    subscribe({ id ->
          println("Callback every second")
        }
    )

The observable can be cancelled with an unsubscription:

vertx.periodicStream(1000).
    toObservable().
    subscribe(new Subscriber<Long>() {
      public void onNext(Long aLong) {
        // Callback
        unsubscribe()
      }
      public void onError(Throwable e) {}
      public void onCompleted() {}
    })

Http client requests

HttpClientRequest#toObservable() provides a one shot callback with the HttpClientResponse} object. The observable reports a request failure.

HttpClientRequest request = client.request(HttpMethod.GET, 8080, "localhost", "/the_uri")
request.toObservable().subscribe({ response ->
  // Process the response
}, { error ->
  // Could not connect
})
request.end()
 The response can be processed as an `Observable<Buffer>` with the
`HttpClientResponse#toObservable()` method:
HttpClientRequest request = client.request(HttpMethod.GET, 8080, "localhost", "/the_uri")
request.toObservable().
    subscribe(
        { response ->
          Observable<Buffer> observable = response.toObservable()
          observable.forEach(
              { buffer ->
                // Process buffer
              })
        })

The same flow can be achieved with the flatMap operation:

request.toObservable().
    flatMap({resp -> resp.&toObservable}).
    forEach(
        { buffer ->
      // Process buffer
    })

We can also unmarshall the Observable<Buffer> into an object using the {@link io.vertx.rx.groovy.RxHelpe.RxHelper#unmarshaller(java.lang.Class)} static method. This method creates an Rx.Observable.Operator unmarshalling buffers to an object:

request.toObservable().
    flatMap({ resp -> resp.&toObservable }).
    lift(RxHelper.unmarshaller(MyPojo.class)).
    forEach({ pojo ->
      // Process pojo
    })

Http server requests

The HttpServer#requestStream() provides a callback for each incoming request:

Observable<HttpServerRequest> requestObservable = server.requestStream().toObservable()
requestObservable.subscribe({ request ->
  // Process request
})

The HttpServerRequest can then be adapted to an Observable<Buffer>:

Observable<HttpServerRequest> requestObservable = server.requestStream().toObservable()
requestObservable.subscribe({ request ->
  Observable<Buffer> observable = request.toObservable()
});

The io.vertx.rx.groovy.RxHelpe.RxHelper#unmarshaller(java.lang.Class)} can be used to parse and map a json request to an object:

Observable<HttpServerRequest> requestObservable = server.requestStream().toObservable()
requestObservable.subscribe({ request ->
  Observable<MyPojo> observable = request.
      toObservable().
      lift(RxHelper.unmarshaller(MyPojo.class))
})

Websocket client

The`HttpClient#websocketStream`} provides a single callback when the websocket connects, otherwise a failure:

HttpClient client = vertx.createHttpClient()
WebSocketStream stream = client.websocketStream(8080, "localhost", "/the_uri")
Observable<WebSocket> socketObservable = stream.toObservable()
socketObservable.subscribe(
    { ws ->
      // Use the websocket
    }, { error ->
  // Could not connect
})

The WebSocket can then be turned into an Observable<Buffer> easily

socketObservable.subscribe(
    { socket ->
      Observable<Buffer> dataObs = socket.toObservable()
      dataObs.subscribe({ buffer ->
        println("Got message ${buffer.toString("UTF-8")}")
      })
    })

Websocket server

The HttpServer#websocketStream() provides a callback for each incoming connection:

Observable<ServerWebSocket> socketObservable = server.websocketStream().toObservable()
socketObservable.subscribe(
    { socket -> println("Web socket connect") },
    { failure -> println("Should never be called") },
    { println("Subscription ended or server closed") }
)

The ServerWebSocket can be turned into an Observable<Buffer> easily:

Observable<ServerWebSocket> socketObservable = server.websocketStream().toObservable()
socketObservable.subscribe({ socket ->
  Observable<Buffer> dataObs = socket.toObservable()
  dataObs.subscribe({ buffer ->
    println("Got message ${buffer.toString("UTF-8")}")
  })
})