Vert.x gRPC
The best description of gRPC can be seen at wikipedia.
gRPC is an open source remote procedure call (RPC) system initially developed at Google. It uses HTTP/2 for transport, Protocol Buffers as the interface description language, and provides features such as authentication, bidirectional streaming and flow control, blocking or nonblocking bindings, and cancellation and timeouts. It generates cross-platform client and server bindings for many languages.
wikipedia
Vert.x gRPC is a module that will align the programming style of Google gRPC with Vert.x style. As a user of this module you will be more familiar with the code style using Vert.x Streams and Futures while benefiting from all the benefits of gRPC.
For more information related to gRPC please consult the official documentation site http://www.grpc.io/.
Warning
|
Since Vert.x 4.3, this module is the new support for gRPC in the Vert.x stack, the previous implementation based on gRPC Netty is still available and has been renamed Vert.x gRPC Netty, it can be found at https://vertx.io/docs/vertx-grpc-netty/java/ . This module has Tech Preview status, this means the API can change between versions. |
Vert.x gRPC is split in two parts
-
Vert.x gRPC Server
-
Vert.x gRPC Client
Vert.x gRPC Server
Vert.x gRPC Server is a new gRPC server powered by Vert.x HTTP server superseding the integrated Netty based gRPC client.
This server provides a gRPC request/response oriented API as well as a the generated stub approach with a service bridge.
Using Vert.x gRPC Server
To use Vert.x gRPC Server, add the following dependency to the dependencies section of your build descriptor:
-
Maven (in your
pom.xml
):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-grpc-server</artifactId>
<version>4.3.8</version>
</dependency>
-
Gradle (in your
build.gradle
file):
dependencies {
compile 'io.vertx:vertx-grpc-server:4.3.8'
}
gRPC request/response server API
The gRPC request/response server API provides an alternative way to interact with a client without the need of a generated stub.
A GrpcServer
is a Handler<HttpServerRequest>
and can be used as an HTTP server request handler.
GrpcServer grpcServer = GrpcServer.server(vertx);
HttpServer server = vertx.createHttpServer(options);
server
.requestHandler(grpcServer)
.listen();
Tip
|
a GrpcServer can be mounted in a Vert.x Web router
|
Request/response
Each service method is processed by a handler
server.callHandler(GreeterGrpc.getSayHelloMethod(), request -> {
request.handler(hello -> {
GrpcServerResponse<HelloRequest, HelloReply> response = request.response();
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + hello.getName()).build();
response.end(reply);
});
});
Streaming request
You can set handlers to process request events
server.callHandler(StreamingGrpc.getSinkMethod(), request -> {
request.handler(item -> {
// Process item
});
request.endHandler(v ->{
// No more items
// Send the response
request.response().end(Empty.getDefaultInstance());
});
request.exceptionHandler(err -> {
// Something wrong happened
});
});
Streaming response
A streaming response involves calling write
for each element of the stream
and using end
to end the stream
server.callHandler(StreamingGrpc.getSourceMethod(), request -> {
GrpcServerResponse<Empty, Item> response = request.response();
request.handler(empty -> {
for (int i = 0;i < 10;i++) {
response.write(Item.newBuilder().setValue("1").build());
}
response.end();
});
});
Flow control
Request and response are back pressured Vert.x streams.
You can pause/resume/fetch a request
request.pause();
performAsyncOperation().onComplete(ar -> {
// And then resume
request.resume();
});
You can check the writability of a response and set a drain handler
if (response.writeQueueFull()) {
response.drainHandler(v -> {
// Writable again
});
} else {
response.write(item);
}
Compression
You can compress response messages by setting the response encoding prior before sending any message
response.encoding("gzip");
// Write items after encoding has been defined
response.write(Item.newBuilder().setValue("item-1").build());
response.write(Item.newBuilder().setValue("item-2").build());
response.write(Item.newBuilder().setValue("item-3").build());
Decompression
Decompression is done transparently by the server when the client send encoded requests.
Stub API
The Vert.x gRPC Server can bridge a gRPC service to use with a generated server stub in a more traditional fashion
GrpcServer grpcServer = GrpcServer.server(vertx);
GreeterGrpc.GreeterImplBase service = new GreeterGrpc.GreeterImplBase() {
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
responseObserver.onNext(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
responseObserver.onCompleted();
}
};
// Bind the service bridge in the gRPC server
GrpcServiceBridge serverStub = GrpcServiceBridge.bridge(service);
serverStub.bind(grpcServer);
// Start the HTTP/2 server
vertx.createHttpServer(options)
.requestHandler(grpcServer)
.listen();
Message level API
The server provides a message level API to interact directly with protobuf encoded gRPC messages.
Tip
|
the server message level API can be used with the client message level API to write a gRPC reverse proxy |
Such API is useful when you are not interested in the content of the messages, and instead you want to forward them to another service, e.g. you are writing a proxy.
ServiceName greeterServiceName = ServiceName.create("helloworld", "Greeter");
server.callHandler(request -> {
if (request.serviceName().equals(greeterServiceName) && request.methodName().equals("SayHello")) {
request.handler(protoHello -> {
// Handle protobuf encoded hello
performAsyncOperation(protoHello)
.onSuccess(protoReply -> {
// Reply with protobuf encoded reply
request.response().end(protoReply);
}).onFailure(err -> {
request.response()
.status(GrpcStatus.ABORTED)
.end();
});
});
} else {
request.response()
.status(GrpcStatus.NOT_FOUND)
.end();
}
});
You can also set a messageHandler
to handle GrpcMessage
, such messages preserve the
client encoding, which is useful the service you are forwarding to can handle compressed messages directly, in this case
the message does not need to be decompressed and compressed again.
ServiceName greeterServiceName = ServiceName.create("helloworld", "Greeter");
server.callHandler(request -> {
if (request.serviceName().equals(greeterServiceName) && request.methodName().equals("SayHello")) {
request.messageHandler(helloMessage -> {
// Can be identity or gzip
String helloEncoding = helloMessage.encoding();
// Handle hello message
handleGrpcMessage(helloMessage)
.onSuccess(replyMessage -> {
// Reply with reply message
// Can be identity or gzip
String replyEncoding = replyMessage.encoding();
// Send the reply
request.response().endMessage(replyMessage);
}).onFailure(err -> {
request.response()
.status(GrpcStatus.ABORTED)
.end();
});
});
} else {
request.response()
.status(GrpcStatus.NOT_FOUND)
.end();
}
});
The writeMessage
and endMessage
will
handle the message encoding:
-
when the message uses the response encoding, the message is sent as is
-
when the message uses a different encoding, it will be encoded, e.g. compressed or uncompressed
Vert.x gRPC Client
Vert.x gRPC Client is a new gRPC client powered by Vert.x HTTP client superseding the integrated Netty based gRPC client.
This client provides a gRPC request/response oriented API as well as a the generated stub approach with a gRPC Channel
Using Vert.x gRPC Client
To use Vert.x gRPC Client, add the following dependency to the dependencies section of your build descriptor:
-
Maven (in your
pom.xml
):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-grpc-client</artifactId>
<version>4.3.8</version>
</dependency>
-
Gradle (in your
build.gradle
file):
dependencies {
compile 'io.vertx:vertx-grpc-client:4.3.8'
}
gRPC request/response client API
The gRPC request/response client API provides an alternative way to interact with a server without the need of a generated stub.
You can easily create the gRPC client
GrpcClient client = GrpcClient.client(vertx);
Request/response
Any interaction with a gRPC server involves creating a request to the remote gRPC service
SocketAddress server = SocketAddress.inetSocketAddress(443, "example.com");
MethodDescriptor<HelloRequest, HelloReply> sayHelloMethod = GreeterGrpc.getSayHelloMethod();
Future<GrpcClientRequest<HelloRequest, HelloReply>> fut = client.request(server, sayHelloMethod);
fut.onSuccess(request -> {
// The end method calls the service
request.end(HelloRequest.newBuilder().setName("Bob").build());
});
request.response().onSuccess(response -> {
Future<HelloReply> fut = response.last();
fut.onSuccess(reply -> {
System.out.println("Received " + reply.getMessage());
});
});
Future composition can combine all the previous steps together in a compact fashion
client
.request(server, GreeterGrpc.getSayHelloMethod()).compose(request -> {
request.end(HelloRequest
.newBuilder()
.setName("Bob")
.build());
return request.response().compose(response -> response.last());
}).onSuccess(reply -> {
System.out.println("Received " + reply.getMessage());
});
Streaming request
A streaming request involves calling write
for each element of the stream
and using end
to end the stream
client
.request(server, StreamingGrpc.getSinkMethod())
.onSuccess(request -> {
for (int i = 0;i < 10;i++) {
request.write(Item.newBuilder().setValue("1").build());
}
request.end();
});
Streaming response
You can set handlers to process response events
client
.request(server, StreamingGrpc.getSourceMethod())
.compose(request -> {
request.end(Empty.getDefaultInstance());
return request.response();
})
.onSuccess(response -> {
response.handler(item -> {
// Process item
});
response.endHandler(v -> {
// Done
});
response.exceptionHandler(err -> {
// Something went bad
});
});
Flow control
Request and response are back pressured Vert.x streams.
You can check the writability of a request and set a drain handler
if (request.writeQueueFull()) {
request.drainHandler(v -> {
// Writable again
});
} else {
request.write(item);
}
You can pause/resume/fetch a response
response.pause();
performAsyncOperation().onComplete(ar -> {
// And then resume
response.resume();
});
Cancellation
You can call cancel
to cancel a request
request.cancel();
Note
|
cancellation sends an HTTP/2 reset frame to the server |
Compression
You can compress request messages by setting the request encoding prior before sending any message
request.encoding("gzip");
// Write items after encoding has been defined
request.write(Item.newBuilder().setValue("item-1").build());
request.write(Item.newBuilder().setValue("item-2").build());
request.write(Item.newBuilder().setValue("item-3").build());
Decompression
Decompression is done transparently by the client when the server send encoded responses.
Stub API
The Vert.x gRPC Client provides a gRPC channel to use with a generated client stub in a more traditional fashion
GrpcClientChannel channel = new GrpcClientChannel(client, SocketAddress.inetSocketAddress(443, "example.com"));
GreeterGrpc.GreeterStub greeter = GreeterGrpc.newStub(channel);
greeter.sayHello(HelloRequest.newBuilder().setName("Bob").build(), new StreamObserver<HelloReply>() {
@Override
public void onNext(HelloReply value) {
// Process response
}
@Override
public void onCompleted() {
// Done
}
@Override
public void onError(Throwable t) {
// Something went bad
}
});
Message level API
The client provides a message level API to interact directly with protobuf encoded gRPC messages.
Tip
|
the client message level API can be used with the server message level API to write a gRPC reverse proxy |
Such API is useful when you are not interested in the content of the messages, and instead you want to forward them to another service, e.g. you are writing a proxy.
Future<GrpcClientRequest<Buffer, Buffer>> requestFut = client.request(server);
requestFut.onSuccess(request -> {
// Set the service name and the method to call
request.serviceName(ServiceName.create("helloworld", "Greeter"));
request.methodName("SayHello");
// Send the protobuf request
request.end(protoHello);
// Handle the response
Future<GrpcClientResponse<Buffer, Buffer>> responseFut = request.response();
responseFut.onSuccess(response -> {
response.handler(protoReply -> {
// Handle the protobuf reply
});
});
});
You can also set a messageHandler
to handle GrpcMessage
, such messages preserve the server encoding.
Future<GrpcClientRequest<Buffer, Buffer>> requestFut = client.request(server);
requestFut.onSuccess(request -> {
// Set the service name and the method to call
request.serviceName(ServiceName.create("helloworld", "Greeter"));
request.methodName("SayHello");
// Send the protobuf request
request.endMessage(GrpcMessage.message("identity", protoHello));
// Handle the response
Future<GrpcClientResponse<Buffer, Buffer>> responseFut = request.response();
responseFut.onSuccess(response -> {
response.messageHandler(replyMessage -> {
System.out.println("Got reply message encoded as " + replyMessage.encoding());
});
});
});
The writeMessage
and endMessage
will
handle the message encoding:
-
when the message uses the response encoding, the message is sent as is
-
when the message uses a different encoding, it will be encoded, e.g. compressed or uncompressed