Vert.x gRPC

The best description of gRPC can be seen at wikipedia.

gRPC is an open source remote procedure call (RPC) system initially developed at Google. It uses HTTP/2 for transport, Protocol Buffers as the interface description language, and provides features such as authentication, bidirectional streaming and flow control, blocking or nonblocking bindings, and cancellation and timeouts. It generates cross-platform client and server bindings for many languages.

— wikipedia
wikipedia

Vert.x gRPC is a module that will align the programming style of Google gRPC with Vert.x style. As a user of this module you will be more familiar with the code style using Vert.x Streams and Futures while benefiting from all the benefits of gRPC.

For more information related to gRPC please consult the official documentation site http://www.grpc.io/.

Warning
Since Vert.x 4.3, this module is the new support for gRPC in the Vert.x stack, the previous implementation based on gRPC Netty is still available and has been renamed Vert.x gRPC Netty, it can be found at https://vertx.io/docs/vertx-grpc-netty/java/ . This module has Tech Preview status, this means the API can change between versions.

Vert.x gRPC is split into three parts:

  • Vert.x gRPC Server

  • Vert.x gRPC Client

  • Vert.x gRPC Context Storage

Vert.x gRPC Server

Vert.x gRPC Server is a new gRPC server powered by Vert.x HTTP server superseding the integrated Netty based gRPC client.

This server provides a gRPC request/response oriented API as well as a the generated stub approach with a service bridge.

Using Vert.x gRPC Server

To use Vert.x gRPC Server, add the following dependency to the dependencies section of your build descriptor:

  • Maven (in your pom.xml):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-grpc-server</artifactId>
 <version>4.5.7</version>
</dependency>
  • Gradle (in your build.gradle file):

dependencies {
 compile 'io.vertx:vertx-grpc-server:4.5.7'
}

gRPC request/response server API

The gRPC request/response server API provides an alternative way to interact with a client without the need of a generated stub.

A GrpcServer is a Handler<HttpServerRequest> and can be used as an HTTP server request handler.

GrpcServer grpcServer = GrpcServer.server(vertx);

HttpServer server = vertx.createHttpServer(options);

server
  .requestHandler(grpcServer)
  .listen();
Tip

A GrpcServer can be used within a Vert.x Web router:

router.consumes("application/grpc").handler(rc -> grpcServer.handle(rc.request()));

Request/response

Each service method is processed by a handler

server.callHandler(GreeterGrpc.getSayHelloMethod(), request -> {

  request.handler(hello -> {

    GrpcServerResponse<HelloRequest, HelloReply> response = request.response();

    HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + hello.getName()).build();

    response.end(reply);
  });
});

Streaming request

You can set handlers to process request events

server.callHandler(StreamingGrpc.getSinkMethod(), request -> {
  request.handler(item -> {
    // Process item
  });
  request.endHandler(v ->{
    // No more items
    // Send the response
    request.response().end(Empty.getDefaultInstance());
  });
  request.exceptionHandler(err -> {
    // Something wrong happened
  });
});

Streaming response

A streaming response involves calling write for each element of the stream and using end to end the stream

server.callHandler(StreamingGrpc.getSourceMethod(), request -> {
  GrpcServerResponse<Empty, Item> response = request.response();
  request.handler(empty -> {
    for (int i = 0;i < 10;i++) {
      response.write(Item.newBuilder().setValue("1").build());
    }
    response.end();
  });
});

Bidi request/response

A bidi request/response is simply the combination of a streaming request and a streaming response

server.callHandler(StreamingGrpc.getPipeMethod(), request -> {

  request.handler(item -> request.response().write(item));
  request.endHandler(v -> request.response().end());
});

Flow control

Request and response are back pressured Vert.x streams.

You can pause/resume/fetch a request

request.pause();

performAsyncOperation().onComplete(ar -> {
  // And then resume
  request.resume();
});

You can check the writability of a response and set a drain handler

if (response.writeQueueFull()) {
  response.drainHandler(v -> {
    // Writable again
  });
} else {
  response.write(item);
}

Compression

You can compress response messages by setting the response encoding prior before sending any message

response.encoding("gzip");

// Write items after encoding has been defined
response.write(Item.newBuilder().setValue("item-1").build());
response.write(Item.newBuilder().setValue("item-2").build());
response.write(Item.newBuilder().setValue("item-3").build());

