Cassandra Client for Vert.x
A Vert.x client allowing applications to interact with an Apache Cassandra service.
Getting started
To use this module, add the following to the dependencies section of your Maven POM file:
<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-cassandra-client</artifactId>
  <version>4.5.11</version>
</dependency>
Or, if you use Gradle:
implementation 'io.vertx:vertx-cassandra-client:4.5.11'
Creating a client
Client options
Cassandra is a distributed system that can have many nodes. To connect to it, specify the addresses of some cluster nodes when creating a CassandraClientOptions object:
CassandraClientOptions options = new CassandraClientOptions()
  .addContactPoint("node1.address", 9142)
  .addContactPoint("node2.address", 9142)
  .addContactPoint("node3.address", 9142);
CassandraClient client = CassandraClient.create(vertx, options);
By default, the Cassandra client for Vert.x connects to port 9042 on the local machine and is not tied to any specific keyspace. You can set either or both of these options:
CassandraClientOptions options = new CassandraClientOptions()
  .addContactPoint("localhost", 9142)
  .setKeyspace("my_keyspace");
CassandraClient client = CassandraClient.create(vertx, options);
For fine-tuning purposes, CassandraClientOptions exposes a com.datastax.oss.driver.api.core.CqlSessionBuilder instance.
Shared clients
If you deploy multiple instances of your verticle or have different verticles interacting with the same database, it is recommended to create a shared client:
CassandraClientOptions options = new CassandraClientOptions()
  .addContactPoint("node1.address", 9142)
  .addContactPoint("node2.address", 9142)
  .addContactPoint("node3.address", 9142)
  .setKeyspace("my_keyspace");
CassandraClient client = CassandraClient.createShared(vertx, "sharedClientName", options);
Shared clients with the same name use a single underlying com.datastax.oss.driver.api.core.CqlSession.
Client lifecycle
After the client is created, it is not connected until the first query is executed.
A shared client may already be connected right after creation if another client with the same name has already executed a query.
Clients created inside a verticle are automatically stopped when the verticle is undeployed. In other words, you do not need to invoke close in the verticle stop method.
In all other cases, you must manually close the client.
When a shared client is closed, the driver session is not closed if other clients with the same name are still running.
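The closing rule for shared clients behaves like reference counting: the underlying session is released only when the last client holding a given name goes away. The block below is a minimal sketch of that idea in plain Java, not the client's actual implementation; SharedSessionRegistry, acquire and release are hypothetical names introduced only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class SharedSessionRegistry {
  // Number of live clients per shared client name (hypothetical bookkeeping).
  private final Map<String, Integer> holders = new HashMap<>();

  /** Registers one more client under the name; returns true if it is the first holder. */
  public synchronized boolean acquire(String name) {
    int count = holders.merge(name, 1, Integer::sum);
    return count == 1;
  }

  /** Unregisters one client; returns true only when the last holder is gone. */
  public synchronized boolean release(String name) {
    Integer count = holders.get(name);
    if (count == null) {
      return false; // unknown name, nothing to close
    }
    if (count == 1) {
      holders.remove(name);
      return true; // last holder: this is where the session would be closed
    }
    holders.put(name, count - 1);
    return false; // other clients with the same name are still running
  }

  public static void main(String[] args) {
    SharedSessionRegistry registry = new SharedSessionRegistry();
    registry.acquire("sharedClientName");
    registry.acquire("sharedClientName");
    System.out.println(registry.release("sharedClientName")); // false
    System.out.println(registry.release("sharedClientName")); // true
  }
}
```

In this sketch the first release leaves the session open because a second holder remains; only the second release reports that the session can be closed.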
Using the API
The client API is represented by CassandraClient.
Querying
You can obtain query results in several ways.
Streaming
The streaming API is most appropriate when you need to consume results iteratively, e.g. when you want to process each row as it arrives. It is very efficient, especially for large numbers of rows.
To give you an idea of how the API can be used, consider this example:
cassandraClient.queryStream("SELECT my_string_col FROM my_keyspace.my_table where my_key = 'my_value'")
  .onComplete(queryStream -> {
    if (queryStream.succeeded()) {
      CassandraRowStream stream = queryStream.result();
      // resume the row stream when the write queue is ready to accept buffers again
      response.drainHandler(v -> stream.resume());
      stream.handler(row -> {
        String value = row.getString("my_string_col");
        response.write(value);
        // pause the row stream when the write queue is full
        if (response.writeQueueFull()) {
          stream.pause();
        }
      });
      // end the response when the end of the stream is reached
      stream.endHandler(end -> response.end());
    } else {
      queryStream.cause().printStackTrace();
      // respond with an internal server error if the query cannot be executed
      response
        .setStatusCode(500)
        .end("Unable to execute the query");
    }
  });
In this example, we execute a query and stream the results over HTTP.
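The pause and resume calls in the streaming example implement back-pressure: the producer stops whenever the consumer's queue is full and continues once it has drained. The following is a simplified, synchronous sketch of that control flow in plain Java. Sink is a hypothetical stand-in for the HTTP response write queue, and the drain happens inline rather than through an asynchronous drain handler as it would in Vert.x.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

public class BackpressureSketch {
  /** Hypothetical bounded sink standing in for the response write queue. */
  static class Sink {
    final int capacity;
    final Deque<String> queue = new ArrayDeque<>();
    final List<String> flushed = new ArrayList<>();

    Sink(int capacity) {
      this.capacity = capacity;
    }

    void write(String value) {
      queue.add(value);
    }

    boolean writeQueueFull() {
      return queue.size() >= capacity;
    }

    // Moves everything that is queued to the flushed list, emptying the queue.
    void drain() {
      flushed.addAll(queue);
      queue.clear();
    }
  }

  /** Pumps rows into the sink, pausing whenever the queue is full; returns the number of pauses. */
  static int pump(Iterator<String> rows, Sink sink) {
    int pauses = 0;
    while (rows.hasNext()) {
      sink.write(rows.next());
      if (sink.writeQueueFull()) {
        pauses++;     // corresponds to stream.pause()
        sink.drain(); // the drain handler would fire here and call stream.resume()
      }
    }
    sink.drain();     // flush what is left at the end of the stream
    return pauses;
  }

  public static void main(String[] args) {
    Sink sink = new Sink(2);
    int pauses = pump(List.of("r1", "r2", "r3", "r4", "r5").iterator(), sink);
    System.out.println(pauses);       // 2
    System.out.println(sink.flushed); // [r1, r2, r3, r4, r5]
  }
}
```

With a capacity of two and five rows, the queue fills twice, so the producer pauses twice while every row still reaches the sink in order.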
Bulk fetching
This API should be used when you need to process all the rows at the same time.
cassandraClient.executeWithFullFetch("SELECT * FROM my_keyspace.my_table where my_key = 'my_value'")
  .onComplete(executeWithFullFetch -> {
    if (executeWithFullFetch.succeeded()) {
      List<Row> rows = executeWithFullFetch.result();
      for (Row row : rows) {
        // handle each row here
      }
    } else {
      System.out.println("Unable to execute the query");
      executeWithFullFetch.cause().printStackTrace();
    }
  });
Use bulk fetching only if you can afford to load the full result set in memory.
Collector queries
You can use Java collectors with the query API:
cassandraClient.execute("SELECT * FROM users", listCollector)
  .onComplete(ar -> {
    if (ar.succeeded()) {
      // Get the string created by the collector
      String list = ar.result();
      System.out.println("Got " + list);
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });
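The example above uses a listCollector without showing how it is built. As a hedged illustration, a collector producing a String could be assembled with java.util.stream.Collectors as sketched below. To keep the sketch runnable without the driver, a Map stands in for a result row and the column name last_name is hypothetical; with the real client the element type would be the driver's Row and the mapping function would call something like row.getString.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class ListCollectorSketch {
  // Stand-in for a result row: column name -> value (not the driver's Row type).
  static Collector<Map<String, String>, ?, String> listCollector() {
    return Collectors.mapping(
      row -> row.get("last_name"),        // hypothetical column name
      Collectors.joining(",", "(", ")")); // comma-separated values wrapped in parentheses
  }

  // Applies the collector to a list of stand-in rows.
  static String collect(List<Map<String, String>> rows) {
    return rows.stream().collect(listCollector());
  }

  public static void main(String[] args) {
    List<Map<String, String>> rows = List.of(
      Map.of("last_name", "Doe"),
      Map.of("last_name", "Smith"));
    System.out.println(collect(rows)); // (Doe,Smith)
  }
}
```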
Low level fetch
This API provides greater control over loading at the expense of being a bit lower-level than the streaming and bulk fetching APIs.
cassandraClient.execute("SELECT * FROM my_keyspace.my_table where my_key = 'my_value'")
  .onComplete(execute -> {
    if (execute.succeeded()) {
      ResultSet resultSet = execute.result();
      if (resultSet.remaining() != 0) {
        Row row = resultSet.one();
        System.out.println("One row successfully fetched");
      } else if (!resultSet.hasMorePages()) {
        System.out.println("No pages to fetch");
      } else {
        resultSet.fetchNextPage().onComplete(fetchMoreResults -> {
          if (fetchMoreResults.succeeded()) {
            int availableWithoutFetching = resultSet.remaining();
            System.out.println("Now we have " + availableWithoutFetching + " rows fetched, but not consumed!");
          } else {
            System.out.println("Unable to fetch more results");
            fetchMoreResults.cause().printStackTrace();
          }
        });
      }
    } else {
      System.out.println("Unable to execute the query");
      execute.cause().printStackTrace();
    }
  });
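Page-by-page fetching is often wrapped in a small recursive helper that keeps asking for the next page until none is left. The following is a sketch of that pattern using CompletableFuture from the JDK rather than the client's own types. The Page interface is a hypothetical stand-in: hasMorePages and fetchNextPage mirror the method names used above, while currentPage and the assumption that fetching yields the next page object are introduced here for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class PagingSketch {
  /** Hypothetical stand-in for a paged result set. */
  interface Page {
    List<String> currentPage();
    boolean hasMorePages();
    CompletableFuture<Page> fetchNextPage();
  }

  /** Collects the rows of this page and, recursively, of all following pages. */
  static CompletableFuture<List<String>> fetchAll(Page page, List<String> acc) {
    acc.addAll(page.currentPage());
    if (!page.hasMorePages()) {
      return CompletableFuture.completedFuture(acc);
    }
    return page.fetchNextPage().thenCompose(next -> fetchAll(next, acc));
  }

  /** Builds an in-memory chain of pages for the demo. */
  static Page pages(List<List<String>> chunks, int index) {
    return new Page() {
      public List<String> currentPage() {
        return chunks.get(index);
      }

      public boolean hasMorePages() {
        return index < chunks.size() - 1;
      }

      public CompletableFuture<Page> fetchNextPage() {
        return CompletableFuture.completedFuture(pages(chunks, index + 1));
      }
    };
  }

  public static void main(String[] args) {
    Page first = pages(List.of(List.of("a", "b"), List.of("c")), 0);
    System.out.println(fetchAll(first, new ArrayList<>()).join()); // [a, b, c]
  }
}
```

Accumulating every page in memory defeats the purpose of paging for large result sets, so a real helper would more likely process each page and discard it before fetching the next.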
Prepared queries
For security and efficiency reasons, it is a good idea to use prepared statements for all the queries you are using more than once.
You can prepare a query:
cassandraClient.prepare("SELECT * FROM my_keyspace.my_table where my_key = ?")
  .onComplete(preparedStatementResult -> {
    if (preparedStatementResult.succeeded()) {
      System.out.println("The query has successfully been prepared");
      PreparedStatement preparedStatement = preparedStatementResult.result();
      // now you can use this PreparedStatement object for the next queries
    } else {
      System.out.println("Unable to prepare the query");
      preparedStatementResult.cause().printStackTrace();
    }
  });
And then use the PreparedStatement for all the next queries:
// Low level fetch API
cassandraClient.execute(preparedStatement.bind("my_value"))
  .onComplete(done -> {
    ResultSet results = done.result();
    // handle results here
  });
// Bulk fetching API
cassandraClient.executeWithFullFetch(preparedStatement.bind("my_value"))
  .onComplete(done -> {
    List<Row> results = done.result();
    // handle results here
  });
// Streaming API
cassandraClient.queryStream(preparedStatement.bind("my_value"))
  .onComplete(done -> {
    CassandraRowStream results = done.result();
    // handle results here
  });
Batching
To execute several queries at once, you can use a BatchStatement:
BatchStatement batchStatement = BatchStatement.newInstance(BatchType.LOGGED)
  .add(SimpleStatement.newInstance("INSERT INTO NAMES (name) VALUES ('Pavel')"))
  .add(SimpleStatement.newInstance("INSERT INTO NAMES (name) VALUES ('Thomas')"))
  .add(SimpleStatement.newInstance("INSERT INTO NAMES (name) VALUES ('Julien')"));
cassandraClient
  .execute(batchStatement)
  .onComplete(result -> {
    if (result.succeeded()) {
      System.out.println("The given batch executed successfully");
    } else {
      System.out.println("Unable to execute the batch");
      result.cause().printStackTrace();
    }
  });
Tracing queries
The Cassandra Client can trace query execution when Vert.x has tracing enabled.
The client reports the following client spans:
- Query operation name
- tags
  - peer.address: list of nodes known to the driver, in the form [127_0_0_1:9042,localhost:9042,myhost_mydomain:9042]
  - span.kind: client
  - db.instance: the keyspace
  - db.statement: the CQL query
  - db.type: cassandra
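The peer.address example suggests that dots in host names are replaced with underscores and that the host:port pairs are joined with commas inside square brackets. The following is a small sketch that reproduces this format, assuming that reading of the example is correct. PeerAddressFormat and format are hypothetical names and are not part of the client.

```java
import java.net.InetSocketAddress;
import java.util.List;
import java.util.stream.Collectors;

public class PeerAddressFormat {
  /** Renders nodes as [host:port,...] with dots in host names replaced by underscores. */
  static String format(List<InetSocketAddress> nodes) {
    return nodes.stream()
      .map(node -> node.getHostString().replace('.', '_') + ":" + node.getPort())
      .collect(Collectors.joining(",", "[", "]"));
  }

  public static void main(String[] args) {
    // createUnresolved avoids any DNS lookup for the demo addresses.
    List<InetSocketAddress> nodes = List.of(
      InetSocketAddress.createUnresolved("127.0.0.1", 9042),
      InetSocketAddress.createUnresolved("localhost", 9042),
      InetSocketAddress.createUnresolved("myhost.mydomain", 9042));
    System.out.println(format(nodes));
    // [127_0_0_1:9042,localhost:9042,myhost_mydomain:9042]
  }
}
```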
The default tracing policy is PROPAGATE: the client only creates a span when involved in an active trace.
You can change the client policy with setTracingPolicy. For example, you can set ALWAYS to always report a span:
CassandraClientOptions options = new CassandraClientOptions()
  .setTracingPolicy(TracingPolicy.ALWAYS);
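The effect of the tracing policy can be summarized as a small decision function. The sketch below is only an illustration of the rule described above, not the client's code. Policy is a local stand-in that mirrors the values of the Vert.x TracingPolicy enum, including IGNORE, which this section does not discuss, and reportsSpan is a hypothetical name.

```java
public class TracingPolicySketch {
  /** Local stand-in mirroring the values of the Vert.x TracingPolicy enum. */
  enum Policy { IGNORE, PROPAGATE, ALWAYS }

  /** Decides whether a span is reported for a query. */
  static boolean reportsSpan(Policy policy, boolean activeTrace) {
    switch (policy) {
      case ALWAYS:
        return true;        // a span is always reported
      case PROPAGATE:
        return activeTrace; // only when already part of an active trace
      default:
        return false;       // IGNORE: never report
    }
  }

  public static void main(String[] args) {
    System.out.println(reportsSpan(Policy.PROPAGATE, false)); // false
    System.out.println(reportsSpan(Policy.PROPAGATE, true));  // true
    System.out.println(reportsSpan(Policy.ALWAYS, false));    // true
  }
}
```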
RxJava 3 API
The Cassandra client provides an Rxified version of the original API.
Creating an Rxified client
To create an Rxified Cassandra client, make sure to import the Rxified CassandraClient class (io.vertx.rxjava3.cassandra.CassandraClient). Then use one of the create methods to get an instance:
CassandraClientOptions options = new CassandraClientOptions()
  .addContactPoint("node1.corp.int", 7000)
  .addContactPoint("node2.corp.int", 7000)
  .addContactPoint("node3.corp.int", 7000);
CassandraClient cassandraClient = CassandraClient.createShared(vertx, options);
Querying
In this section, we will reconsider some of the previous use cases with the Rxified API.
Streaming
A CassandraRowStream can be converted to a Flowable, which is handy when you have to deal with large data sets:
cassandraClient.rxQueryStream("SELECT my_key FROM my_keyspace.my_table where my_key = 'my_value'")
  // Convert the stream to a Flowable
  .flatMapPublisher(CassandraRowStream::toFlowable)
  .subscribe(row -> {
    // Handle single row
  }, t -> {
    // Handle failure
  }, () -> {
    // End of stream
  });