You are currently viewing the documentation for the unreleased version 5.0.0.CR6 of Vert.x.

Vert.x gRPC

Wikipedia gives a concise description of gRPC:

gRPC is an open source remote procedure call (RPC) system initially developed at Google. It uses HTTP/2 for transport, Protocol Buffers as the interface description language, and provides features such as authentication, bidirectional streaming and flow control, blocking or nonblocking bindings, and cancellation and timeouts. It generates cross-platform client and server bindings for many languages.

— Wikipedia

Vert.x gRPC is a module that aligns the gRPC programming style with the Vert.x style. As a user of this module, you work with familiar Vert.x Streams and Futures while benefiting from all the features of gRPC.

For more information related to gRPC please consult the official documentation site http://www.grpc.io/.

Vert.x gRPC is split into several parts:

  • Vert.x gRPC Server

  • Vert.x gRPC Client

  • Vert.x gRPC/IO Server

  • Vert.x gRPC/IO Client

  • Vert.x gRPC/IO Context Storage

Vert.x gRPC Protoc Plugin 2

The easiest way to start using Vert.x gRPC is to use its built-in code generator plugin. To do so, define the protocol in the Protocol Buffers format, as required by gRPC.

syntax = "proto3";

option java_multiple_files = true;
option java_package = "examples";
option java_outer_classname = "HelloWorldProto";
package helloworld;

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

This is a very simple example showing the single request, single response mode.

Compile the RPC definition

The definition above now needs to be compiled.

You can compile the proto file using the protoc compiler if you like, or you can integrate it in your build.

If you’re using Apache Maven you need to add the plugin:

<plugin>
  <groupId>org.xolstice.maven.plugins</groupId>
  <artifactId>protobuf-maven-plugin</artifactId>
  <version>0.6.1</version>
  <configuration>
    <protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
    <pluginId>grpc-java</pluginId>
    <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
    <protocPlugins>
      <protocPlugin>
        <id>vertx-grpc-protoc-plugin2</id>
        <groupId>io.vertx</groupId>
        <artifactId>vertx-grpc-protoc-plugin2</artifactId>
        <version>${stack.version}</version>
        <mainClass>io.vertx.grpc.plugin.VertxGrpcGenerator</mainClass>
      </protocPlugin>
    </protocPlugins>
  </configuration>
  <executions>
    <execution>
      <id>compile</id>
      <configuration>
        <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
        <clearOutputDirectory>false</clearOutputDirectory>
      </configuration>
      <goals>
        <goal>compile</goal>
        <goal>compile-custom</goal>
      </goals>
    </execution>
  </executions>
</plugin>

With io.vertx.grpc.plugin.VertxGrpcGenerator, the plugin generates both the client and the server files. If you need only one side, use either io.vertx.grpc.plugin.VertxGrpcClientGenerator or io.vertx.grpc.plugin.VertxGrpcServiceGenerator.

The ${os.detected.classifier} property makes the build OS independent: on OSX, for example, it is replaced by osx-x86_64. To use it, add the os-maven-plugin (https://github.com/trustin/os-maven-plugin) to the build section of your pom.xml:

<build>
  ...
  <extensions>
    <extension>
      <groupId>kr.motd.maven</groupId>
      <artifactId>os-maven-plugin</artifactId>
      <version>1.4.1.Final</version>
    </extension>
  </extensions>
  ...
</build>

This plugin will compile your proto files under src/main/proto and make them available to your project.

If you’re using Gradle you need to add the plugin:

...
apply plugin: 'com.google.protobuf'
...
buildscript {
  ...
  dependencies {
    // ASSUMES GRADLE 2.12 OR HIGHER. Use plugin version 0.7.5 with earlier gradle versions
    classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.0'
  }
}
...
protobuf {
  protoc {
    artifact = 'com.google.protobuf:protoc:3.2.0'
  }
  plugins {
    grpc {
      artifact = "io.grpc:protoc-gen-grpc-java:1.25.0"
    }
    vertx {
      artifact = "io.vertx:vertx-grpc-protoc-plugin2:${vertx.grpc.version}"
    }
  }
  generateProtoTasks {
    all()*.plugins {
      grpc
      vertx
    }
  }
}

This plugin will compile your proto files under build/generated/source/proto/main and make them available to your project.

Generated RPC files

For each service definition, the plugin creates two Java RPC files.

For the Greeter service:

  • examples/GreeterGrpcClient.java

  • examples/GreeterGrpcService.java

Besides the usual client/server generated code, these files contain service method constants:

public class GreeterGrpcClient {
  public static final ServiceMethod<examples.HelloReply, examples.HelloRequest> SayHello = ServiceMethod.client(
    ServiceName.create("helloworld", "Greeter"),
    "SayHello",
    GrpcMessageEncoder.encoder(),
    GrpcMessageDecoder.decoder(examples.HelloReply.parser())
  );
  // ...
}

public class GreeterGrpcService {
  public static final ServiceMethod<examples.HelloRequest, examples.HelloReply> SayHello = ServiceMethod.server(
    ServiceName.create("helloworld", "Greeter"),
    "SayHello",
    GrpcMessageEncoder.encoder(),
    GrpcMessageDecoder.decoder(examples.HelloRequest.parser())
  );
  // ...
}

For each service method, a public static final ServiceMethod is generated. These constants provide everything Vert.x gRPC needs to know to interact with gRPC:

  • the service name: /helloworld.Greeter

  • the service method name: SayHello

  • the message decoder

  • the message encoder

They can be used to bind services or interact with a remote server.
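As an illustration of how the service name and the method name identify a call: the gRPC over HTTP/2 protocol addresses a method with a request path of the form /{package}.{Service}/{Method}. The sketch below builds that path with plain Java. The GrpcPath class and its pathOf method are hypothetical names used for illustration only; they are not part of Vert.x gRPC.

```java
final class GrpcPath {

  private GrpcPath() {
  }

  // Builds the HTTP/2 request path of a gRPC call:
  // "/" + fully qualified service name + "/" + method name
  static String pathOf(String packageName, String serviceName, String methodName) {
    String fullyQualified = packageName.isEmpty()
      ? serviceName
      : packageName + "." + serviceName;
    return "/" + fullyQualified + "/" + methodName;
  }

  public static void main(String[] args) {
    // prints "/helloworld.Greeter/SayHello"
    System.out.println(pathOf("helloworld", "Greeter", "SayHello"));
  }
}
```

For the Greeter service above, the path is /helloworld.Greeter/SayHello.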

Generate RxJava client wrapper

The plugin generates a client service interface annotated with Vert.x codegen annotations.

@io.vertx.codegen.annotations.VertxGen
public interface GreeterClient {
  ...
}

Therefore, a Vert.x codegen processor can process it. For example, the Vert.x RxJava generator generates an RxJava client wrapper with an idiomatic RxJava API.

Here is a Maven configuration example:

<plugin>
  <artifactId>maven-compiler-plugin</artifactId>
  <executions>
    <execution>
      <id>default-compile</id>
      <configuration>
        <annotationProcessorPaths>
          <annotationProcessorPath>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-codegen</artifactId>
            <classifier>processor</classifier>
            <version>${vertx.version}</version>
          </annotationProcessorPath>
          <annotationProcessorPath>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-rx-java3-gen</artifactId>
            <version>${vertx.version}</version>
          </annotationProcessorPath>
        </annotationProcessorPaths>
      </configuration>
    </execution>
  </executions>
</plugin>

The generated wrapper can then be used:

// Use the RxJava version
GreeterClient client = io.grpc.examples.rxjava3.helloworld.GreeterClient.create(grpcClient, SocketAddress.inetSocketAddress(8080, "localhost"));

// Get a Single instead of a Future
Single<HelloReply> reply = client.sayHello(HelloRequest.newBuilder().setName("World").build());

Flowable<HelloRequest> requestStream = Flowable.just("World", "Monde", "Mundo")
      .map(name -> HelloRequest.newBuilder().setName(name).build());

// Use Flowable instead of Vert.x streams
Flowable<String> responseStream = client
  .sayHelloStreaming(requestStream)
  .map(HelloReply::getMessage);

This uses the RxJava version of GrpcClient.

Generate transcoding definitions

The plugin can also generate transcoding definitions for the gRPC services. For more information see the Transcoding section.

Currently, the plugin supports generating transcoding definitions for the gRPC services via http.proto. This feature is enabled by default.

Example of the gRPC transcoding definition:

syntax = "proto3";

import "google/api/annotations.proto";

service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply) {
    option (google.api.http) = {
      post: "/v1/hello"
    };
  }

  rpc SayHelloAgain (HelloRequest) returns (HelloReply) {
    option (google.api.http) = {
      post: "/v1/hello/{name}"
    };
  }
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}

To test if the transcoding is working correctly, you can use the curl command:

curl -X POST -H "Content-Type: application/json" -d '{"name":"vert.x"}' http://localhost:8080/v1/hello

And for the SayHelloAgain method:

curl -X POST -H "Content-Type: application/json" http://localhost:8080/v1/hello/vert.x

HTTP Transcoding Options

The protoc plugin automatically generates transcoding options for service methods that use the google.api.http annotation.

Overview

The google.api.http annotation allows you to map gRPC methods to HTTP endpoints, enabling your service to handle both gRPC and HTTP/REST requests. The plugin supports various HTTP methods (GET, POST) and custom methods.

Examples

Basic HTTP Mappings

service Greeter {
  // Maps a GET endpoint with a URL parameter
  rpc SayHello (HelloRequest) returns (HelloReply) {
    option (google.api.http) = {
      get: "/v1/hello/{name}"
      additional_bindings {
        post: "/v1/hello"  // Alternative POST endpoint
      }
    };
  }

  // Maps a POST endpoint with an alternative GET binding
  rpc SayHelloAgain (HelloRequest) returns (HelloReply) {
    option (google.api.http) = {
      post: "/v2/hello"
      additional_bindings {
        get: "/v2/hello/{name}"
      }
    };
  }
}

Advanced Configurations

Custom Methods

service Greeter {
  // Define custom HTTP methods
  rpc SayHelloCustom (HelloRequest) returns (HelloReply) {
    option (google.api.http) = {
      custom: {
        kind: "ACL"
        path: "/v1/hello/custom/{name}"
      }
    };
  }
}

Request Body Handling

service Greeter {
  // Specify which field should be mapped to the HTTP request body
  rpc SayHelloWithBody (HelloBodyRequest) returns (HelloReply) {
    option (google.api.http) = {
      post: "/v1/hello/body"
      body: "request"  // Maps the "request" field to the request body
    };
  }
}

message HelloBodyRequest {
  HelloRequest request = 1;
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string reply = 1;
}

Response Body Mapping

service Greeter {
  // Configure which field should be used as the HTTP response body
  rpc SayHelloWithResponseBody (HelloRequest) returns (HelloBodyResponse) {
    option (google.api.http) = {
      post: "/v1/hello/body/response"
      response_body: "response"  // Maps the "response" field to the response body
    };
  }
}

message HelloRequest {
  string name = 1;
}

message HelloBodyResponse {
  HelloResponse response = 1;
}

message HelloResponse {
  string reply = 1;
}

Vert.x gRPC Server

Vert.x gRPC Server is a gRPC server powered by the Vert.x HTTP server, superseding the integrated Netty based gRPC server.

This server provides a gRPC request/response oriented API as well as a generated stub approach with the Vert.x gRPC Generator.

Using Vert.x gRPC Server

To use Vert.x gRPC Server, add the following dependency to the dependencies section of your build descriptor:

  • Maven (in your pom.xml):

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-grpc-server</artifactId>
  <version>5.0.0.CR6</version>
</dependency>

  • Gradle (in your build.gradle file):

dependencies {
  implementation 'io.vertx:vertx-grpc-server:5.0.0.CR6'
}

Creating a gRPC server

A GrpcServer is a Handler<HttpServerRequest> and can be used as an HTTP server request handler.

GrpcServer grpcServer = GrpcServer.server(vertx);

HttpServer server = vertx.createHttpServer(options);

server
  .requestHandler(grpcServer)
  .listen();

A GrpcServer can be used within a Vert.x Web router:

route.consumes("application/grpc").handler(rc -> grpcServer.handle(rc.request()));

HTTP/2 protocol

The default protocol served by the Vert.x gRPC server is HTTP/2; messages are exchanged in the Protobuf wire format.

In addition, the Vert.x gRPC server also supports the gRPC-Web protocol and HTTP/JSON transcoding.

gRPC-Web protocol

The Vert.x gRPC Server supports the gRPC-Web protocol by default.

To disable the gRPC-Web protocol support, configure options with GrpcServerOptions#setGrpcWebEnabled(false) and then create a server with GrpcServer#server(vertx, options).

If your website server and the gRPC server are different, you have to configure the gRPC server for CORS. This can be done with a Vert.x Web router and the CORS handler:

CorsHandler corsHandler = CorsHandler.create()
  .addRelativeOrigin("https://www.mycompany.com")
  .allowedHeaders(Set.of("keep-alive","user-agent","cache-control","content-type","content-transfer-encoding","x-custom-key","x-user-agent","x-grpc-web","grpc-timeout"))
  .exposedHeaders(Set.of("x-custom-key","grpc-status","grpc-message"));
router.route("/com.mycompany.MyService/*").handler(corsHandler);

gRPC Transcoding

gRPC transcoding is a feature that enables mapping between HTTP/JSON requests and gRPC services.

gRPC transcoding allows your services to accept both gRPC and HTTP/JSON requests, providing greater flexibility. This feature is particularly useful when:

  • You want to expose your gRPC services to clients that don’t support gRPC

  • You need to support both traditional REST APIs and gRPC endpoints

  • You want to leverage gRPC’s efficiency while maintaining HTTP/JSON compatibility

You do not need specific configuration for gRPC transcoding since it is served over the HTTP protocol. However, services must be deployed with additional configuration, such as the mount path, for transcoding to be operational.

Transcoding is in tech preview in Vert.x 5.0 until the API becomes stable.

Transcoding error handling

If an error occurs during transcoding, the server returns an HTTP error response with the appropriate status code. Most gRPC status codes are mapped to the corresponding HTTP status codes on a best-effort basis. If the status code is not mapped, the server returns a 500 Internal Server Error.

gRPC Status Code     HTTP Status Code  Description
-------------------  ----------------  -----------
OK                   200               The operation completed successfully.
CANCELLED            408               The operation was cancelled (typically by the caller).
UNKNOWN              500               Unknown error.
INVALID_ARGUMENT     400               Client specified an invalid argument.
DEADLINE_EXCEEDED    504               Deadline expired before operation could complete.
NOT_FOUND            404               Some requested entity (e.g., file or directory) was not found.
ALREADY_EXISTS       409               Some entity that we attempted to create (e.g., file or directory) already exists.
PERMISSION_DENIED    403               The caller does not have permission to execute the specified operation.
RESOURCE_EXHAUSTED   429               Some resource has been exhausted, perhaps a per-user quota, or perhaps the entire file system is out of space.
FAILED_PRECONDITION  400               Operation was rejected because the system is not in a state required for the operation’s execution.
ABORTED              409               The operation was aborted, typically due to a concurrency issue like sequencer check failures, transaction aborts, etc.
OUT_OF_RANGE         400               Operation was attempted past the valid range.
UNIMPLEMENTED        501               Operation is not implemented or not supported/enabled in this service.
INTERNAL             500               Internal errors. This means that some invariants expected by the underlying system have been broken.
UNAVAILABLE          503               The service is currently unavailable.
DATA_LOSS            500               Unrecoverable data loss or corruption.
UNAUTHENTICATED      401               The request does not have valid authentication credentials for the operation.
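The mapping above can be expressed as a simple lookup with a fallback to 500 for unmapped codes. This is a minimal sketch in plain Java that copies the values from the table; the GrpcHttpStatus class and toHttpStatus method are hypothetical names, and this is not the server's actual implementation.

```java
import java.util.Map;

final class GrpcHttpStatus {

  // gRPC status name -> HTTP status code, copied from the table above
  private static final Map<String, Integer> HTTP_STATUS = Map.ofEntries(
    Map.entry("OK", 200),
    Map.entry("CANCELLED", 408),
    Map.entry("UNKNOWN", 500),
    Map.entry("INVALID_ARGUMENT", 400),
    Map.entry("DEADLINE_EXCEEDED", 504),
    Map.entry("NOT_FOUND", 404),
    Map.entry("ALREADY_EXISTS", 409),
    Map.entry("PERMISSION_DENIED", 403),
    Map.entry("RESOURCE_EXHAUSTED", 429),
    Map.entry("FAILED_PRECONDITION", 400),
    Map.entry("ABORTED", 409),
    Map.entry("OUT_OF_RANGE", 400),
    Map.entry("UNIMPLEMENTED", 501),
    Map.entry("INTERNAL", 500),
    Map.entry("UNAVAILABLE", 503),
    Map.entry("DATA_LOSS", 500),
    Map.entry("UNAUTHENTICATED", 401)
  );

  private GrpcHttpStatus() {
  }

  // Returns the HTTP status for a gRPC status name, or 500 when the name is not mapped
  static int toHttpStatus(String grpcStatus) {
    return HTTP_STATUS.getOrDefault(grpcStatus, 500);
  }
}
```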

Server request/response API

The gRPC request/response server API provides an alternative way to interact with a client without the need of extending a Java class.

Request/response

Each service method is processed by a handler; the handler is bound using a ServiceMethod.

ServiceMethod<HelloRequest, HelloReply> serviceMethod = GreeterGrpcService.SayHello;

server.callHandler(serviceMethod, request -> {

  request.handler(hello -> {

    GrpcServerResponse<HelloRequest, HelloReply> response = request.response();

    HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + hello.getName()).build();

    response.end(reply);
  });
});

ServiceMethod constants are generated by the Vert.x gRPC protoc plugin.

Streaming request

You can set handlers to process request events

server.callHandler(StreamingGrpcService.Sink, request -> {
  request.handler(item -> {
    // Process item
  });
  request.endHandler(v ->{
    // No more items
    // Send the response
    request.response().end(Empty.getDefaultInstance());
  });
  request.exceptionHandler(err -> {
    // Something wrong happened
  });
});

Streaming response

A streaming response involves calling write for each element of the stream and using end to end the stream

server.callHandler(StreamingGrpcService.Source, request -> {
  GrpcServerResponse<Empty, Item> response = request.response();
  request.handler(empty -> {
    for (int i = 0;i < 10;i++) {
      response.write(Item.newBuilder().setValue("1").build());
    }
    response.end();
  });
});

Bidi request/response

A bidi request/response is simply the combination of a streaming request and a streaming response

server.callHandler(StreamingGrpcService.Pipe, request -> {

  request.handler(item -> request.response().write(item));
  request.endHandler(v -> request.response().end());
});

The gRPC-Web protocol does not support bidirectional streaming.

Flow control

Request and response are back-pressured Vert.x streams.

You can pause, resume, or fetch a request:

request.pause();

performAsyncOperation().onComplete(ar -> {
  // And then resume
  request.resume();
});

You can check the writability of a response and set a drain handler

if (response.writeQueueFull()) {
  response.drainHandler(v -> {
    // Writable again
  });
} else {
  response.write(item);
}

Timeout and deadlines

The gRPC server handles timeout and deadlines.

Whenever the service receives a request indicating a timeout, the timeout can be retrieved.

long timeout = request.timeout();

if (timeout > 0L) {
  // A timeout has been received
}

By default, the server

  • does not automatically schedule a deadline for a given request

  • does not automatically propagate the deadline to a Vert.x client

The server can schedule deadlines: when a request carries a timeout, the server schedules a local timer to cancel the request if the response has not been sent in time.

The server can propagate deadlines: when a request carries a timeout, the server calculates the deadline and associates the current server request with this deadline. The Vert.x gRPC client can use this deadline to compute a timeout to be sent and cascade the timeout to another gRPC server.

GrpcServer server = GrpcServer.server(vertx, new GrpcServerOptions()
  .setScheduleDeadlineAutomatically(true)
  .setDeadlinePropagation(true)
);
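The arithmetic behind deadline propagation can be sketched as follows: the deadline is the instant the request was received plus its timeout, and the timeout cascaded to a downstream call is whatever remains of that deadline at the time of the call. The DeadlineMath class and its methods are hypothetical names; this only illustrates the computation and does not claim to reproduce how Vert.x implements it.

```java
import java.time.Duration;
import java.time.Instant;

final class DeadlineMath {

  private DeadlineMath() {
  }

  // Deadline = instant the request was received + timeout carried by the request
  static Instant deadline(Instant receivedAt, Duration timeout) {
    return receivedAt.plus(timeout);
  }

  // Time left before the deadline, to be used as the timeout of a downstream call.
  // Returns zero once the deadline has passed.
  static Duration remaining(Instant deadline, Instant now) {
    Duration left = Duration.between(now, deadline);
    return left.isNegative() ? Duration.ZERO : left;
  }
}
```

For example, a request received with a 10 second timeout that calls another service 3 seconds later would cascade a timeout of 7 seconds.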

JSON wire format

gRPC implicitly assumes the usage of the Protobuf wire format.

The Vert.x gRPC server supports the JSON wire format as well.

You can use a JSON service method to bind a service method accepting requests carrying the application/grpc+json content-type.

server.callHandler(GreeterGrpcService.Json.SayHello, request -> {
  request.last().onSuccess(helloRequest -> {
    request.response().end(HelloReply.newBuilder()
      .setMessage("Hello " + helloRequest.getName()).build()
    );
  });
});

The com.google.protobuf:protobuf-java-util library performs the JSON encoding/decoding.

The same service method can be bound twice, with the Protobuf and JSON wire formats.

Anemic JSON is also supported with the Vert.x JsonObject:

ServiceMethod<JsonObject, JsonObject> sayHello = ServiceMethod.server(
  ServiceName.create("helloworld", "Greeter"),
  "SayHello",
  GrpcMessageEncoder.JSON_OBJECT,
  GrpcMessageDecoder.JSON_OBJECT
);

server.callHandler(sayHello, request -> {
  request.last().onSuccess(helloRequest -> {
    request.response().end(new JsonObject().put("message", "Hello " + helloRequest.getString("name")));
  });
});

Compression

You can compress response messages by setting the response encoding before sending any message:

response.encoding("gzip");

// Write items after encoding has been defined
response.write(Item.newBuilder().setValue("item-1").build());
response.write(Item.newBuilder().setValue("item-2").build());
response.write(Item.newBuilder().setValue("item-3").build());

Compression is not supported over the gRPC-Web protocol.

Decompression

Decompression is done transparently by the server when the client sends encoded requests.

Decompression is not supported over the gRPC-Web protocol.

Transcoding

A service method is processed by a handler; the handler is bound using a TranscodingServiceMethod.

TranscodingServiceMethod<HelloRequest, HelloReply> serviceMethod = GreeterGrpcService.Transcoding.SayHello;

// Register the handler with transcoding options
server.callHandler(serviceMethod, request -> {

  request.handler(hello -> {

    GrpcServerResponse<HelloRequest, HelloReply> response = request.response();

    HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + hello.getName()).build();

    response.end(reply);
  });
});

Transcoding service methods are generated by the Vert.x gRPC protoc plugin when the service declares an HttpRule:

syntax = "proto3";

import "google/api/annotations.proto";

option java_multiple_files = true;
option java_package = "examples";
option java_outer_classname = "HelloWorldProto";

package helloworld;

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {
    option (google.api.http) = {
      get: "/v1/hello/{name}"
    };
  }
}

To test gRPC transcoding, you can use a tool like curl to send HTTP requests to your gRPC service.

For example, to send a GET request to the /v1/hello/Steve endpoint:

curl -X GET http://localhost:8080/v1/hello/Steve
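To illustrate how a path such as /v1/hello/Steve relates to the template /v1/hello/{name}, the sketch below matches a request path against a template and extracts the variables, so that name is bound to Steve. It is a simplified illustration in plain Java: the PathTemplate class is a hypothetical name, only plain single-segment variables of the form {name} are handled (HttpRule templates allow a richer syntax), and it is not the matching code used by the server.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class PathTemplate {

  private static final Pattern VARIABLE = Pattern.compile("\\{(\\w+)\\}");

  private PathTemplate() {
  }

  // Matches a path against a template such as "/v1/hello/{name}".
  // Returns the variable bindings, or empty when the path does not match.
  static Optional<Map<String, String>> match(String template, String path) {
    StringBuilder regex = new StringBuilder();
    List<String> names = new ArrayList<>();
    Matcher variables = VARIABLE.matcher(template);
    int last = 0;
    while (variables.find()) {
      // Literal text before the variable is matched verbatim
      regex.append(Pattern.quote(template.substring(last, variables.start())));
      // A variable captures exactly one path segment
      regex.append("([^/]+)");
      names.add(variables.group(1));
      last = variables.end();
    }
    regex.append(Pattern.quote(template.substring(last)));

    Matcher matcher = Pattern.compile(regex.toString()).matcher(path);
    if (!matcher.matches()) {
      return Optional.empty();
    }
    Map<String, String> bindings = new LinkedHashMap<>();
    for (int i = 0; i < names.size(); i++) {
      bindings.put(names.get(i), matcher.group(i + 1));
    }
    return Optional.of(bindings);
  }
}
```

With this sketch, matching /v1/hello/Steve against /v1/hello/{name} yields the binding name=Steve, which corresponds to the name field of HelloRequest.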

Message level API

The server provides a message level API to interact directly with protobuf encoded gRPC messages.

The server message level API can be used with the client message level API to write a gRPC reverse proxy.

Such API is useful when you are not interested in the content of the messages, and instead you want to forward them to another service, e.g. you are writing a proxy.

ServiceName greeterServiceName = ServiceName.create("helloworld", "Greeter");

server.callHandler(request -> {

  if (request.serviceName().equals(greeterServiceName) && request.methodName().equals("SayHello")) {

    request.handler(protoHello -> {
      // Handle protobuf encoded hello
      performAsyncOperation(protoHello)
        .onSuccess(protoReply -> {
          // Reply with protobuf encoded reply
          request.response().end(protoReply);
        }).onFailure(err -> {
          request.response()
            .status(GrpcStatus.ABORTED)
            .end();
        });
    });
  } else {
    request.response()
      .status(GrpcStatus.NOT_FOUND)
      .end();
  }
});

You can also set a messageHandler to handle GrpcMessage. Such messages preserve the client encoding, which is useful when the service you are forwarding to can handle compressed messages directly: in this case, the message does not need to be decompressed and compressed again.

ServiceName greeterServiceName = ServiceName.create("helloworld", "Greeter");

server.callHandler(request -> {

  if (request.serviceName().equals(greeterServiceName) && request.methodName().equals("SayHello")) {

    request.messageHandler(helloMessage -> {

      // Can be identity or gzip
      String helloEncoding = helloMessage.encoding();

      // Handle hello message
      handleGrpcMessage(helloMessage)
        .onSuccess(replyMessage -> {
          // Reply with reply message

          // Can be identity or gzip
          String replyEncoding = replyMessage.encoding();

          // Send the reply
          request.response().endMessage(replyMessage);
        }).onFailure(err -> {
          request.response()
            .status(GrpcStatus.ABORTED)
            .end();
        });
    });
  } else {
    request.response()
      .status(GrpcStatus.NOT_FOUND)
      .end();
  }
});

The writeMessage and endMessage methods handle the message encoding:

  • when the message uses the response encoding, the message is sent as is

  • when the message uses a different encoding, it will be encoded, e.g. compressed or uncompressed
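The two rules above can be sketched with plain Java and java.util.zip. The EncodingAdapter class is a hypothetical name, only the identity and gzip encodings are handled, and the code illustrates the decision rather than the server's actual implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class EncodingAdapter {

  private EncodingAdapter() {
  }

  // Adapts a message payload to the response encoding ("identity" or "gzip")
  static byte[] adapt(byte[] payload, String messageEncoding, String responseEncoding) {
    if (messageEncoding.equals(responseEncoding)) {
      // Same encoding: the message is sent as is
      return payload;
    }
    try {
      // Different encoding: decode to plain bytes, then encode as the response expects
      byte[] plain = messageEncoding.equals("gzip") ? gunzip(payload) : payload;
      return responseEncoding.equals("gzip") ? gzip(plain) : plain;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  private static byte[] gzip(byte[] data) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
      gz.write(data);
    }
    return out.toByteArray();
  }

  private static byte[] gunzip(byte[] data) throws IOException {
    try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(data))) {
      return gz.readAllBytes();
    }
  }
}
```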

Server stub API

In addition to the request/response API, the Vert.x gRPC protoc plugin generates idiomatic service stubs.

Each service method comes in two flavors; you can override the one that suits your style.

Unary methods

Unary methods can return a Vert.x Future

GreeterService stub = new GreeterService() {
  @Override
  public Future<HelloReply> sayHello(HelloRequest request) {
    return Future.succeededFuture(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
  }
};

or process a Vert.x Promise

GreeterService stub = new GreeterService() {
  @Override
  public void sayHello(HelloRequest request, Promise<HelloReply> response) {
    response.complete(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
  }
};

In both cases, you need to bind the stub to an existing GrpcServer:

server.addService(stub);

By default, only Protobuf methods are bound in a server.

You can also specify the JSON wire format when binding a stub.

server.addService(stub
  .builder()
  .bind(GreeterGrpcService.Json.all())
  .build());

The server will accept the application/grpc+json requests.

Streaming requests

Streaming requests are implemented with a ReadStream:

StreamingGrpcService stub = new StreamingGrpcService() {
  @Override
  public void sink(ReadStream<Item> stream, Promise<Empty> response) {
    stream.handler(item -> {
      System.out.println("Process item " + item.getValue());
    });
    // Send response
    stream.endHandler(v -> response.complete(Empty.getDefaultInstance()));
  }
};
server.addService(stub);

Streaming responses

Streaming responses are implemented with Vert.x streams and come in two flavors.

You can return a Vert.x ReadStream and let the service send it for you:

StreamingService stub = new StreamingService() {
  @Override
  public Future<ReadStream<Item>> source(Empty request) {
    return streamOfItems();
  }
};

or you can process a WriteStream:

StreamingService stub = new StreamingService() {
  @Override
  public void source(Empty request, WriteStream<Item> response) {
    response.write(Item.newBuilder().setValue("value-1").build());
    response.end(Item.newBuilder().setValue("value-2").build());
  }
};

gRPC Reflection APIs

Support for the gRPC reflection APIs can be added to your Vert.x gRPC Server.

GrpcServer grpcServer = GrpcServer.server(vertx);

// Add reflection service
grpcServer.addService(ReflectionService.v1());

GreeterGrpcService greeterService = new GreeterGrpcService() {
  @Override
  public Future<HelloReply> sayHello(HelloRequest request) {
    return Future.succeededFuture(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
  }
};

grpcServer.addService(greeterService);

// Start the HTTP/2 server
vertx.createHttpServer(options)
  .requestHandler(grpcServer)
  .listen();

Vert.x gRPC Client

Vert.x gRPC Client is a gRPC client powered by Vert.x HTTP client.

This client provides a gRPC request/response oriented API as well as a generated stub approach with a gRPC Channel

Using Vert.x gRPC Client

To use Vert.x gRPC Client, add the following dependency to the dependencies section of your build descriptor:

  • Maven (in your pom.xml):

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-grpc-client</artifactId>
  <version>5.0.0.CR6</version>
</dependency>

  • Gradle (in your build.gradle file):

dependencies {
  implementation 'io.vertx:vertx-grpc-client:5.0.0.CR6'
}

Creating a gRPC client

You can easily create the gRPC client

GrpcClient client = GrpcClient.client(vertx);

Client request/response API

The gRPC request/response client API provides an alternative way to interact with a server without the need of a generated stub.

Request/response

Interacting with a gRPC server involves creating a request to the remote gRPC service

SocketAddress server = SocketAddress.inetSocketAddress(443, "example.com");
ServiceMethod<HelloReply, HelloRequest> sayHelloMethod = GreeterGrpcClient.SayHello;
Future<GrpcClientRequest<HelloRequest, HelloReply>> fut = client.request(server, sayHelloMethod);
fut.onSuccess(request -> {
  // The end method calls the service
  request.end(HelloRequest.newBuilder().setName("Bob").build());
});

ServiceMethod constants are generated by the Vert.x gRPC protoc plugin.

The response method provides the response, and the last method provides the result:

request.response().onSuccess(response -> {
  Future<HelloReply> fut = response.last();
  fut.onSuccess(reply -> {
    System.out.println("Received " + reply.getMessage());
  });
});

Future composition can combine all the previous steps together in a compact fashion

client
  .request(server, GreeterGrpcClient.SayHello).compose(request -> {
    request.end(HelloRequest
      .newBuilder()
      .setName("Bob")
      .build());
    return request.response().compose(response -> response.last());
  }).onSuccess(reply -> {
    System.out.println("Received " + reply.getMessage());
  });

Streaming request

Streaming requests involve calling write for each element of the stream and using end to end the stream

client
  .request(server, StreamingGrpcClient.Sink)
  .onSuccess(request -> {
    for (int i = 0; i < 10; i++) {
      request.write(Item.newBuilder().setValue("1").build());
    }
    request.end();
  });

Streaming response

You can set handlers to process response events of a streaming response

client
  .request(server, StreamingGrpcClient.Source)
  .compose(request -> {
    request.end(Empty.getDefaultInstance());
    return request.response();
  })
  .onSuccess(response -> {
    response.handler(item -> {
      // Process item
    });
    response.endHandler(v -> {
      // Done
    });
    response.exceptionHandler(err -> {
      // Something went bad
    });
  });

Bidi request/response

A bidi request/response is simply the combination of a streaming request and a streaming response.

Flow control

Request and response are back pressured Vert.x streams.

You can check the writability of a request and set a drain handler

if (request.writeQueueFull()) {
  request.drainHandler(v -> {
    // Writable again
  });
} else {
  request.write(item);
}

You can pause/resume/fetch a response

response.pause();

performAsyncOperation().onComplete(ar -> {
  // And then resume
  response.resume();
});

Timeout and deadlines

The gRPC client handles timeouts and deadlines. Setting a timeout on a gRPC request instructs the client to send the timeout information, making the server aware that the client desires a response within a defined time.

In addition, the client can be configured to schedule a deadline: when a timeout is set on a request, the client schedules a local timer to cancel the request if the response has not been received in time.

GrpcClient client = GrpcClient.client(vertx, new GrpcClientOptions()
  .setTimeout(10)
  .setTimeoutUnit(TimeUnit.SECONDS)
  .setScheduleDeadlineAutomatically(true));

The timeout can also be set on a per-request basis.

Future<GrpcClientRequest<HelloRequest, HelloReply>> fut = client.request(server, GreeterGrpcClient.SayHello);
fut.onSuccess(request -> {

  request
    // Given this request, set a 10 seconds timeout that will be sent to the gRPC service
    .timeout(10, TimeUnit.SECONDS);

  request.end(HelloRequest.newBuilder().setName("Bob").build());
});
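
The locally scheduled deadline described above amounts to a timer that fails the pending response unless it completes first. Here is a minimal sketch of that idea using only java.util.concurrent; LocalDeadline and withDeadline are hypothetical names, and the actual client cancels the HTTP/2 stream rather than completing a CompletableFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration of a client-side deadline timer.
class LocalDeadline {

  // Fails the response with a TimeoutException unless it completes within the timeout.
  static <T> CompletableFuture<T> withDeadline(CompletableFuture<T> response,
                                               long timeout,
                                               TimeUnit unit,
                                               ScheduledExecutorService timer) {
    ScheduledFuture<?> task = timer.schedule(
      () -> response.completeExceptionally(new TimeoutException("deadline exceeded")),
      timeout, unit);
    // Once the response completes (either way), the timer is no longer needed.
    response.whenComplete((reply, err) -> task.cancel(false));
    return response;
  }
}
```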

Cancellation

You can call cancel to cancel a request

request.cancel();

Cancellation sends an HTTP/2 reset frame to the server.
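
For reference, an HTTP/2 reset is an RST_STREAM frame: a 9-byte frame header followed by a 4-byte error code. The sketch below encodes such a frame by hand. It assumes the CANCEL error code (0x8), which is what gRPC conventionally uses for cancellation; RstStreamFrame is a hypothetical class, since the HTTP client builds this frame for you.

```java
import java.nio.ByteBuffer;

// Hypothetical encoder, shown only to illustrate the frame layout.
class RstStreamFrame {
  static final int TYPE_RST_STREAM = 0x3;
  static final int ERROR_CANCEL = 0x8; // assumed error code for a cancelled call

  // Layout: 24-bit length, 8-bit type, 8-bit flags, 31-bit stream id, 32-bit error code.
  static byte[] encode(int streamId, int errorCode) {
    ByteBuffer buf = ByteBuffer.allocate(13); // big-endian by default
    buf.put((byte) 0).put((byte) 0).put((byte) 4); // payload length is always 4
    buf.put((byte) TYPE_RST_STREAM);
    buf.put((byte) 0); // RST_STREAM defines no flags
    buf.putInt(streamId & 0x7FFFFFFF); // reserved high bit cleared
    buf.putInt(errorCode);
    return buf.array();
  }
}
```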

Client side load balancing

The gRPC Client can be configured to perform client side load balancing.

DNS based load balancing

DNS based load balancing works with DNS queries resolving a single host to multiple IP addresses (usually A records).

You can set a load balancer to enable DNS-based load balancing

GrpcClient client = GrpcClient
  .builder(vertx)
  .withLoadBalancer(LoadBalancer.ROUND_ROBIN)
  .build();

client
  .request(SocketAddress.inetSocketAddress(port, server), GreeterGrpcClient.SayHello)
  .compose(request -> {
    request.end(HelloRequest
      .newBuilder()
      .setName("Bob")
      .build());
    return request.response().compose(response -> response.last());
  }).onSuccess(reply -> {
    System.out.println("Received " + reply.getMessage());
  });

The usual load balancing strategies are available. Refer to the Vert.x HTTP client side load balancing documentation to configure them.
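
To make the round-robin strategy concrete, here is a minimal sketch of a selector cycling over the addresses a host resolved to. RoundRobinSelector is a hypothetical class written for illustration; it is not how LoadBalancer.ROUND_ROBIN is implemented, and it ignores endpoint health and DNS refresh.

```java
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical selector cycling over a fixed list of resolved endpoints.
class RoundRobinSelector {
  private final List<InetSocketAddress> endpoints;
  private final AtomicInteger next = new AtomicInteger();

  RoundRobinSelector(List<InetSocketAddress> endpoints) {
    if (endpoints.isEmpty()) {
      throw new IllegalArgumentException("no endpoints");
    }
    this.endpoints = List.copyOf(endpoints);
  }

  // Returns the next endpoint; floorMod keeps the index valid after integer overflow.
  InetSocketAddress select() {
    int index = Math.floorMod(next.getAndIncrement(), endpoints.size());
    return endpoints.get(index);
  }
}
```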

Address based load balancing

Address based load balancing relies on the Vert.x address resolver to resolve a single address to multiple host/port socket addresses.

You can set an address resolver to enable load balancing. The Vert.x Service Resolver implements a few address resolvers, such as a Kubernetes resolver.

GrpcClient client = GrpcClient
  .builder(vertx)
  .withAddressResolver(KubeResolver.create())
  .withLoadBalancer(LoadBalancer.ROUND_ROBIN)
  .build();

Unlike DNS based load balancing, address based load balancing uses an abstract Address instead of a SocketAddress. The address resolver implementation resolves an address to a list of socket addresses.

The Vert.x Service Resolver defines a ServiceAddress.

ServiceAddress address = ServiceAddress.of("GreeterService");

client
  .request(address, GreeterGrpcClient.SayHello)
  .compose(request -> {
    request.end(HelloRequest
      .newBuilder()
      .setName("Bob")
      .build());
    return request.response().compose(response -> response.last());
  }).onSuccess(reply -> {
    System.out.println("Received " + reply.getMessage());
  });

You can refer to the Vert.x Service Resolver project documentation for more details.

JSON wire format

gRPC implicitly assumes the usage of the Protobuf wire format.

The Vert.x gRPC client supports the JSON wire format as well.

You can call a JSON service method with the application/grpc+json content-type.

client
  .request(server, GreeterGrpcClient.Json.SayHello).compose(request -> {
    request.end(HelloRequest
      .newBuilder()
      .setName("Bob")
      .build());
    return request.response().compose(response -> response.last());
  }).onSuccess(reply -> {
    System.out.println("Received " + reply.getMessage());
  });

JSON encoding/decoding is achieved by the com.google.protobuf:protobuf-java-util library.

Anemic JSON is also supported with Vert.x JsonObject

ServiceMethod<JsonObject, JsonObject> sayHello = ServiceMethod.client(
  ServiceName.create("helloworld", "Greeter"),
  "SayHello",
  GrpcMessageEncoder.JSON_OBJECT,
  GrpcMessageDecoder.JSON_OBJECT
);
client
  .request(server, sayHello).compose(request -> {
    request.end(new JsonObject().put("name", "Bob"));
    return request.response().compose(response -> response.last());
  }).onSuccess(reply -> {
    System.out.println("Received " + reply.getString("message"));
  });

Compression

You can compress request messages by setting the request encoding before sending any message.

request.encoding("gzip");

// Write items after encoding has been defined
request.write(Item.newBuilder().setValue("item-1").build());
request.write(Item.newBuilder().setValue("item-2").build());
request.write(Item.newBuilder().setValue("item-3").build());
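
Under the hood, each gRPC message is sent as a length-prefixed frame: a 1-byte compressed flag, a 4-byte big-endian length, then the payload. The sketch below shows what framing a gzip-compressed message could look like. GrpcFrameEncoder is a hypothetical class and the input is assumed to be an already serialized protobuf message; the client does this for you once the encoding is set.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.zip.GZIPOutputStream;

// Hypothetical encoder, shown only to illustrate the wire layout.
class GrpcFrameEncoder {

  // Frames a serialized message: compressed flag, big-endian length, payload.
  static byte[] frame(byte[] message, boolean gzip) {
    byte[] payload = gzip ? gzip(message) : message;
    return ByteBuffer.allocate(5 + payload.length)
      .put((byte) (gzip ? 1 : 0))
      .putInt(payload.length)
      .put(payload)
      .array();
  }

  static byte[] gzip(byte[] data) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
      gz.write(data);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    // The stream is closed at this point, so the gzip trailer has been written.
    return out.toByteArray();
  }
}
```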

Decompression

Decompression is achieved transparently by the client when the server sends encoded responses.
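
As an illustration of what "transparently" covers, the following sketch reads one length-prefixed gRPC frame and inflates the payload when its compressed flag is set. It assumes gzip is the negotiated encoding and that the frame holds exactly one complete message; GrpcFrameDecoder is a hypothetical class, not part of the Vert.x API.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.zip.GZIPInputStream;

// Hypothetical decoder, shown only to illustrate the wire layout.
class GrpcFrameDecoder {

  // Reads one frame (flag, big-endian length, payload) and returns the uncompressed payload.
  static byte[] decode(byte[] frame) {
    ByteBuffer buf = ByteBuffer.wrap(frame);
    boolean compressed = buf.get() == 1;
    byte[] payload = new byte[buf.getInt()];
    buf.get(payload);
    return compressed ? gunzip(payload) : payload;
  }

  static byte[] gunzip(byte[] data) {
    try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(data))) {
      return gz.readAllBytes();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```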

Message level API

The client provides a message level API to interact directly with protobuf encoded gRPC messages.

The client message level API can be used with the server message level API to write a gRPC reverse proxy.

Such API is useful when you are not interested in the content of the messages, and instead you want to forward them to another service, e.g. you are writing a proxy.

Future<GrpcClientRequest<Buffer, Buffer>> requestFut = client.request(server);

requestFut.onSuccess(request -> {

  // Set the service name and the method to call
  request.serviceName(ServiceName.create("helloworld", "Greeter"));
  request.methodName("SayHello");

  // Send the protobuf request
  request.end(protoHello);

  // Handle the response
  Future<GrpcClientResponse<Buffer, Buffer>> responseFut = request.response();
  responseFut.onSuccess(response -> {
    response.handler(protoReply -> {
      // Handle the protobuf reply
    });
  });
});
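
The service name and method name set on the request are what identify the call on the wire: gRPC maps them to the HTTP/2 path /{package}.{service}/{method}. A small sketch of that mapping follows; GrpcPath is a hypothetical helper, as the client derives the path itself.

```java
// Hypothetical helper, shown only to illustrate the path convention.
class GrpcPath {

  // Builds the HTTP/2 request path for a gRPC call.
  static String of(String packageName, String service, String method) {
    // A service declared without a proto package is addressed by its bare name.
    String fullName = packageName.isEmpty() ? service : packageName + "." + service;
    return "/" + fullName + "/" + method;
  }
}
```

With the Greeter example, GrpcPath.of("helloworld", "Greeter", "SayHello") yields /helloworld.Greeter/SayHello, which is the path a proxy would see and forward.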

You can also set a messageHandler to handle GrpcMessage instances. Such messages preserve the server encoding.

Future<GrpcClientRequest<Buffer, Buffer>> requestFut = client.request(server);

requestFut.onSuccess(request -> {

  // Set the service name and the method to call
  request.serviceName(ServiceName.create("helloworld", "Greeter"));
  request.methodName("SayHello");

  // Send the protobuf request
  request.endMessage(GrpcMessage.message("identity", protoHello));

  // Handle the response
  Future<GrpcClientResponse<Buffer, Buffer>> responseFut = request.response();
  responseFut.onSuccess(response -> {
    response.messageHandler(replyMessage -> {
      System.out.println("Got reply message encoded as " + replyMessage.encoding());
    });
  });
});

The writeMessage and endMessage methods handle the message encoding:

  • when the message uses the request encoding, the message is sent as is

  • when the message uses a different encoding, it is re-encoded, e.g. compressed or decompressed
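
The rule in the list above can be sketched as a small decision function. This is an illustration only: MessageEncodingRule and its Action names are hypothetical, "identity" denotes an uncompressed message, and streamEncoding stands for the encoding in effect on the stream being written.

```java
// Hypothetical illustration of the pass-through versus re-encode decision.
class MessageEncodingRule {
  enum Action { SEND_AS_IS, COMPRESS, DECOMPRESS, TRANSCODE }

  static Action decide(String messageEncoding, String streamEncoding) {
    if (messageEncoding.equals(streamEncoding)) {
      return Action.SEND_AS_IS; // same encoding: forward the bytes untouched
    }
    if (messageEncoding.equals("identity")) {
      return Action.COMPRESS; // plain message, compressed stream
    }
    if (streamEncoding.equals("identity")) {
      return Action.DECOMPRESS; // compressed message, plain stream
    }
    return Action.TRANSCODE; // two different compression schemes
  }
}
```

The pass-through case is what makes a reverse proxy cheap: a message received with the same encoding as the outgoing stream never has to be inflated.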

Generated client API

In addition to the request/response API, the Vert.x gRPC protoc plugin generates idiomatic service clients.

A client wraps a GrpcClient and provides a Vert.x idiomatic API to interact with the service

GreeterGrpcClient client = GreeterGrpcClient.create(grpcClient, SocketAddress.inetSocketAddress(port, host));

You can also specify the JSON wire format when creating a stub

GreeterGrpcClient client = GreeterGrpcClient.create(grpcClient, SocketAddress.inetSocketAddress(port, host), WireFormat.JSON);

The client will send application/grpc+json requests.

Unary services

Unary services return a Vert.x Future.

Future<HelloReply> response = client.sayHello(HelloRequest.newBuilder().setName("John").build());

response.onSuccess(result -> System.out.println("Service responded: " + result.getMessage()));

response.onFailure(err -> System.out.println("Service failure: " + err.getMessage()));

Streaming requests

Streaming requests take a lambda that receives a Vert.x WriteStream of the messages to send to the service.

Future<Empty> response = client.sink((stream, err) -> {
  stream.write(Item.newBuilder().setValue("Value 1").build());
  stream.write(Item.newBuilder().setValue("Value 2").build());
  stream.end(Item.newBuilder().setValue("Value 3").build());
});

Streaming responses

Streaming responses get a Vert.x ReadStream of messages sent by the service

Future<ReadStream<Item>> response = client.source(Empty.getDefaultInstance());

response.onSuccess(stream -> stream
  .handler(item -> System.out.println("Item " + item.getValue()))
  .exceptionHandler(err -> System.out.println("Stream failed " + err.getMessage()))
  .endHandler(v -> System.out.println("Stream ended")));

response.onFailure(err -> System.out.println("Service failure: " + err.getMessage()));

Vert.x gRPC/IO Server

Vert.x gRPC/IO Server extends the Vert.x gRPC server with grpc-java integration.

This server provides compatibility with the grpc-java generated stub approach with a service bridge.

Using Vert.x gRPC/IO Server

To use Vert.x gRPC/IO Server, add the following dependency to the dependencies section of your build descriptor:

  • Maven (in your pom.xml):

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-grpcio-server</artifactId>
  <version>5.0.0.CR6</version>
</dependency>

  • Gradle (in your build.gradle file):

dependencies {
  implementation 'io.vertx:vertx-grpcio-server:5.0.0.CR6'
}

Service bridge

The Vert.x gRPC/IO Server can bridge a gRPC service for use with grpc-java generated server classes.

GrpcIoServer grpcServer = GrpcIoServer.server(vertx);

GreeterGrpc.GreeterImplBase service = new GreeterGrpc.GreeterImplBase() {
  @Override
  public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
    responseObserver.onNext(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
    responseObserver.onCompleted();
  }
};

// Bind the service bridge in the gRPC server
GrpcIoServiceBridge serverStub = GrpcIoServiceBridge.bridge(service);
serverStub.bind(grpcServer);

// Start the HTTP/2 server
vertx.createHttpServer(options)
  .requestHandler(grpcServer)
  .listen();

The bridge supports automatic deadline cancellation: when a gRPC request carrying a timeout is received, a deadline is associated with the io.grpc.Context and can be obtained from the current context. This deadline automatically cancels the request in progress when its associated timeout fires.
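
A server derives the deadline from the grpc-timeout request header, whose value is up to 8 digits followed by a unit letter (H, M, S, m, u or n). The sketch below shows how such a value could be turned into an absolute deadline; GrpcTimeoutParser is a hypothetical helper, and the bridge performs the equivalent work internally before attaching the deadline to the io.grpc.Context.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical parser, shown only to illustrate the header format.
class GrpcTimeoutParser {

  // Parses a grpc-timeout value such as "10S" or "500m" into nanoseconds.
  static long toNanos(String header) {
    if (!header.matches("[0-9]{1,8}[HMSmun]")) {
      throw new IllegalArgumentException("invalid grpc-timeout: " + header);
    }
    long value = Long.parseLong(header.substring(0, header.length() - 1));
    switch (header.charAt(header.length() - 1)) {
      case 'H': return TimeUnit.HOURS.toNanos(value);
      case 'M': return TimeUnit.MINUTES.toNanos(value);
      case 'S': return TimeUnit.SECONDS.toNanos(value);
      case 'm': return TimeUnit.MILLISECONDS.toNanos(value);
      case 'u': return TimeUnit.MICROSECONDS.toNanos(value);
      default:  return value; // 'n': already nanoseconds
    }
  }

  // Absolute deadline on the same clock as nowNanos, e.g. System.nanoTime().
  static long deadlineNanos(String header, long nowNanos) {
    return nowNanos + toNanos(header);
  }
}
```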

gRPC Reflection APIs

Support for the gRPC reflection APIs can be added to your Vert.x gRPC Server.

GrpcIoServer grpcServer = GrpcIoServer.server(vertx);

// Add reflection service
grpcServer.addService(ReflectionService.v1());

GreeterGrpc.GreeterImplBase greeterImpl = new GreeterGrpc.GreeterImplBase() {
  @Override
  public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
    responseObserver.onNext(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
    responseObserver.onCompleted();
  }
};

// Bind the service in the gRPC server
GrpcIoServiceBridge greeterService = GrpcIoServiceBridge.bridge(greeterImpl);

grpcServer.addService(greeterService);

// Start the HTTP/2 server
vertx.createHttpServer(options)
  .requestHandler(grpcServer)
  .listen();

You can then use tools like gRPCurl to explore and invoke your gRPC APIs.

grpcurl -plaintext localhost:50051 list

grpcurl -plaintext localhost:50051 describe .helloworld.HelloRequest

grpcurl -plaintext -d '{"name": "Vert.x"}' localhost:50051 helloworld.Greeter/SayHello

Vert.x gRPC/IO Client

Vert.x gRPC/IO Client extends the Vert.x gRPC client with grpc-java integration.

This client provides a generated stub approach with a gRPC Channel

Using Vert.x gRPC/IO Client

To use Vert.x gRPC/IO Client, add the following dependency to the dependencies section of your build descriptor:

  • Maven (in your pom.xml):

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-grpcio-client</artifactId>
  <version>5.0.0.CR6</version>
</dependency>

  • Gradle (in your build.gradle file):

dependencies {
  implementation 'io.vertx:vertx-grpcio-client:5.0.0.CR6'
}

gRPC channel

The Vert.x gRPC/IO Client provides a gRPC channel to use with grpc-java generated client classes.

GrpcIoClientChannel channel = new GrpcIoClientChannel(client, SocketAddress.inetSocketAddress(443, "example.com"));

GreeterGrpc.GreeterStub greeter = GreeterGrpc.newStub(channel);

StreamObserver<HelloReply> observer = new StreamObserver<HelloReply>() {
  @Override
  public void onNext(HelloReply value) {
    // Process response
  }

  @Override
  public void onCompleted() {
    // Done
  }

  @Override
  public void onError(Throwable t) {
    // Something went bad
  }
};

greeter.sayHello(HelloRequest.newBuilder().setName("Bob").build(), observer);

Timeout and deadlines are supported through the usual gRPC API.

GreeterGrpc.GreeterStub greeter = GreeterGrpc.newStub(channel).withDeadlineAfter(10, TimeUnit.SECONDS);

greeter.sayHello(HelloRequest.newBuilder().setName("Bob").build(), observer);

Deadlines are cascaded: when the current io.grpc.Context carries a deadline and the stub has no explicit deadline set, the client automatically inherits the implicit deadline. Such a deadline can be set when using a stub within a gRPC server call.
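
The cascading rule can be written down as a small function. This is a sketch under assumptions: DeadlineCascade is a hypothetical class, deadlines are modelled as java.time.Instant rather than io.grpc.Deadline, and the choice of the earlier deadline when both are present is an assumption that the text above does not state.

```java
import java.time.Instant;
import java.util.Optional;

// Hypothetical illustration of deadline inheritance.
class DeadlineCascade {

  static Optional<Instant> effective(Optional<Instant> contextDeadline,
                                     Optional<Instant> stubDeadline) {
    if (!stubDeadline.isPresent()) {
      return contextDeadline; // no explicit deadline: inherit the context's, if any
    }
    if (!contextDeadline.isPresent()) {
      return stubDeadline;
    }
    // Assumption: when both are set, the earlier one applies.
    Instant context = contextDeadline.get();
    Instant stub = stubDeadline.get();
    return Optional.of(context.isBefore(stub) ? context : stub);
  }
}
```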

Vert.x gRPC Context Storage

Vert.x gRPC Context Storage overrides the default io.grpc.Context.Storage implementation.

The default implementation always stores the gRPC context in a thread-local variable. This implementation stores the gRPC context the same way as Vert.x core stores request tracing data.

This means, for example, that when you implement a service method, the gRPC context is propagated across Vert.x async API calls:

Context grpcCtx1 = Context.current();

vertx.executeBlocking(() -> {

  // Same as grpcCtx1
  Context grpcCtx2 = Context.current();

  return doSomething();

}).onComplete(ar -> {

  // Same as grpcCtx1 and grpcCtx2
  Context grpcCtx3 = Context.current();

});

The gRPC context is propagated across Vert.x async API calls only when the current Vert.x Context is bound to a Vert.x HTTP server request.

It is not propagated if, for example, you invoke a stub on a non-Vert.x thread or from a verticle start method.
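
The difference between thread-local storage and propagated storage can be shown with plain Java. In the sketch below, a value stored in a ThreadLocal is invisible to a task running on another thread unless it is captured and re-installed around the task. ContextPropagationDemo and propagating are hypothetical names; Vert.x gRPC Context Storage achieves a comparable effect through the Vert.x context rather than by wrapping tasks.

```java
import java.util.concurrent.Callable;

// Hypothetical illustration of why a thread-local alone does not follow async calls.
class ContextPropagationDemo {
  static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

  // Captures the caller's value now and re-installs it around the task,
  // whichever thread ends up running it.
  static <T> Callable<T> propagating(Callable<T> task) {
    String captured = CURRENT.get();
    return () -> {
      String previous = CURRENT.get();
      CURRENT.set(captured);
      try {
        return task.call();
      } finally {
        CURRENT.set(previous); // restore the worker thread's own value
      }
    };
  }
}
```

Submitting a plain task to an executor would observe null from CURRENT.get(), while the same task wrapped with propagating observes the value set by the submitting thread.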

Using Vert.x gRPC Context Storage

To use Vert.x gRPC Context Storage, add the following dependency to the dependencies section of your build descriptor:

  • Maven (in your pom.xml):

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-grpc-context-storage</artifactId>
  <version>5.0.0.CR6</version>
</dependency>

  • Gradle (in your build.gradle file):

dependencies {
  implementation 'io.vertx:vertx-grpc-context-storage:5.0.0.CR6'
}