Decompression

Decompression is done transparently by the server when the client send encoded requests.

Stub API

The Vert.x gRPC Server can bridge a gRPC service to use with a generated server stub in a more traditional fashion

GrpcServer grpcServer = GrpcServer.server(vertx);

GreeterGrpc.GreeterImplBase service = new GreeterGrpc.GreeterImplBase() {
  @Override
  public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
    responseObserver.onNext(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
    responseObserver.onCompleted();
  }
};

// Bind the service bridge in the gRPC server
GrpcServiceBridge serverStub = GrpcServiceBridge.bridge(service);
serverStub.bind(grpcServer);

// Start the HTTP/2 server
vertx.createHttpServer(options)
  .requestHandler(grpcServer)
  .listen();

Message level API

The server provides a message level API to interact directly with protobuf encoded gRPC messages.

Tip
the server message level API can be used with the client message level API to write a gRPC reverse proxy

Such API is useful when you are not interested in the content of the messages, and instead you want to forward them to another service, e.g. you are writing a proxy.

ServiceName greeterServiceName = ServiceName.create("helloworld", "Greeter");

server.callHandler(request -> {

  if (request.serviceName().equals(greeterServiceName) && request.methodName().equals("SayHello")) {

    request.handler(protoHello -> {
      // Handle protobuf encoded hello
      performAsyncOperation(protoHello)
        .onSuccess(protoReply -> {
          // Reply with protobuf encoded reply
          request.response().end(protoReply);
        }).onFailure(err -> {
          request.response()
            .status(GrpcStatus.ABORTED)
            .end();
        });
    });
  } else {
    request.response()
      .status(GrpcStatus.NOT_FOUND)
      .end();
  }
});

You can also set a messageHandler to handle GrpcMessage, such messages preserve the client encoding, which is useful the service you are forwarding to can handle compressed messages directly, in this case the message does not need to be decompressed and compressed again.

ServiceName greeterServiceName = ServiceName.create("helloworld", "Greeter");

server.callHandler(request -> {

  if (request.serviceName().equals(greeterServiceName) && request.methodName().equals("SayHello")) {

    request.messageHandler(helloMessage -> {

      // Can be identity or gzip
      String helloEncoding = helloMessage.encoding();

      // Handle hello message
      handleGrpcMessage(helloMessage)
        .onSuccess(replyMessage -> {
          // Reply with reply message

          // Can be identity or gzip
          String replyEncoding = replyMessage.encoding();

          // Send the reply
          request.response().endMessage(replyMessage);
        }).onFailure(err -> {
          request.response()
            .status(GrpcStatus.ABORTED)
            .end();
        });
    });
  } else {
    request.response()
      .status(GrpcStatus.NOT_FOUND)
      .end();
  }
});

The writeMessage and endMessage will handle the message encoding:

  • when the message uses the response encoding, the message is sent as is

  • when the message uses a different encoding, it will be encoded, e.g. compressed or uncompressed

Vert.x gRPC Client

Vert.x gRPC Client is a new gRPC client powered by Vert.x HTTP client superseding the integrated Netty based gRPC client.

This client provides a gRPC request/response oriented API as well as a the generated stub approach with a gRPC Channel

Using Vert.x gRPC Client

To use Vert.x gRPC Client, add the following dependency to the dependencies section of your build descriptor:

  • Maven (in your pom.xml):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-grpc-client</artifactId>
 <version>4.5.7</version>
</dependency>
  • Gradle (in your build.gradle file):

dependencies {
 compile 'io.vertx:vertx-grpc-client:4.5.7'
}

gRPC request/response client API

The gRPC request/response client API provides an alternative way to interact with a server without the need of a generated stub.

You can easily create the gRPC client

GrpcClient client = GrpcClient.client(vertx);

Request/response

Any interaction with a gRPC server involves creating a request to the remote gRPC service

SocketAddress server = SocketAddress.inetSocketAddress(443, "example.com");
MethodDescriptor<HelloRequest, HelloReply> sayHelloMethod = GreeterGrpc.getSayHelloMethod();
Future<GrpcClientRequest<HelloRequest, HelloReply>> fut = client.request(server, sayHelloMethod);
fut.onSuccess(request -> {
  // The end method calls the service
  request.end(HelloRequest.newBuilder().setName("Bob").build());
});

