Vert.x RxJava

Vert.x API for RxJava3

RxJava is a popular library for composing asynchronous and event-based programs using observable sequences for the Java VM.

Vert.x integrates naturally with RxJava, so you can use RxJava wherever you can use streams or asynchronous results.

Using Vert.x API for RxJava3

To use Vert.x API for RxJava3, add the following dependency to the dependencies section of your build descriptor:

  • Maven (in your pom.xml):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-rx-java3</artifactId>
 <version>4.5.11</version>
</dependency>
  • Gradle (in your build.gradle file):

implementation 'io.vertx:vertx-rx-java3:4.5.11'

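There are two ways to use the RxJava 3 API with Vert.x: through the original Vert.x API, with helper classes (such as FlowableHelper) whose static methods convert objects between the Vert.x core API and RxJava 3, or through the Rxified Vert.x API.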

Read stream support

The RxJava Flowable is a perfect match for the Vert.x ReadStream: both provide a flow of items.

The FlowableHelper.toFlowable static methods convert a Vert.x read stream to a Flowable:

FileSystem fileSystem = vertx.fileSystem();
fileSystem.open("/data.txt", new OpenOptions(), result -> {
  AsyncFile file = result.result();
  Flowable<Buffer> observable = FlowableHelper.toFlowable(file);
  observable.forEach(data -> System.out.println("Read data: " + data.toString("UTF-8")));
});

The Rxified Vert.x API provides a toFlowable method on ReadStream:

FileSystem fs = vertx.fileSystem();
fs.rxOpen("/data.txt", new OpenOptions())
  .flatMapPublisher(file -> file.toFlowable())
  .subscribe(data -> System.out.println("Read data: " + data.toString("UTF-8")));

Such flowables are hot flowables, i.e. they produce notifications regardless of subscriptions, because a ReadStream may or may not emit items spontaneously, depending on the implementation.

At subscription time, the adapter calls handler to set its own handler.

Some ReadStream implementations start to emit events only after this call; others emit events whether or not a handler is set:

  • AsyncFile produces buffer events after the handler is set

  • HttpServerRequest produces events independently of the handler (i.e. buffers may be lost if no handler is set)

In both cases, subscribing to the Flowable in the same call is safe because the event loop or the worker verticles cannot be called concurrently, so the subscription always happens before the handler starts emitting data.

When you need to delay the subscription, you need to pause the ReadStream and then resume it, which is what you would do with a plain ReadStream:

server.requestHandler(request -> {
  if (request.method() == HttpMethod.POST) {

    // Stop receiving buffers
    request.pause();

    checkAuth(res -> {

      // Now we can receive buffers again
      request.resume();

      if (res.succeeded()) {
        Flowable<Buffer> flowable = request.toFlowable();
        flowable.subscribe(buff -> {
          // Get buffers
        });
      }
    });
  }
});
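
Why pausing matters can be illustrated without Vert.x at all. The sketch below is a deliberately simplified, JDK-only model of a hot source; HotSource and PauseResumeDemo are hypothetical names, not Vert.x types, and the model assumes that an unpaused source with no handler simply drops items, as described above for HttpServerRequest.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for a hot stream; not a Vert.x type.
class HotSource {
  private Consumer<String> handler;
  private boolean paused;
  private final Queue<String> pending = new ArrayDeque<>();

  void handler(Consumer<String> newHandler) {
    this.handler = newHandler;
    drain();
  }

  void pause() {
    paused = true;
  }

  void resume() {
    paused = false;
    drain();
  }

  // Called by the "network": queue while paused, drop if nobody listens.
  void emit(String item) {
    if (paused) {
      pending.add(item);
    } else if (handler != null) {
      handler.accept(item);
    } // otherwise the item is lost
  }

  // Deliver queued items once the source is flowing and a handler exists.
  private void drain() {
    while (!paused && handler != null && !pending.isEmpty()) {
      handler.accept(pending.poll());
    }
  }
}

class PauseResumeDemo {
  static List<String> run(boolean pauseFirst) {
    HotSource source = new HotSource();
    List<String> received = new ArrayList<>();
    if (pauseFirst) {
      source.pause();
    }
    source.emit("chunk-1");        // arrives before anyone has subscribed
    source.resume();
    source.handler(received::add); // the delayed subscription
    source.emit("chunk-2");
    return received;
  }

  public static void main(String[] args) {
    System.out.println(run(false)); // [chunk-2]
    System.out.println(run(true));  // [chunk-1, chunk-2]
  }
}
```

Without the pause, the first chunk is gone by the time the handler is registered; with it, the chunk is held back and delivered once a handler exists.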

Likewise it is possible to turn an existing Flowable into a Vert.x ReadStream.

The FlowableHelper.toReadStream static methods convert a Flowable to a Vert.x read stream:

Flowable<Buffer> observable = getFlowable();
ReadStream<Buffer> readStream = FlowableHelper.toReadStream(observable);
readStream.pipeTo(response);

Write stream support

A WriteStream, like an org.reactivestreams.Subscriber, consumes items and, when it can’t keep up, collaborates with the producer to avoid an ever-growing backlog.
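
The consumer-driven protocol can be tried out with the JDK alone, since java.util.concurrent.Flow mirrors the Reactive Streams interfaces. The following is only an analogy, not Vert.x code: OneAtATimeSubscriber and BackpressureDemo are made-up names, and the subscriber plays the role of a slow sink that asks for one item at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Consumer that requests exactly one item at a time (JDK analogy, not Vert.x code).
class OneAtATimeSubscriber implements Flow.Subscriber<String> {
  private final List<String> written = new ArrayList<>();
  private final CompletableFuture<List<String>> done = new CompletableFuture<>();
  private Flow.Subscription subscription;

  @Override
  public void onSubscribe(Flow.Subscription subscription) {
    this.subscription = subscription;
    subscription.request(1);   // initial demand: one item
  }

  @Override
  public void onNext(String item) {
    written.add(item);         // "write" the item
    subscription.request(1);   // signal readiness for the next one
  }

  @Override
  public void onError(Throwable failure) {
    done.completeExceptionally(failure);
  }

  @Override
  public void onComplete() {
    done.complete(written);
  }

  CompletableFuture<List<String>> done() {
    return done;
  }
}

class BackpressureDemo {
  static List<String> run() {
    OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
    // Closing the publisher signals onComplete after buffered items are delivered.
    try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
      publisher.subscribe(subscriber);
      for (String item : List.of("a", "b", "c")) {
        publisher.submit(item); // blocks when the subscriber's buffer is full
      }
    }
    try {
      return subscriber.done().get(5, TimeUnit.SECONDS);
    } catch (Exception e) {
      throw new IllegalStateException("stream did not complete", e);
    }
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

The producer never hands over more items than were requested, which is the same contract a backpressured adapter relies on when it sits between a Flowable and a WriteStream.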

Vert.x provides the WriteStreamSubscriber adapter that you can use to send Flowable items to any WriteStream:

Sending buffers to an HTTP server response
response.setChunked(true);
WriteStreamSubscriber<io.vertx.core.buffer.Buffer> subscriber = io.vertx.rxjava3.RxHelper.toSubscriber(response);
flowable.subscribe(subscriber);

There is also an io.vertx.rxjava3.WriteStreamObserver adapter for the non-backpressured io.reactivex.rxjava3.core.Observable. The difference is that this adapter will send items to the WriteStream even when it can’t keep up with the producer rate.

If you are programming with the Rxified Vert.x API, the WriteStream implementations provide a toSubscriber method. The previous example then becomes even more straightforward:

response.setChunked(true);
flowable.subscribe(response.toSubscriber());

When the Flowable terminates successfully, the adapter invokes the end method.

The adapter sets the WriteStream drain and exception handlers, so don’t use them after subscribing.

The WriteStreamSubscriber adapter is able to invoke callbacks when:

  • the Flowable terminates with an error, or

  • the WriteStream fails (e.g. HTTP connection is closed or filesystem is full), or

  • the WriteStream ends (i.e. all writes done and file is closed), or

  • the WriteStream ends with an error (i.e. all writes done and an error occurred when closing the file)

This allows for a more robust program design, as well as scheduling other tasks after the stream has been handled:

response.setChunked(true);

WriteStreamSubscriber<Buffer> subscriber = response.toSubscriber();

subscriber.onError(throwable -> {
  if (!response.headWritten() && !response.closed()) {
    response.setStatusCode(500).end("oops");
  } else {
    // log error
  }
});

subscriber.onWriteStreamError(throwable -> {
  // log error
});

subscriber.onWriteStreamEnd(() -> {
  // log end of transaction to audit system...
});

flowable.subscribe(subscriber);

If the WriteStream fails, the adapter cancels the org.reactivestreams.Subscription.

Async result support

You can create an RxJava Observer from an existing Vert.x Handler<AsyncResult<T>> and subscribe it to a Single, Maybe or Completable:

Handler<AsyncResult<String>> handler = getHandler();

// Subscribe to a Single
Single.just("hello").subscribe(SingleHelper.toObserver(handler));

Handler<AsyncResult<String>> handler = getHandler();

// Subscribe to a Maybe
Maybe.just("hello").subscribe(MaybeHelper.toObserver(handler));

Handler<AsyncResult<Void>> handler = getHandler();

// Subscribe to a Completable
Completable.complete().subscribe(CompletableHelper.toObserver(handler));

For each asynchronous method, the Rxified Vert.x API adds a variant with the rx prefix that returns an RxJava Single, Maybe or Completable:

Single<HttpServer> single = vertx
  .createHttpServer()
  .rxListen(1234, "localhost");

// Subscribe to bind the server
single.
    subscribe(
        server -> {
          // Server is listening
        },
        failure -> {
          // Server could not start
        }
    );

Such singles are cold: the corresponding API method is called on subscribe.

Maybe can produce a result or no result:

DnsClient client = vertx.createDnsClient(dnsPort, dnsHost);

// Obtain a maybe that performs the actual reverse lookup on subscribe
Maybe<String> maybe = client.rxReverseLookup(ipAddress);

// Subscribe to perform the lookup
maybe.
  subscribe(
    name -> {
      // Lookup produced a result
    },
    failure -> {
      // Lookup failed
    },
    () -> {
      // Lookup produced no result
    }
  );

Completable is usually mapped to Handler<AsyncResult<Void>>:

Completable completable = server.rxClose();

// Subscribe to close the server
completable.
  subscribe(
    () -> {
      // Server is closed
    },
    failure -> {
      // Server closed but encountered an issue
    }
  );

If you cannot use the Rxified Vert.x API, or if you have your own callback-based asynchronous methods, Vert.x provides adapters:

Adapting Vert.x core executeBlocking method
Maybe<String> maybe = MaybeHelper.toMaybe(handler -> {
  vertx.executeBlocking(fut -> fut.complete(invokeBlocking()), handler);
});

Scheduler support

The reactive extension sometimes needs to schedule actions, for instance Flowable#interval creates and returns a timer that emits periodic events. By default, scheduled actions are managed by RxJava, which means that the timer threads are not Vert.x threads and therefore execute neither in a Vert.x event loop nor on a Vert.x worker thread.

When an RxJava method deals with a scheduler, it has an overloaded variant accepting an extra io.reactivex.rxjava3.core.Scheduler. The RxHelper.scheduler method returns a scheduler that can be used in such places.

Scheduler scheduler = RxHelper.scheduler(vertx);
Observable<Long> timer = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);

For blocking scheduled actions, a scheduler can be created with the RxHelper.blockingScheduler method:

Scheduler scheduler = RxHelper.blockingScheduler(vertx);
Observable<Long> timer = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);

RxJava can also be reconfigured to use the Vert.x scheduler:

RxJavaPlugins.setComputationSchedulerHandler(s -> RxHelper.scheduler(vertx));
RxJavaPlugins.setIoSchedulerHandler(s -> RxHelper.blockingScheduler(vertx));
RxJavaPlugins.setNewThreadSchedulerHandler(s -> RxHelper.scheduler(vertx));

RxJava uses the word computation for non-blocking tasks and io for blocking tasks, which is the opposite of the Vert.x terminology.

The Rxified Vert.x API also provides similar methods on the RxHelper class:

Scheduler scheduler = RxHelper.scheduler(vertx);
Observable<Long> timer = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);

RxJavaPlugins.setComputationSchedulerHandler(s -> RxHelper.scheduler(vertx));
RxJavaPlugins.setIoSchedulerHandler(s -> RxHelper.blockingScheduler(vertx));
RxJavaPlugins.setNewThreadSchedulerHandler(s -> RxHelper.scheduler(vertx));

It is also possible to create a scheduler backed by a named worker pool. This can be useful if you want to reuse a specific thread pool for scheduling blocking actions:

Scheduler scheduler = RxHelper.blockingScheduler(workerExecutor);
Observable<Long> timer = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);

JSON unmarshalling

The FlowableHelper.unmarshaller method creates an io.reactivex.rxjava3.core.FlowableTransformer that transforms a Flowable<Buffer> in JSON format into a flowable of objects:

fileSystem.open("/data.txt", new OpenOptions(), result -> {
  AsyncFile file = result.result();
  Flowable<Buffer> observable = FlowableHelper.toFlowable(file);
  observable.compose(FlowableHelper.unmarshaller(MyPojo.class)).subscribe(
      mypojo -> {
        // Process the object
      }
  );
});

The same can be done with the Rxified helper:

fileSystem
  .rxOpen("/data.txt", new OpenOptions())
  .flatMapObservable(file -> file.toObservable())
  .compose(ObservableHelper.unmarshaller(MyPojo.class))
  .subscribe(mypojo -> {
    // Process the object
  });

Deploying a Verticle

To deploy existing Verticle instances, you can use RxHelper.deployVerticle. It deploys a Verticle and returns a Single<String> of the deployment ID.

Single<String> deployment = RxHelper.deployVerticle(vertx, verticle);

deployment.subscribe(id -> {
  // Deployed
}, err -> {
  // Could not deploy
});

Rxified API

The Rxified API is a code-generated version of the Vert.x API. The API uses the io.vertx.rxjava3 prefix, for instance the io.vertx.core.Vertx class is translated to the io.vertx.rxjava3.core.Vertx class.

The Rxified API exposes Vert.x asynchronous methods in two ways:

  • the original method translated to an RxJava equivalent returning an eager and cached subscription

  • an rx-prefixed derived method that invokes the original method at subscription time

// Immediate write
// no need to subscribe
// completion provides the asynchronous result
Completable completion = response.write(buffer);

// No write happened
completion = response.rxWrite(buffer);

// Perform an actual write
completion.subscribe(() -> ..., err -> ...);

You can use the original method or the rxified method depending on your needs, e.g. when you don’t want to subscribe or you don’t care about the result, you can call the original method.
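
The difference between the two styles can be mimicked with plain JDK types. This is an analogy only, not the generated API: EagerVsLazy and its methods are hypothetical, a CompletableFuture stands in for the eager, cached result, and a Supplier stands in for the rx-prefixed variant, assuming each subscription triggers one invocation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// JDK-only analogy; none of these names belong to Vert.x or RxJava.
class EagerVsLazy {

  // Hypothetical asynchronous operation that counts how often it really runs.
  static CompletableFuture<String> write(AtomicInteger calls) {
    calls.incrementAndGet();
    return CompletableFuture.completedFuture("written");
  }

  // Eager and cached: the operation runs once, observers share the outcome.
  static int eagerCalls() {
    AtomicInteger calls = new AtomicInteger();
    CompletableFuture<String> completion = write(calls);
    completion.thenAccept(result -> { }); // first observer
    completion.thenAccept(result -> { }); // second observer, no extra call
    return calls.get();
  }

  // Lazy: building the supplier runs nothing; each get() triggers one call.
  static int lazyCalls(int subscriptions) {
    AtomicInteger calls = new AtomicInteger();
    Supplier<CompletableFuture<String>> deferred = () -> write(calls);
    for (int i = 0; i < subscriptions; i++) {
      deferred.get();
    }
    return calls.get();
  }

  public static void main(String[] args) {
    System.out.println(eagerCalls());  // 1
    System.out.println(lazyCalls(0));  // 0
    System.out.println(lazyCalls(2));  // 2
  }
}
```

In this model, forgetting to "subscribe" to the lazy variant means the operation never runs, which is the usual pitfall with rx-prefixed methods.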

Embedding Rxified Vert.x

Just use the Vertx.vertx methods of the Rxified class:

Vertx vertx = io.vertx.rxjava3.core.Vertx.vertx();

As a Verticle

Extend the io.vertx.rxjava3.core.AbstractVerticle class; it will wrap the Vertx instance for you:

class MyVerticle extends AbstractVerticle {
  public void start() {
    // Use Rxified Vertx here
  }
}

Deploying an RxJava verticle is still performed by the Java deployer and does not need a specific deployer.

Verticles with an asynchronous start can instead override the rxStart method and return a Completable:

class MyVerticle extends AbstractVerticle {
  public Completable rxStart() {
    return vertx.createHttpServer()
      .requestHandler(req -> req.response().end("Hello World"))
      .rxListen()
      .ignoreElement();
  }
}

API examples

Let’s now study a few examples of using Vert.x with RxJava.

EventBus message stream

The event bus MessageConsumer naturally provides a Flowable<Message<T>>:

EventBus eb = vertx.eventBus();
MessageConsumer<String> consumer = eb.<String>consumer("the-address");
Flowable<Message<String>> flowable = consumer.toFlowable();
Disposable sub = flowable.subscribe(msg -> {
  // Got message
});

// Unregisters the stream after 10 seconds
vertx.setTimer(10000, id -> {
  sub.dispose();
});

The MessageConsumer provides a stream of Message. The bodyStream method gives access to a new stream of message bodies if needed:

EventBus eb = vertx.eventBus();
MessageConsumer<String> consumer = eb.<String>consumer("the-address");
Flowable<String> flowable = consumer.bodyStream().toFlowable();

RxJava map/reduce composition style can then be used:

Flowable<Double> flowable = vertx.eventBus().
    <Double>consumer("heat-sensor").
    bodyStream().
    toFlowable();

flowable.
    buffer(1, TimeUnit.SECONDS).
    map(samples -> samples.
        stream().
        collect(Collectors.averagingDouble(d -> d))).
    subscribe(heat -> {
      vertx.eventBus().send("news-feed", "Current heat is " + heat);
    });
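
To make the map step concrete, here is what happens to a single one-second batch, using only the JDK. The sketch assumes buffer(1, TimeUnit.SECONDS) has already collected the samples into a List<Double>; the HeatAverage class and the sample values are invented for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;

class HeatAverage {
  // Mirrors the lambda passed to map(...): reduce one batch of samples to its mean.
  static double average(List<Double> samples) {
    return samples.stream().collect(Collectors.averagingDouble(d -> d));
  }

  public static void main(String[] args) {
    System.out.println(average(List.of(20.0, 22.0, 24.0))); // 22.0
    System.out.println(average(List.of()));                 // 0.0
  }
}
```

Note that averagingDouble returns 0.0 for an empty batch, so if a window happens to contain no samples it may be worth filtering it out before publishing a reading.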

Http client requests

You can easily use the HTTP client to create a request and process the response:

HttpClient client = vertx.createHttpClient();
client.rxRequest(HttpMethod.GET, 8080, "localhost", "/")
  .flatMap(request -> request
    .rxSend()
    .flatMap(response -> {
      if (response.statusCode() == 200) {
        return response.body();
      } else {
        return Single.error(new NoStackTraceThrowable("Invalid response"));
      }
    }))
  .subscribe(body -> {
    // Process the body
  });

When you need to process a large streaming response, you can get a Flowable<Buffer> from the HTTP response:

HttpClient client = vertx.createHttpClient();
client.rxRequest(HttpMethod.GET, 8080, "localhost", "/")
  .flatMapPublisher(request -> request
    .rxSend()
    .flatMapPublisher(response -> {
      if (response.statusCode() == 200) {
        return response.toFlowable();
      } else {
        return Flowable.error(new NoStackTraceThrowable("Invalid response"));
      }
    }))
  .subscribe(chunk -> {
    // Process the response chunks
  });

You can also use the Vert.x Web Client.

Http server requests

An HttpServerRequest can be adapted to an Observable<Buffer>:

Observable<Buffer> observable = request.toObservable();

FlowableHelper.unmarshaller can be used to parse and map a JSON request to an object:

Flowable<MyPojo> flowable = request.
  toFlowable().
  compose(FlowableHelper.unmarshaller(MyPojo.class));

WebSocket client

The rxWebSocket method provides a single callback when the WebSocket connects, or a failure otherwise:

HttpClient client = vertx.createHttpClient(new HttpClientOptions());
client.rxWebSocket(8080, "localhost", "/the_uri").subscribe(
    ws -> {
      // Use the websocket
    },
    error -> {
      // Could not connect
    }
);

The WebSocket can then be turned into a Flowable<Buffer> easily:

socketObservable.subscribe(
    socket -> {
      Flowable<Buffer> dataObs = socket.toFlowable();
      dataObs.subscribe(buffer -> {
        System.out.println("Got message " + buffer.toString("UTF-8"));
      });
    }
);

WebSocket server

A ServerWebSocket can be turned into a Flowable<Buffer> easily:

socketObservable.subscribe(
    socket -> {
      Flowable<Buffer> dataObs = socket.toFlowable();
      dataObs.subscribe(buffer -> {
        System.out.println("Got message " + buffer.toString("UTF-8"));
      });
    }
);