A Vert.x client allowing applications to interact with a Cassandra service.
Warning: This module has Tech Preview status, which means the API can change between versions.
To use this module, add the following to the dependencies section of your Maven POM file:
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-cassandra-client</artifactId>
<version>3.6.3</version>
</dependency>
Or, if you use Gradle:
compile 'io.vertx:vertx-cassandra-client:3.6.3'
Warning: The Cassandra client is not compatible with the Vert.x Dropwizard Metrics library. The two use different major versions of the Dropwizard Metrics library, and the Cassandra driver won’t upgrade to the most recent version because that version dropped Java 7 support. The next major version of the Cassandra driver (4) will use a more recent Dropwizard Metrics library version.
Cassandra is a distributed system, and it can have many nodes.
To connect to Cassandra, specify the addresses of some cluster nodes when creating a CassandraClientOptions object:
val options = CassandraClientOptions(
  contactPoints = listOf("node1.address", "node2.address", "node3.address"))
val cassandraClient = CassandraClient.createShared(vertx, options)
Note: By default, the Cassandra client for Vert.x connects to port 9042 on the local machine.
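If your cluster listens on a different port, the port can be set on the options too. The following is a minimal sketch, assuming the `setContactPoints` and `setPort` setters of `CassandraClientOptions`. The helper name `cassandraOptions`, the node addresses, and port 9142 are placeholders for illustration.

```kotlin
import io.vertx.cassandra.CassandraClient
import io.vertx.cassandra.CassandraClientOptions
import io.vertx.core.Vertx

// Hypothetical helper: builds options for the given nodes.
// The same port is used for every contact point.
fun cassandraOptions(nodes: List<String>, port: Int): CassandraClientOptions =
  CassandraClientOptions()
    .setContactPoints(nodes)
    .setPort(port)

// Creates a shared client from placeholder addresses and a placeholder port.
fun createClient(vertx: Vertx): CassandraClient =
  CassandraClient.createShared(
    vertx,
    cassandraOptions(listOf("node1.address", "node2.address"), 9142)
  )
```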
After the client is created you can connect using specific cluster options:
cassandraClient.connect({ connect ->
  if (connect.succeeded()) {
    println("Just connected")
  } else {
    println("Unable to connect")
    connect.cause().printStackTrace()
  }
})
You can disconnect in a similar way:
cassandraClient.disconnect({ disconnect ->
  if (disconnect.succeeded()) {
    println("Just disconnected")
  } else {
    println("Unable to disconnect")
    disconnect.cause().printStackTrace()
  }
})
The client API is represented by CassandraClient.
You can get query results in three different ways: streaming, bulk fetching, and low-level fetch.
The streaming API is most appropriate when you need to consume results iteratively, e.g. when you want to process each item. It is very efficient, especially for a large number of rows.
To give you some ideas on how to use the API, consider this example:
// `response` is the HTTP response the rows are written to
cassandraClient.queryStream("SELECT my_string_col FROM my_keyspace.my_table where my_key = 'my_value'", { queryStream ->
  if (queryStream.succeeded()) {
    val stream = queryStream.result()
    // resume the stream when the write queue is ready to accept buffers again
    response.drainHandler({ v ->
      stream.resume()
    })
    stream.handler({ row ->
      val value = row.getString("my_string_col")
      response.write(value)
      // pause the row stream when the write queue is full
      if (response.writeQueueFull()) {
        stream.pause()
      }
    })
    // end the response when we reach the end of the stream
    stream.endHandler({ end ->
      response.end()
    })
  } else {
    queryStream.cause().printStackTrace()
    // respond with an internal server error if we are unable to execute the given query
    response.setStatusCode(500).end("Unable to execute the query")
  }
})
In this example, we execute a query and stream the results via HTTP.
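The example above does not show where `response` comes from. The following sketch places the same logic inside a Vert.x HTTP server request handler. The function names `startStreamingServer` and `toChunk`, the port 8080, and the newline separator between rows are assumptions made for illustration and are not part of the original example.

```kotlin
import io.vertx.cassandra.CassandraClient
import io.vertx.core.Vertx

// Hypothetical helper: terminates each value with a newline so the
// HTTP client can tell rows apart.
fun toChunk(value: String): String = value + "\n"

// Starts an HTTP server that streams one column of the query result per request.
fun startStreamingServer(vertx: Vertx, cassandraClient: CassandraClient) {
  vertx.createHttpServer().requestHandler { request ->
    val response = request.response()
    // Chunked encoding lets us write rows without knowing the total length.
    response.setChunked(true)
    cassandraClient.queryStream(
      "SELECT my_string_col FROM my_keyspace.my_table where my_key = 'my_value'"
    ) { queryStream ->
      if (queryStream.succeeded()) {
        val stream = queryStream.result()
        // Resume reading rows once the response can accept more data.
        response.drainHandler { stream.resume() }
        stream.handler { row ->
          response.write(toChunk(row.getString("my_string_col")))
          // Pause reading rows while the response write queue is full.
          if (response.writeQueueFull()) {
            stream.pause()
          }
        }
        stream.endHandler { response.end() }
      } else {
        response.setStatusCode(500).end("Unable to execute the query")
      }
    }
  }.listen(8080) // placeholder port
}
```

Pausing and resuming the row stream keeps memory use bounded: rows are only read from Cassandra as fast as the HTTP client consumes them.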
The bulk fetching API should be used when you need to process all the rows at the same time.
cassandraClient.executeWithFullFetch("SELECT * FROM my_keyspace.my_table where my_key = 'my_value'", { executeWithFullFetch ->
  if (executeWithFullFetch.succeeded()) {
    val rows = executeWithFullFetch.result()
    for (row in rows) {
      // handle each row here
    }
  } else {
    println("Unable to execute the query")
    executeWithFullFetch.cause().printStackTrace()
  }
})
Caution: Use bulk fetching only if you can afford to load the full result set in memory.
The low-level fetch API provides greater control over loading, at the expense of being a bit lower-level than the streaming and bulk fetching APIs.
cassandraClient.execute("SELECT * FROM my_keyspace.my_table where my_key = 'my_value'", { execute ->
  if (execute.succeeded()) {
    val resultSet = execute.result()
    resultSet.one({ one ->
      if (one.succeeded()) {
        val row = one.result()
        println("One row successfully fetched")
      } else {
        println("Unable to fetch a row")
        one.cause().printStackTrace()
      }
    })
    resultSet.fetchMoreResults({ fetchMoreResults ->
      if (fetchMoreResults.succeeded()) {
        val availableWithoutFetching = resultSet.getAvailableWithoutFetching()
        println("Now we have ${availableWithoutFetching} rows fetched, but not consumed!")
        if (resultSet.isFullyFetched()) {
          println("The result is fully fetched, we don't need to call this method again!")
        } else {
          println("The result is not fully fetched yet")
        }
      } else {
        println("Unable to fetch more results")
        fetchMoreResults.cause().printStackTrace()
      }
    })
  } else {
    println("Unable to execute the query")
    execute.cause().printStackTrace()
  }
})
For security and efficiency reasons, it is a good idea to use prepared statements for all queries you run more than once.
You can prepare a query:
cassandraClient.prepare("SELECT * FROM my_keyspace.my_table where my_key = ? ", { preparedStatementResult ->
  if (preparedStatementResult.succeeded()) {
    println("The query has successfully been prepared")
    val preparedStatement = preparedStatementResult.result()
    // now you can use this PreparedStatement object for the next queries
  } else {
    println("Unable to prepare the query")
    preparedStatementResult.cause().printStackTrace()
  }
})
And then use the PreparedStatement for all the next queries:
// You can execute your prepared statement with any of the query execution APIs.
// Low level fetch API
cassandraClient.execute(preparedStatement.bind("my_value"), { done ->
  val results = done.result()
  // handle results here
})
// Bulk fetching API
cassandraClient.executeWithFullFetch(preparedStatement.bind("my_value"), { done ->
  val results = done.result()
  // handle results here
})
// Streaming API
cassandraClient.queryStream(preparedStatement.bind("my_value"), { done ->
  val results = done.result()
  // handle results here
})
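The snippets above assume that `preparedStatement` is already in scope, but it only becomes available inside the `prepare` callback. The following sketch chains the two steps with the bulk fetching API. The names `SELECT_BY_KEY`, `findByKey`, and `onDone` are hypothetical and used only for illustration.

```kotlin
import com.datastax.driver.core.Row
import io.vertx.cassandra.CassandraClient

// Query with a single bind marker; keyspace, table, and column follow the examples above.
const val SELECT_BY_KEY = "SELECT * FROM my_keyspace.my_table where my_key = ?"

// Prepares the query, binds `key`, and bulk-fetches the matching rows.
// Exactly one of the two `onDone` arguments is non-null.
fun findByKey(
  cassandraClient: CassandraClient,
  key: String,
  onDone: (rows: List<Row>?, error: Throwable?) -> Unit
) {
  cassandraClient.prepare(SELECT_BY_KEY) { prepared ->
    if (prepared.succeeded()) {
      // The PreparedStatement only exists inside this callback.
      val bound = prepared.result().bind(key)
      cassandraClient.executeWithFullFetch(bound) { fetched ->
        if (fetched.succeeded()) {
          onDone(fetched.result(), null)
        } else {
          onDone(null, fetched.cause())
        }
      }
    } else {
      onDone(null, prepared.cause())
    }
  }
}
```

This sketch prepares the query on every call to stay short. In an application you would more likely prepare once, keep the PreparedStatement, and only call `bind` per query.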
If you’d like to execute several queries at once, you can use BatchStatement:
val batchStatement = com.datastax.driver.core.BatchStatement()
  .add(com.datastax.driver.core.SimpleStatement("INSERT INTO NAMES (name) VALUES ('Pavel')"))
  .add(com.datastax.driver.core.SimpleStatement("INSERT INTO NAMES (name) VALUES ('Thomas')"))
  .add(com.datastax.driver.core.SimpleStatement("INSERT INTO NAMES (name) VALUES ('Julien')"))
cassandraClient.execute(batchStatement, { result ->
  if (result.succeeded()) {
    println("The given batch executed successfully")
  } else {
    println("Unable to execute the batch")
    result.cause().printStackTrace()
  }
})
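When the values come from a collection, the batch can be built in a loop. The following sketch assumes the DataStax driver 3.x `SimpleStatement(String, Object...)` constructor, which passes the values separately instead of embedding them in the query string. The helper name `insertNamesBatch` is hypothetical.

```kotlin
import com.datastax.driver.core.BatchStatement
import com.datastax.driver.core.SimpleStatement

// Hypothetical helper: builds one batch containing a parameterized INSERT per name.
fun insertNamesBatch(names: List<String>): BatchStatement {
  val batch = BatchStatement()
  for (name in names) {
    // The value is sent as a bound parameter, not concatenated into the CQL.
    batch.add(SimpleStatement("INSERT INTO NAMES (name) VALUES (?)", name))
  }
  return batch
}
```

The resulting batch can be passed to `cassandraClient.execute` in the same way as `batchStatement` above.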