The response holds the response and the last holds the result

request.response().onSuccess(response -> {
  Future<HelloReply> fut = response.last();
  fut.onSuccess(reply -> {
    System.out.println("Received " + reply.getMessage());
  });
});

Future composition can combine all the previous steps together in a compact fashion

client
  .request(server, GreeterGrpc.getSayHelloMethod()).compose(request -> {
    request.end(HelloRequest
      .newBuilder()
      .setName("Bob")
      .build());
    return request.response().compose(response -> response.last());
  }).onSuccess(reply -> {
    System.out.println("Received " + reply.getMessage());
  });

Streaming request

A streaming request involves calling write for each element of the stream and using end to end the stream

client
  .request(server, StreamingGrpc.getSinkMethod())
  .onSuccess(request -> {
  for (int i = 0;i < 10;i++) {
    request.write(Item.newBuilder().setValue("1").build());
  }
  request.end();
});

Streaming response

You can set handlers to process response events

client
  .request(server, StreamingGrpc.getSourceMethod())
  .compose(request -> {
    request.end(Empty.getDefaultInstance());
    return request.response();
  })
  .onSuccess(response -> {
    response.handler(item -> {
      // Process item
    });
    response.endHandler(v -> {
      // Done
    });
    response.exceptionHandler(err -> {
      // Something went bad
    });
  });

Bidi request/response

A bidi request/response is simply the combination of a streaming request and a streaming response.

Flow control

Request and response are back pressured Vert.x streams.

You can check the writability of a request and set a drain handler

if (request.writeQueueFull()) {
  request.drainHandler(v -> {
    // Writable again
  });
} else {
  request.write(item);
}

You can pause/resume/fetch a response

response.pause();

performAsyncOperation().onComplete(ar -> {
  // And then resume
  response.resume();
});

Cancellation

You can call cancel to cancel a request

request.cancel();
Note
cancellation sends an HTTP/2 reset frame to the server

Compression

You can compress request messages by setting the request encoding prior before sending any message

request.encoding("gzip");

// Write items after encoding has been defined
request.write(Item.newBuilder().setValue("item-1").build());
request.write(Item.newBuilder().setValue("item-2").build());
request.write(Item.newBuilder().setValue("item-3").build());

Decompression

Decompression is done transparently by the client when the server send encoded responses.

Stub API

The Vert.x gRPC Client provides a gRPC channel to use with a generated client stub in a more traditional fashion

GrpcClientChannel channel = new GrpcClientChannel(client, SocketAddress.inetSocketAddress(443, "example.com"));

GreeterGrpc.GreeterStub greeter = GreeterGrpc.newStub(channel);

greeter.sayHello(HelloRequest.newBuilder().setName("Bob").build(), new StreamObserver<HelloReply>() {
  @Override
  public void onNext(HelloReply value) {
    // Process response
  }
  @Override
  public void onCompleted() {
    // Done
  }
  @Override
  public void onError(Throwable t) {
    // Something went bad
  }
});

Message level API

The client provides a message level API to interact directly with protobuf encoded gRPC messages.

Tip
the client message level API can be used with the server message level API to write a gRPC reverse proxy

Such API is useful when you are not interested in the content of the messages, and instead you want to forward them to another service, e.g. you are writing a proxy.

Future<GrpcClientRequest<Buffer, Buffer>> requestFut = client.request(server);

requestFut.onSuccess(request -> {

  // Set the service name and the method to call
  request.serviceName(ServiceName.create("helloworld", "Greeter"));
  request.methodName("SayHello");

  // Send the protobuf request
  request.end(protoHello);

  // Handle the response
  Future<GrpcClientResponse<Buffer, Buffer>> responseFut = request.response();
  responseFut.onSuccess(response -> {
    response.handler(protoReply -> {
      // Handle the protobuf reply
    });
  });
});

You can also set a messageHandler to handle GrpcMessage, such messages preserve the server encoding.

Future<GrpcClientRequest<Buffer, Buffer>> requestFut = client.request(server);

