syntax = "proto3";
option java_multiple_files = true;
option java_package = "examples";
option java_outer_classname = "HelloWorldProto";
package helloworld;
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
Vert.x gRPC
A good description of gRPC can be found on Wikipedia:
gRPC is an open source remote procedure call (RPC) system initially developed at Google. It uses HTTP/2 for transport, Protocol Buffers as the interface description language, and provides features such as authentication, bidirectional streaming and flow control, blocking or nonblocking bindings, and cancellation and timeouts. It generates cross-platform client and server bindings for many languages.
Wikipedia
Vert.x gRPC is a module that aligns the programming style of gRPC with the Vert.x style. As a user of this module, you work with familiar Vert.x Streams and Futures while keeping all the benefits of gRPC.
For more information related to gRPC please consult the official documentation site http://www.grpc.io/.
Since Vert.x 4.3, this module is the new support for gRPC in the Vert.x stack. The previous implementation, based on gRPC Netty, is still available and has been renamed Vert.x gRPC Netty; it can be found at https://vertx.io/docs/vertx-grpc-netty/java/. This module has Tech Preview status, which means the API can change between versions.
Vert.x gRPC is split into several parts:
-
Vert.x gRPC Server
-
Vert.x gRPC Client
-
Vert.x gRPC/IO Server
-
Vert.x gRPC/IO Client
-
Vert.x gRPC/IO Context Storage
Vert.x gRPC Protoc Plugin
The easiest way to start using Vert.x gRPC is to use its built-in code generator plugin. To do so, you must define the protocol in the protobuf format, as required by gRPC.
The Greeter definition at the top of this page is a very simple example showing the single request, single response mode.
Compile the RPC definition
The definition above must now be compiled.
You can compile the proto file with the protoc compiler directly, or you can integrate the compilation into your build.
If you’re using Apache Maven you need to add the plugin:
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
<protocPlugins>
<protocPlugin>
<id>vertx-grpc-protoc-plugin2</id>
<groupId>io.vertx</groupId>
<artifactId>vertx-grpc-protoc-plugin2</artifactId>
<version>${stack.version}</version>
<mainClass>io.vertx.grpc.plugin.VertxGrpcGenerator</mainClass>
</protocPlugin>
</protocPlugins>
</configuration>
<executions>
<execution>
<id>compile</id>
<configuration>
<outputDirectory>${project.basedir}/src/main/java</outputDirectory>
<clearOutputDirectory>false</clearOutputDirectory>
</configuration>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
The ${os.detected.classifier} property makes the build OS independent: on OSX it is replaced by osx-x86_64, and so on. To use it, add the os-maven-plugin (https://github.com/trustin/os-maven-plugin) to the build section of your pom.xml:
<build>
...
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.4.1.Final</version>
</extension>
</extensions>
...
</build>
This plugin will compile your proto files under src/main/proto and make them available to your project.
If you’re using Gradle you need to add the plugin:
...
apply plugin: 'com.google.protobuf'
...
buildscript {
...
dependencies {
// ASSUMES GRADLE 2.12 OR HIGHER. Use plugin version 0.7.5 with earlier gradle versions
classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.0'
}
}
...
protobuf {
protoc {
artifact = 'com.google.protobuf:protoc:3.2.0'
}
plugins {
grpc {
artifact = "io.grpc:protoc-gen-grpc-java:1.25.0"
}
vertx {
artifact = "io.vertx:vertx-grpc-protoc-plugin2:${vertx.grpc.version}"
}
}
generateProtoTasks {
all()*.plugins {
grpc
vertx
}
}
}
This plugin will compile your proto files into build/generated/source/proto/main and make the generated sources available to your project.
Generated RPC files
For each service definition, the plugin creates two Java RPC files.
For the Greeter service:
-
examples/VertxGreeterGrpcClient.java
-
examples/VertxGreeterGrpcServer.java
Besides the usual client/server generated code, these files contain service method constants:
public class VertxGreeterGrpcClient {
public static final ServiceMethod<examples.HelloReply, examples.HelloRequest> SayHello = ServiceMethod.client(
ServiceName.create("helloworld", "Greeter"),
"SayHello",
GrpcMessageEncoder.encoder(),
GrpcMessageDecoder.decoder(examples.HelloReply.parser())
);
// ...
}
public class VertxGreeterGrpcServer {
public static final ServiceMethod<examples.HelloRequest, examples.HelloReply> SayHello = ServiceMethod.server(
ServiceName.create("helloworld", "Greeter"),
"SayHello",
GrpcMessageEncoder.encoder(),
GrpcMessageDecoder.decoder(examples.HelloRequest.parser())
);
// ...
}
For each service method, a public static final ServiceMethod constant is generated. These constants provide everything Vert.x gRPC needs to know to interact with gRPC:
-
the service name:
/helloworld.Greeter
-
the service method name:
SayHello
-
the message decoder
-
the message encoder
They can be used to bind services or interact with a remote server.
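As a rough illustration of how the service name and the method name carried by a ServiceMethod identify a call: gRPC over HTTP/2 addresses a method with a request path of the form /{package}.{service}/{method}. The sketch below builds that path with plain JDK code. The GrpcPaths class and its methods are hypothetical names used for illustration only and are not part of the Vert.x gRPC API.

```java
final class GrpcPaths {

  private GrpcPaths() {
  }

  /** Fully qualified service name, e.g. "helloworld.Greeter". */
  static String fullyQualifiedService(String protoPackage, String service) {
    // A proto file without a package declaration yields the bare service name
    return protoPackage.isEmpty() ? service : protoPackage + "." + service;
  }

  /** HTTP/2 request path of a method, e.g. "/helloworld.Greeter/SayHello". */
  static String requestPath(String protoPackage, String service, String method) {
    return "/" + fullyQualifiedService(protoPackage, service) + "/" + method;
  }

  public static void main(String[] args) {
    // prints "/helloworld.Greeter/SayHello"
    System.out.println(requestPath("helloworld", "Greeter", "SayHello"));
  }
}
```

With the generated constants you never build this path yourself; it is shown only to make clear what the service name and method name stand for.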
Vert.x gRPC Server
Vert.x gRPC Server is a gRPC server powered by the Vert.x HTTP server, superseding the integrated Netty-based gRPC server.
This server provides a gRPC request/response oriented API as well as a generated stub approach with the Vert.x gRPC Generator.
Using Vert.x gRPC Server
To use Vert.x gRPC Server, add the following dependency to the dependencies section of your build descriptor:
-
Maven (in your pom.xml):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-grpc-server</artifactId>
<version>4.5.11</version>
</dependency>
-
Gradle (in your build.gradle file):
dependencies {
compile 'io.vertx:vertx-grpc-server:4.5.11'
}
Creating a gRPC server
A GrpcServer is a Handler<HttpServerRequest> and can be used as an HTTP server request handler:
GrpcServer grpcServer = GrpcServer.server(vertx);
HttpServer server = vertx.createHttpServer(options);
server
.requestHandler(grpcServer)
.listen();
Server request/response API
The gRPC request/response server API provides an alternative way to interact with a client without the need of extending a Java class.
Request/response
Each service method is processed by a handler, which is bound using a ServiceMethod:
ServiceMethod<HelloRequest, HelloReply> serviceMethod = VertxGreeterGrpcServer.SayHello;
server.callHandler(serviceMethod, request -> {
request.handler(hello -> {
GrpcServerResponse<HelloRequest, HelloReply> response = request.response();
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + hello.getName()).build();
response.end(reply);
});
});
ServiceMethod constants are generated by the Vert.x gRPC protoc plugin.
Streaming request
You can set handlers to process request events
server.callHandler(VertxStreamingGrpcServer.Sink, request -> {
request.handler(item -> {
// Process item
});
request.endHandler(v ->{
// No more items
// Send the response
request.response().end(Empty.getDefaultInstance());
});
request.exceptionHandler(err -> {
// Something wrong happened
});
});
Streaming response
A streaming response involves calling write for each element of the stream and using end to end the stream:
server.callHandler(VertxStreamingGrpcServer.Source, request -> {
GrpcServerResponse<Empty, Item> response = request.response();
request.handler(empty -> {
for (int i = 0;i < 10;i++) {
response.write(Item.newBuilder().setValue("1").build());
}
response.end();
});
});
Bidi request/response
A bidi request/response is simply the combination of a streaming request and a streaming response
server.callHandler(VertxStreamingGrpcServer.Pipe, request -> {
request.handler(item -> request.response().write(item));
request.endHandler(v -> request.response().end());
});
The gRPC-Web protocol does not support bidirectional streaming.
Flow control
Request and response are back pressured Vert.x streams.
You can pause/resume/fetch a request
request.pause();
performAsyncOperation().onComplete(ar -> {
// And then resume
request.resume();
});
You can check the writability of a response and set a drain handler
if (response.writeQueueFull()) {
response.drainHandler(v -> {
// Writable again
});
} else {
response.write(item);
}
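The write queue and drain handler contract can be illustrated without any gRPC machinery. The following is a minimal, standalone sketch of a bounded write queue. BoundedWriteQueue and sendOne are hypothetical names, and firing the drain handler once the queue falls to half of its maximum size is an assumption of this sketch, not a statement about how Vert.x gRPC is implemented.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class BoundedWriteQueue<T> {

  private final Deque<T> pending = new ArrayDeque<>();
  private final int maxSize;
  private Runnable drainHandler;

  BoundedWriteQueue(int maxSize) {
    if (maxSize < 1) {
      throw new IllegalArgumentException("maxSize must be positive");
    }
    this.maxSize = maxSize;
  }

  /** Queues an item; the caller is expected to check writeQueueFull() first. */
  void write(T item) {
    pending.add(item);
  }

  /** True when the producer should stop writing and wait for a drain event. */
  boolean writeQueueFull() {
    return pending.size() >= maxSize;
  }

  /** Registers a one-shot callback invoked when the queue has drained enough. */
  void drainHandler(Runnable handler) {
    this.drainHandler = handler;
  }

  /** Simulates the transport sending one item; returns it, or null when empty. */
  T sendOne() {
    T item = pending.poll();
    if (drainHandler != null && pending.size() <= maxSize / 2) {
      Runnable handler = drainHandler;
      drainHandler = null; // one-shot in this sketch
      handler.run();
    }
    return item;
  }

  public static void main(String[] args) {
    BoundedWriteQueue<String> queue = new BoundedWriteQueue<>(4);
    int i = 0;
    while (!queue.writeQueueFull()) {
      queue.write("item-" + i++);
    }
    queue.drainHandler(() -> System.out.println("writable again"));
    queue.sendOne(); // 3 items pending, still above the threshold of 2
    queue.sendOne(); // 2 items pending, the drain handler fires
  }
}
```

The producer side mirrors the snippet above: write while the queue is not full, otherwise register a drain handler and resume writing from it.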
Compression
You can compress response messages by setting the response encoding before sending any message:
response.encoding("gzip");
// Write items after encoding has been defined
response.write(Item.newBuilder().setValue("item-1").build());
response.write(Item.newBuilder().setValue("item-2").build());
response.write(Item.newBuilder().setValue("item-3").build());
Compression is not supported over the gRPC-Web protocol.
Decompression
Decompression is done transparently by the server when the client sends encoded requests.
Decompression is not supported over the gRPC-Web protocol.
Message level API
The server provides a message level API to interact directly with protobuf encoded gRPC messages.
The server message level API can be used with the client message level API to write a gRPC reverse proxy.
Such an API is useful when you are not interested in the content of the messages and instead want to forward them to another service, e.g. when writing a proxy.
ServiceName greeterServiceName = ServiceName.create("helloworld", "Greeter");
server.callHandler(request -> {
if (request.serviceName().equals(greeterServiceName) && request.methodName().equals("SayHello")) {
request.handler(protoHello -> {
// Handle protobuf encoded hello
performAsyncOperation(protoHello)
.onSuccess(protoReply -> {
// Reply with protobuf encoded reply
request.response().end(protoReply);
}).onFailure(err -> {
request.response()
.status(GrpcStatus.ABORTED)
.end();
});
});
} else {
request.response()
.status(GrpcStatus.NOT_FOUND)
.end();
}
});
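When handling protobuf encoded messages directly, it can help to know how gRPC delimits them on the wire: each message is sent as a length-prefixed frame made of a 1-byte compressed flag, a 4-byte big-endian payload length, and the payload itself. Vert.x gRPC takes care of this framing for you. The sketch below only illustrates the layout with arbitrary bytes in place of a protobuf payload, and the GrpcFraming class is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class GrpcFraming {

  private static final int PREFIX_LENGTH = 5; // 1 flag byte + 4 length bytes

  private GrpcFraming() {
  }

  /** Wraps a payload in a gRPC length-prefixed frame. */
  static byte[] frame(byte[] payload, boolean compressed) {
    ByteBuffer buffer = ByteBuffer.allocate(PREFIX_LENGTH + payload.length); // big-endian by default
    buffer.put((byte) (compressed ? 1 : 0));
    buffer.putInt(payload.length);
    buffer.put(payload);
    return buffer.array();
  }

  /** Reads the compressed flag of a frame. */
  static boolean isCompressed(byte[] frame) {
    return frame[0] == 1;
  }

  /** Extracts the payload of a single complete frame. */
  static byte[] payload(byte[] frame) {
    if (frame.length < PREFIX_LENGTH) {
      throw new IllegalArgumentException("Frame is shorter than its prefix");
    }
    ByteBuffer buffer = ByteBuffer.wrap(frame);
    buffer.get(); // skip the compressed flag
    int length = buffer.getInt();
    if (length < 0 || length != buffer.remaining()) {
      throw new IllegalArgumentException("Length prefix does not match the payload size");
    }
    byte[] payload = new byte[length];
    buffer.get(payload);
    return payload;
  }

  public static void main(String[] args) {
    byte[] frame = frame("hello".getBytes(StandardCharsets.US_ASCII), false);
    // prints "[0, 0, 0, 0, 5, 104, 101, 108, 108, 111]"
    System.out.println(Arrays.toString(frame));
  }
}
```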
You can also set a messageHandler to handle GrpcMessage objects. Such messages preserve the client encoding, which is useful when the service you are forwarding to can handle compressed messages directly: in this case the message does not need to be decompressed and compressed again.
ServiceName greeterServiceName = ServiceName.create("helloworld", "Greeter");
server.callHandler(request -> {
if (request.serviceName().equals(greeterServiceName) && request.methodName().equals("SayHello")) {
request.messageHandler(helloMessage -> {
// Can be identity or gzip
String helloEncoding = helloMessage.encoding();
// Handle hello message
handleGrpcMessage(helloMessage)
.onSuccess(replyMessage -> {
// Reply with reply message
// Can be identity or gzip
String replyEncoding = replyMessage.encoding();
// Send the reply
request.response().endMessage(replyMessage);
}).onFailure(err -> {
request.response()
.status(GrpcStatus.ABORTED)
.end();
});
});
} else {
request.response()
.status(GrpcStatus.NOT_FOUND)
.end();
}
});
The writeMessage and endMessage methods handle the message encoding:
-
when the message uses the response encoding, the message is sent as is
-
when the message uses a different encoding, it will be encoded, e.g. compressed or uncompressed
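The two rules above can be sketched in plain Java. This is only an illustration of the decision, assuming the identity and gzip encodings. EncodingAdapter and its methods are hypothetical names and do not reflect the actual Vert.x implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class EncodingAdapter {

  private EncodingAdapter() {
  }

  /** Returns the payload in the encoding expected by the stream it is written to. */
  static byte[] adapt(byte[] payload, String messageEncoding, String streamEncoding) {
    checkSupported(messageEncoding);
    checkSupported(streamEncoding);
    if (messageEncoding.equals(streamEncoding)) {
      return payload; // same encoding: the message is sent as is
    }
    // Different encoding: bring the payload back to identity, then re-encode
    byte[] plain = messageEncoding.equals("gzip") ? gunzip(payload) : payload;
    return streamEncoding.equals("gzip") ? gzip(plain) : plain;
  }

  private static void checkSupported(String encoding) {
    if (!encoding.equals("identity") && !encoding.equals("gzip")) {
      throw new IllegalArgumentException("Unsupported encoding in this sketch: " + encoding);
    }
  }

  static byte[] gzip(byte[] data) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
      gz.write(data);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return out.toByteArray();
  }

  static byte[] gunzip(byte[] data) {
    try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(data))) {
      return gz.readAllBytes();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    byte[] hello = "hello".getBytes(StandardCharsets.UTF_8);
    System.out.println(adapt(hello, "identity", "identity") == hello); // true
    byte[] zipped = adapt(hello, "identity", "gzip");
    System.out.println(new String(adapt(zipped, "gzip", "identity"), StandardCharsets.UTF_8)); // hello
  }
}
```

The pass-through branch is what makes forwarding compressed messages cheap: when both sides agree on the encoding, nothing is decompressed or compressed again.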
Server stub API
In addition to the request/response API, the Vert.x gRPC protoc plugin generates idiomatic service stubs.
Each service method comes in two flavors; you can override the one you prefer depending on the style.
Unary services
Unary services can return a Vert.x Future:
VertxGreeterGrpcServer.GreeterApi stub = new VertxGreeterGrpcServer.GreeterApi() {
@Override
public Future<HelloReply> sayHello(HelloRequest request) {
return Future.succeededFuture(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
}
};
or process a Vert.x Promise
VertxGreeterGrpcServer.GreeterApi stub = new VertxGreeterGrpcServer.GreeterApi() {
@Override
public void sayHello(HelloRequest request, Promise<HelloReply> response) {
response.complete(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
}
};
In both cases you need to bind the stub to an existing GrpcServer:
stub.bindAll(server);
Streaming requests
Streaming requests are implemented with a ReadStream:
VertxStreamingGrpcServer.StreamingApi stub = new VertxStreamingGrpcServer.StreamingApi() {
@Override
public void sink(ReadStream<Item> stream, Promise<Empty> response) {
stream.handler(item -> {
System.out.println("Process item " + item.getValue());
});
// Send response
stream.endHandler(v -> response.complete(Empty.getDefaultInstance()));
}
};
Streaming responses
Streaming responses are implemented with Vert.x streams and come in two flavors.
You can return a Vert.x ReadStream and let the service send it for you:
VertxStreamingGrpcServer.StreamingApi stub = new VertxStreamingGrpcServer.StreamingApi() {
@Override
public ReadStream<Item> source(Empty request) {
return streamOfItems();
}
};
or you can process a WriteStream:
VertxStreamingGrpcServer.StreamingApi stub = new VertxStreamingGrpcServer.StreamingApi() {
@Override
public void source(Empty request, WriteStream<Item> response) {
response.write(Item.newBuilder().setValue("value-1").build());
response.end(Item.newBuilder().setValue("value-2").build());
}
};
Vert.x gRPC Client
Vert.x gRPC Client is a gRPC client powered by Vert.x HTTP client.
This client provides a gRPC request/response oriented API as well as a generated stub approach with a gRPC Channel.
Using Vert.x gRPC Client
To use Vert.x gRPC Client, add the following dependency to the dependencies section of your build descriptor:
-
Maven (in your pom.xml):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-grpc-client</artifactId>
<version>4.5.11</version>
</dependency>
-
Gradle (in your build.gradle file):
dependencies {
compile 'io.vertx:vertx-grpc-client:4.5.11'
}
Creating a gRPC client
You can easily create the gRPC client
GrpcClient client = GrpcClient.client(vertx);
Client request/response API
The gRPC request/response client API provides an alternative way to interact with a server without the need of a generated stub.
Request/response
Interacting with a gRPC server involves creating a request to the remote gRPC service
SocketAddress server = SocketAddress.inetSocketAddress(443, "example.com");
ServiceMethod<HelloReply, HelloRequest> sayHelloMethod = VertxGreeterGrpcClient.SayHello;
Future<GrpcClientRequest<HelloRequest, HelloReply>> fut = client.request(server, sayHelloMethod);
fut.onSuccess(request -> {
// The end method calls the service
request.end(HelloRequest.newBuilder().setName("Bob").build());
});
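ServiceMethod constants are generated by the Vert.x gRPC protoc plugin.
The response method returns a future of the response, and last returns a future of the reply message: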
request.response().onSuccess(response -> {
Future<HelloReply> fut = response.last();
fut.onSuccess(reply -> {
System.out.println("Received " + reply.getMessage());
});
});
Future composition can combine all the previous steps together in a compact fashion
client
.request(server, VertxGreeterGrpcClient.SayHello).compose(request -> {
request.end(HelloRequest
.newBuilder()
.setName("Bob")
.build());
return request.response().compose(response -> response.last());
}).onSuccess(reply -> {
System.out.println("Received " + reply.getMessage());
});
Streaming request
Streaming requests involve calling write for each element of the stream and using end to end the stream:
client
.request(server, VertxStreamingGrpcClient.Sink)
.onSuccess(request -> {
for (int i = 0;i < 10;i++) {
request.write(Item.newBuilder().setValue("1").build());
}
request.end();
});
Streaming response
You can set handlers to process response events of a streaming response
client
.request(server, VertxStreamingGrpcClient.Source)
.compose(request -> {
request.end(Empty.getDefaultInstance());
return request.response();
})
.onSuccess(response -> {
response.handler(item -> {
// Process item
});
response.endHandler(v -> {
// Done
});
response.exceptionHandler(err -> {
// Something went bad
});
});
Bidi request/response
A bidi request/response is simply the combination of a streaming request and a streaming response.
Flow control
Request and response are back pressured Vert.x streams.
You can check the writability of a request and set a drain handler
if (request.writeQueueFull()) {
request.drainHandler(v -> {
// Writable again
});
} else {
request.write(item);
}
You can pause/resume/fetch a response
response.pause();
performAsyncOperation().onComplete(ar -> {
// And then resume
response.resume();
});
Cancellation
You can call cancel to cancel a request:
request.cancel();
Cancellation sends an HTTP/2 reset frame to the server.
Compression
You can compress request messages by setting the request encoding before sending any message:
request.encoding("gzip");
// Write items after encoding has been defined
request.write(Item.newBuilder().setValue("item-1").build());
request.write(Item.newBuilder().setValue("item-2").build());
request.write(Item.newBuilder().setValue("item-3").build());
Decompression
Decompression is achieved transparently by the client when the server sends encoded responses.
Message level API
The client provides a message level API to interact directly with protobuf encoded gRPC messages.
The client message level API can be used with the server message level API to write a gRPC reverse proxy.
Such an API is useful when you are not interested in the content of the messages and instead want to forward them to another service, e.g. when writing a proxy.
Future<GrpcClientRequest<Buffer, Buffer>> requestFut = client.request(server);
requestFut.onSuccess(request -> {
// Set the service name and the method to call
request.serviceName(ServiceName.create("helloworld", "Greeter"));
request.methodName("SayHello");
// Send the protobuf request
request.end(protoHello);
// Handle the response
Future<GrpcClientResponse<Buffer, Buffer>> responseFut = request.response();
responseFut.onSuccess(response -> {
response.handler(protoReply -> {
// Handle the protobuf reply
});
});
});
You can also set a messageHandler to handle GrpcMessage objects; such messages preserve the server encoding.
Future<GrpcClientRequest<Buffer, Buffer>> requestFut = client.request(server);
requestFut.onSuccess(request -> {
// Set the service name and the method to call
request.serviceName(ServiceName.create("helloworld", "Greeter"));
request.methodName("SayHello");
// Send the protobuf request
request.endMessage(GrpcMessage.message("identity", protoHello));
// Handle the response
Future<GrpcClientResponse<Buffer, Buffer>> responseFut = request.response();
responseFut.onSuccess(response -> {
response.messageHandler(replyMessage -> {
System.out.println("Got reply message encoded as " + replyMessage.encoding());
});
});
});
The writeMessage and endMessage methods handle the message encoding:
-
when the message uses the request encoding, the message is sent as is
-
when the message uses a different encoding, it will be encoded, e.g. compressed or uncompressed
Client stub API
In addition to the request/response API, the Vert.x gRPC protoc plugin generates idiomatic service clients.
A client wraps a GrpcClient and provides a Vert.x idiomatic API to interact with the service:
VertxGreeterGrpcClient client = new VertxGreeterGrpcClient(grpcClient, SocketAddress.inetSocketAddress(port, host));
Unary services
Unary services return a Vert.x Future:
Future<HelloReply> response = client.sayHello(HelloRequest.newBuilder().setName("John").build());
response.onSuccess(result -> System.out.println("Service responded: " + result.getMessage()));
response.onFailure(err -> System.out.println("Service failure: " + err.getMessage()));
Streaming requests
Streaming requests take a lambda that is passed a Vert.x WriteStream of the messages to send to the service:
Future<Empty> response = client.sink(stream -> {
stream.write(Item.newBuilder().setValue("Value 1").build());
stream.write(Item.newBuilder().setValue("Value 2").build());
stream.end(Item.newBuilder().setValue("Value 3").build());
});
Streaming responses
Streaming responses get a Vert.x ReadStream of the messages sent by the service:
Future<ReadStream<Item>> response = client.source(Empty.getDefaultInstance());
response.onSuccess(stream -> stream
.handler(item -> System.out.println("Item " + item.getValue()))
.exceptionHandler(err -> System.out.println("Stream failed " + err.getMessage()))
.endHandler(v -> System.out.println("Stream ended")));
response.onFailure(err -> System.out.println("Service failure: " + err.getMessage()));
Vert.x gRPC/IO Server
Vert.x gRPC/IO Server extends the Vert.x gRPC server with grpc-java integration.
This server provides compatibility with the grpc-java generated stub approach with a service bridge.
Using Vert.x gRPC/IO Server
To use Vert.x gRPC/IO Server, add the following dependency to the dependencies section of your build descriptor:
-
Maven (in your pom.xml):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-grpcio-server</artifactId>
<version>4.5.11</version>
</dependency>
-
Gradle (in your build.gradle file):
dependencies {
compile 'io.vertx:vertx-grpcio-server:4.5.11'
}
Service bridge
The Vert.x gRPC Server can bridge a gRPC service to use with grpc-java generated server classes.
GrpcIoServer grpcServer = GrpcIoServer.server(vertx);
GreeterGrpc.GreeterImplBase service = new GreeterGrpc.GreeterImplBase() {
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
responseObserver.onNext(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
responseObserver.onCompleted();
}
};
// Bind the service bridge in the gRPC server
GrpcIoServiceBridge serverStub = GrpcIoServiceBridge.bridge(service);
serverStub.bind(grpcServer);
// Start the HTTP/2 server
vertx.createHttpServer(options)
.requestHandler(grpcServer)
.listen();
The bridge supports automatic deadline cancellation: when a gRPC request carrying a timeout is received, a deadline is associated with the io.grpc.Context and can be obtained from the current context. This deadline automatically cancels the request in progress when its associated timeout fires.
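A gRPC timeout travels in the grpc-timeout request header as an integer of at most 8 digits followed by a unit: H (hours), M (minutes), S (seconds), m (milliseconds), u (microseconds) or n (nanoseconds). The sketch below shows how such a value could be turned into a java.time.Duration, from which a deadline can be derived. GrpcTimeout is a hypothetical helper written for illustration, not the bridge's actual code.

```java
import java.time.Duration;

final class GrpcTimeout {

  private GrpcTimeout() {
  }

  /** Parses a grpc-timeout header value such as "500m" or "2S". */
  static Duration parse(String headerValue) {
    // 1 to 8 digits followed by exactly one unit character
    if (headerValue == null || headerValue.length() < 2 || headerValue.length() > 9) {
      throw new IllegalArgumentException("Invalid grpc-timeout: " + headerValue);
    }
    String digits = headerValue.substring(0, headerValue.length() - 1);
    for (int i = 0; i < digits.length(); i++) {
      char c = digits.charAt(i);
      if (c < '0' || c > '9') {
        throw new IllegalArgumentException("Invalid grpc-timeout: " + headerValue);
      }
    }
    long amount = Long.parseLong(digits);
    char unit = headerValue.charAt(headerValue.length() - 1);
    switch (unit) {
      case 'H':
        return Duration.ofHours(amount);
      case 'M':
        return Duration.ofMinutes(amount);
      case 'S':
        return Duration.ofSeconds(amount);
      case 'm':
        return Duration.ofMillis(amount);
      case 'u':
        return Duration.ofNanos(amount * 1_000);
      case 'n':
        return Duration.ofNanos(amount);
      default:
        throw new IllegalArgumentException("Unknown grpc-timeout unit: " + unit);
    }
  }

  public static void main(String[] args) {
    System.out.println(parse("500m")); // PT0.5S
  }
}
```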
Vert.x gRPC/IO Client
Vert.x gRPC/IO Client extends the Vert.x gRPC client with grpc-java integration.
This client provides a generated stub approach with a gRPC Channel.
Using Vert.x gRPC/IO Client
To use Vert.x gRPC/IO Client, add the following dependency to the dependencies section of your build descriptor:
-
Maven (in your pom.xml):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-grpcio-client</artifactId>
<version>4.5.11</version>
</dependency>
-
Gradle (in your build.gradle file):
dependencies {
compile 'io.vertx:vertx-grpcio-client:4.5.11'
}
gRPC channel
The Vert.x gRPC/IO Client provides a gRPC channel to use with grpc-java generated client classes.
GrpcIoClientChannel channel = new GrpcIoClientChannel(client, SocketAddress.inetSocketAddress(443, "example.com"));
GreeterGrpc.GreeterStub greeter = GreeterGrpc.newStub(channel);
StreamObserver<HelloReply> observer = new StreamObserver<HelloReply>() {
@Override
public void onNext(HelloReply value) {
// Process response
}
@Override
public void onCompleted() {
// Done
}
@Override
public void onError(Throwable t) {
// Something went bad
}
};
greeter.sayHello(HelloRequest.newBuilder().setName("Bob").build(), observer);
Vert.x gRPC Context Storage
Vert.x gRPC Context Storage overrides the default io.grpc.Context.Storage implementation.
The default implementation always stores the gRPC context in a thread-local variable. This implementation stores the gRPC context the same way as Vert.x core stores request tracing data.
This means, for example, that when you implement a service method, the gRPC context is propagated across Vert.x async API calls:
Context grpcCtx1 = Context.current();
vertx.<String>executeBlocking(prom -> {
// Same as grpcCtx1
Context grpcCtx2 = Context.current();
String result = doSomething();
prom.complete(result);
}, ar -> {
// Same as grpcCtx1 and grpcCtx2
Context grpcCtx3 = Context.current();
});
The gRPC context is propagated across Vert.x async API calls only when the current Vert.x It is not propagated if, for example, you invoke a stub on a non-Vert.x thread or from a verticle
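To see why a purely thread-local storage is not enough for asynchronous code, consider this standalone sketch. It uses a plain ThreadLocal<String> as a stand-in for the gRPC context (an assumption made to keep the example free of dependencies), and ThreadLocalDemo is a hypothetical name. A value set on one thread is not visible from another thread, which is the situation a thread-local storage runs into when work hops between threads.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadLocalDemo {

  // Stand-in for the thread-local slot that would hold the current gRPC context
  private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

  private ThreadLocalDemo() {
  }

  /** Sets a value on the calling thread and reads it back on the same thread. */
  static String readFromSameThread(String value) {
    CURRENT.set(value);
    try {
      return CURRENT.get();
    } finally {
      CURRENT.remove();
    }
  }

  /** Sets a value on the calling thread, then reads the slot from another thread. */
  static String readFromOtherThread(String value) {
    CURRENT.set(value);
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      Callable<String> task = CURRENT::get;
      return executor.submit(task).get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      executor.shutdown();
      CURRENT.remove();
    }
  }

  public static void main(String[] args) {
    System.out.println(readFromSameThread("ctx-1"));  // ctx-1
    System.out.println(readFromOtherThread("ctx-1")); // null
  }
}
```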
Using Vert.x gRPC Context Storage
To use Vert.x gRPC Context Storage, add the following dependency to the dependencies section of your build descriptor:
-
Maven (in your pom.xml):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-grpc-context-storage</artifactId>
<version>4.5.11</version>
</dependency>
-
Gradle (in your build.gradle file):
dependencies {
compile 'io.vertx:vertx-grpc-context-storage:4.5.11'
}