requestFut.onSuccess(request -> {

  // Set the service name and the method to call
  request.serviceName(ServiceName.create("helloworld", "Greeter"));
  request.methodName("SayHello");

  // Send the protobuf request
  request.endMessage(GrpcMessage.message("identity", protoHello));

  // Handle the response
  Future<GrpcClientResponse<Buffer, Buffer>> responseFut = request.response();
  responseFut.onSuccess(response -> {
    response.messageHandler(replyMessage -> {
      System.out.println("Got reply message encoded as " + replyMessage.encoding());
    });
  });
});

The writeMessage and endMessage will handle the message encoding:

  • when the message uses the response encoding, the message is sent as is

  • when the message uses a different encoding, it will be encoded, e.g. compressed or uncompressed

Vert.x gRPC Protoc Plugin

The easiest way to start using vertx-grpc is to utilize its built-in code generator plugin. To do so, one must define the protocol in the protobuffer format as required by gRPC.

syntax = "proto3";

option java_multiple_files = true;
option java_package = "examples";
option java_outer_classname = "HelloWorldProto";
package helloworld;

// The greeting service definition.
service Greeter {
 // Sends a greeting
 rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
 string name = 1;
}

// The response message containing the greetings
message HelloReply {
 string message = 1;
}

This is a very simple example showing the single request, single response mode.

Compile the RPC definition

Using the definition above we need to compile it.

You can compile the proto file using the protoc compiler if you like or you can integrate it in your build.

If you’re using Apache Maven you need to add the plugin:

<plugin>
 <groupId>org.xolstice.maven.plugins</groupId>
 <artifactId>protobuf-maven-plugin</artifactId>
 <version>0.6.1</version>
 <configuration>
   <protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
   <pluginId>grpc-java</pluginId>
   <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
   <protocPlugins>
     <protocPlugin>
       <id>vertx-grpc-protoc-plugin2</id>
       <groupId>io.vertx</groupId>
       <artifactId>vertx-grpc-protoc-plugin2</artifactId>
       <version>${stack.version}</version>
       <mainClass>io.vertx.grpc.plugin.VertxGrpcGenerator</mainClass>
     </protocPlugin>
   </protocPlugins>
 </configuration>
 <executions>
   <execution>
     <id>compile</id>
     <configuration>
       <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
       <clearOutputDirectory>false</clearOutputDirectory>
     </configuration>
     <goals>
       <goal>compile</goal>
       <goal>compile-custom</goal>
     </goals>
   </execution>
 </executions>
</plugin>

The ${os.detected.classifier} property is used to make the build OS independant, on OSX it is replaced by osx-x86_64 and so on. To use it you need to add the os-maven-plugin[https://github.com/trustin/os-maven-plugin] in the build section of your pom.xml:

<build>
 ...
 <extensions>
   <extension>
     <groupId>kr.motd.maven</groupId>
     <artifactId>os-maven-plugin</artifactId>
     <version>1.4.1.Final</version>
   </extension>
 </extensions>
 ...
</build>

This plugin will compile your proto files under src/main/proto and make them available to your project.

If you’re using Gradle you need to add the plugin:

...
apply plugin: 'com.google.protobuf'
...
buildscript {
 ...
 dependencies {
   // ASSUMES GRADLE 2.12 OR HIGHER. Use plugin version 0.7.5 with earlier gradle versions
   classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.0'
 }
}
...
protobuf {
 protoc {
   artifact = 'com.google.protobuf:protoc:3.2.0'
 }
 plugins {
   grpc {
     artifact = "io.grpc:protoc-gen-grpc-java:1.25.0"
   }
   vertx {
     artifact = "io.vertx:vertx-grpc-protoc-plugin2:${vertx.grpc.version}"
   }
 }
 generateProtoTasks {
   all()*.plugins {
     grpc
     vertx
   }
 }
}

This plugin will compile your proto files under build/generated/source/proto/main and make them available to your project.

Writing a service

The plugin generates vertx idiomatic Vert.x service stubs.

Each service comes in two flavors, you can override the method you like depending on the style.

Unary services can return a Vert.x Future:

VertxGreeterGrpcServer.GreeterApi stub = new VertxGreeterGrpcServer.GreeterApi() {
 @Override
 public Future<HelloReply> sayHello(HelloRequest request) {
   return Future.succeededFuture(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
 }
};

or process a Vert.x Promise

VertxGreeterGrpcServer.GreeterApi stub = new VertxGreeterGrpcServer.GreeterApi() {
 @Override
 public void sayHello(HelloRequest request, Promise<HelloReply> response) {
   response.complete(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
 }
};

In both case you need to bind the stub to an existing GrpcServer:

server.bindAll(stub);

Streaming requests

Streaming requests are implemented with Vert.x ReadStream:

VertxStreamingGrpcServer.StreamingApi stub = new VertxStreamingGrpcServer.StreamingApi() {
 @Override
 public void sink(ReadStream<Item> stream, Promise<Empty> response) {
   stream.handler(item -> {
     System.out.println("Process item " + item.getValue());
   });
   // Send response
   stream.endHandler(v -> response.complete(Empty.getDefaultInstance()));
 }
};

Streaming responses

Streaming responses are implemented with Vert.x streams and comes in two flavors.

You can return a Vert.x ReadStream and let the service send it for you:

VertxStreamingGrpcServer.StreamingApi stub = new VertxStreamingGrpcServer.StreamingApi() {
 @Override
 public ReadStream<Item> source(Empty request) {
   return streamOfItem();
 }
};

or you can process a WriteStream:

VertxStreamingGrpcServer.StreamingApi stub = new VertxStreamingGrpcServer.StreamingApi() {
 @Override
 public void source(Empty request, WriteStream<Item> response) {
   response.write(Item.newBuilder().setValue("value-1").build());
   response.end(Item.newBuilder().setValue("value-2").build());
 }
};

Writing a client

The plugin generates Vert.x service clients.

A client wraps a GrpcClient and provides Vert.x idiomatic API to interact with the service:

VertxGreeterGrpcClient client = new VertxGreeterGrpcClient(grpcClient, server);

Unary services returns a Vert.x Future

Future<HelloReply> response = client.sayHello(HelloRequest.newBuilder().setName("John").build());

response.onSuccess(result -> System.out.println("Service responded: " + ar.result().getMessage()));

response.onFailure(err -> System.out.println("Service failure: " + ar.cause().getMessage()));

Streaming requests

Streaming requests use a lambda passed a Vert.x WriteStream of messages sent to the service

Future<Empty> response = client.sink(stream -> {
 stream.write(Item.newBuilder().setValue("Value 1").build());
 stream.write(Item.newBuilder().setValue("Value 2").build());
 stream.end(Item.newBuilder().setValue("Value 3").build());
});

Streaming responses

Streaming responses get a Vert.x ReadStream of messages sent by the service

Future<ReadStream<Item>> response = client.source(Empty.getDefaultInstance());

response.onSuccess(stream -> stream
 .handler(item -> System.out.println("Item " + item.getValue()))
 .exceptionHandler(err -> System.out.println("Stream failed " + err.getMessage()));
 .endHandler(v -> System.out.println("Stream ended")));

response.onFailure(err -> System.out.println("Service failure: " + ar.cause().getMessage()));

Vert.x gRPC Context Storage

Vert.x gRPC Context Storage overrides the default io.grpc.Context.Storage implementation.

The default implementation always stores the gRPC context in a thread-local variable. This implementation stores the gRPC context the same way as Vert.x core stores request tracing data.

This means, for example, that when you implement a service method, the gRPC context is propagated across Vert.x async API calls:

Context grpcCtx1 = Context.current();

vertx.<String>executeBlocking(prom -> {

  // Same as grpcCtx1
  Context grpcCtx2 = Context.current();

  String result = doSomething();
  prom.complete(result);

}, ar -> {

  // Same as grpcCtx1 and grpcCtx2
  Context grpcCtx3 = Context.current();

});
Caution

The gRPC context is propagated across Vert.x async API calls only when the current Vert.x Context is bound to a Vert.x HTTP server request.

It is not propagated if, for example, you invoke a stub on a non-Vert.x thread or from a verticle start method.

Using Vert.x gRPC Context Storage

To use Vert.x gRPC Context Storage, add the following dependency to the dependencies section of your build descriptor:

  • Maven (in your pom.xml):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-grpc-context-storage</artifactId>
 <version>4.5.7</version>
</dependency>
  • Gradle (in your build.gradle file):

dependencies {
 compile 'io.vertx:vertx-grpc-context-storage:4.5.7'
}