Reactive MSSQL Client
The Reactive MSSQL Client is a client for Microsoft SQL Server with a straightforward API focusing on scalability and low overhead.
The client is reactive and non-blocking, allowing you to handle many database connections with a single thread.
Features
- Event driven
- Lightweight
- Built-in connection pooling
- Direct memory to object without unnecessary copies
- Java 8 Date and Time
- RxJava API
- SSL/TLS
- Cursor
- Row streaming
Not supported yet
- Prepared queries caching
Usage
To use the Reactive MSSQL Client add the following dependency to the dependencies section of your build descriptor:
- Maven (in your pom.xml):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-mssql-client</artifactId>
<version>4.4.9</version>
</dependency>
- Gradle (in your build.gradle file):
dependencies {
implementation 'io.vertx:vertx-mssql-client:4.4.9'
}
Getting started
Here is the simplest way to connect, query and disconnect:
MSSQLConnectOptions connectOptions = new MSSQLConnectOptions()
.setPort(1433)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool options
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// Create the client pool
MSSQLPool client = MSSQLPool.pool(connectOptions, poolOptions);
// A simple query
client
.query("SELECT * FROM users WHERE id='julien'")
.execute()
.onComplete(ar -> {
if (ar.succeeded()) {
RowSet<Row> result = ar.result();
System.out.println("Got " + result.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
// Now close the pool
client.close();
});
Connecting to SQL Server
Most of the time you will use a pool to connect to MSSQL:
MSSQLConnectOptions connectOptions = new MSSQLConnectOptions()
.setPort(1433)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool options
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// Create the pooled client
MSSQLPool client = MSSQLPool.pool(connectOptions, poolOptions);
The pooled client uses a connection pool: each operation borrows a connection from the pool, executes on it, and releases it back to the pool.
If you are running with Vert.x you can pass it your Vertx instance:
MSSQLConnectOptions connectOptions = new MSSQLConnectOptions()
.setPort(1433)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool options
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// Create the pooled client
MSSQLPool client = MSSQLPool.pool(vertx, connectOptions, poolOptions);
You need to release the pool when you don’t need it anymore:
client.close();
When you need to execute several operations on the same connection, you need to use a client connection.
You can easily get one from the pool:
MSSQLConnectOptions connectOptions = new MSSQLConnectOptions()
.setPort(1433)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool options
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// Create the pooled client
MSSQLPool client = MSSQLPool.pool(vertx, connectOptions, poolOptions);
// Get a connection from the pool
client.getConnection().compose(conn -> {
System.out.println("Got a connection from the pool");
// All operations execute on the same connection
return conn
.query("SELECT * FROM users WHERE id='julien'")
.execute()
.compose(res -> conn
.query("SELECT * FROM users WHERE id='emad'")
.execute())
.onComplete(ar -> {
// Release the connection to the pool
conn.close();
});
}).onComplete(ar -> {
if (ar.succeeded()) {
System.out.println("Done");
} else {
System.out.println("Something went wrong " + ar.cause().getMessage());
}
});
Once you are done with the connection you must close it to release it to the pool, so it can be reused.
Configuration
Data Object
A simple way to configure the client is to specify an MSSQLConnectOptions data object.
MSSQLConnectOptions connectOptions = new MSSQLConnectOptions()
.setPort(1433)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool Options
PoolOptions poolOptions = new PoolOptions().setMaxSize(5);
// Create the pool from the data object
MSSQLPool pool = MSSQLPool.pool(vertx, connectOptions, poolOptions);
pool.getConnection()
.onComplete(ar -> {
// Handling your connection
});
Connection URI
As an alternative to configuring the client with an MSSQLConnectOptions data object, you can use a connection URI:
String connectionUri = "sqlserver://dbuser:[email protected]:1433/mydb";
// Create the pool from the connection URI
MSSQLPool pool = MSSQLPool.pool(connectionUri);
// Create the connection from the connection URI
MSSQLConnection.connect(vertx, connectionUri)
.onComplete(res -> {
// Handling your connection
});
The connection URI format is defined by the client in an idiomatic way:
sqlserver://[user[:[password]]@]host[:port][/database][?<key1>=<value1>[&<key2>=<value2>]]
Currently, the client supports the following parameter keys:
- host
- port
- user
- password
- database
Note: Configuring parameters in the connection URI will override the default properties.
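To make the URI format above concrete, here is a minimal sketch that splits a connection URI into the listed parameter keys using only `java.net.URI`. The class `ConnectionUriSketch`, the example host `db.example.com`, and the rule that query-string pairs override the authority parts are assumptions made for illustration; this is not the parser the client uses.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; it is not part of the client API.
final class ConnectionUriSketch {

  // Splits a connection URI into the parameter keys listed above.
  static Map<String, String> parse(String connectionUri) {
    URI uri = URI.create(connectionUri);
    if (!"sqlserver".equals(uri.getScheme()) || uri.getHost() == null) {
      throw new IllegalArgumentException("Not a sqlserver connection URI: " + connectionUri);
    }
    Map<String, String> parts = new LinkedHashMap<>();
    String userInfo = uri.getUserInfo();
    if (userInfo != null) {
      int colon = userInfo.indexOf(':');
      parts.put("user", colon < 0 ? userInfo : userInfo.substring(0, colon));
      if (colon >= 0) {
        parts.put("password", userInfo.substring(colon + 1));
      }
    }
    parts.put("host", uri.getHost());
    if (uri.getPort() != -1) {
      parts.put("port", Integer.toString(uri.getPort()));
    }
    String path = uri.getPath();
    if (path != null && path.length() > 1) {
      parts.put("database", path.substring(1));
    }
    // Assumption: key=value pairs in the query string override the parts above.
    String query = uri.getQuery();
    if (query != null) {
      for (String pair : query.split("&")) {
        int eq = pair.indexOf('=');
        if (eq > 0) {
          parts.put(pair.substring(0, eq), pair.substring(eq + 1));
        }
      }
    }
    return parts;
  }

  public static void main(String[] args) {
    System.out.println(parse("sqlserver://dbuser:secret@db.example.com:1433/mydb"));
  }
}
```

Optional parts that are absent from the URI are simply missing from the returned map, which mirrors the square brackets in the format.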
Connect retries
You can configure the client to retry when a connection fails to be established.
options
.setReconnectAttempts(2)
.setReconnectInterval(1000);
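The effect of these two settings can be pictured with a small, blocking sketch. It assumes that the attempts value counts retries after the first try and that the interval is a pause in milliseconds between tries; the class `ConnectRetrySketch` is hypothetical, and the real client performs its retries without blocking a thread.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical, blocking illustration of retry semantics; not the client's implementation.
final class ConnectRetrySketch {

  // Tries once, then retries up to reconnectAttempts more times, pausing between tries.
  static <T> T connectWithRetries(Supplier<T> connect, int reconnectAttempts, long reconnectIntervalMillis) {
    if (reconnectAttempts < 0) {
      throw new IllegalArgumentException("reconnectAttempts must be >= 0");
    }
    RuntimeException last = null;
    for (int attempt = 0; attempt <= reconnectAttempts; attempt++) {
      try {
        return connect.get();
      } catch (RuntimeException e) {
        last = e;
        if (attempt < reconnectAttempts) {
          pause(reconnectIntervalMillis);
        }
      }
    }
    // Every try failed: surface the last failure.
    throw last;
  }

  private static void pause(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting to retry", e);
    }
  }

  public static void main(String[] args) {
    AtomicInteger calls = new AtomicInteger();
    // Simulated connection that is refused twice before succeeding.
    String result = connectWithRetries(() -> {
      if (calls.incrementAndGet() < 3) {
        throw new IllegalStateException("connection refused");
      }
      return "connected";
    }, 2, 10L);
    System.out.println(result + " after " + calls.get() + " tries");
  }
}
```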
Running queries
When you don’t need a transaction, or you only run single queries, you can run queries directly on the pool; the pool will use one of its connections to run the query and return the result to you.
Here is how to run simple queries:
client
.query("SELECT * FROM users WHERE id='julien'")
.execute()
.onComplete(ar -> {
if (ar.succeeded()) {
RowSet<Row> result = ar.result();
System.out.println("Got " + result.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
Prepared queries
You can do the same with prepared queries.
The SQL string can refer to parameters by position, using the database syntax `@p1`, `@p2`, etc…
client
.preparedQuery("SELECT * FROM users WHERE id=@p1")
.execute(Tuple.of("julien"))
.onComplete(ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println("Got " + rows.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
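Because parameters are positional, the placeholder list for a statement can be generated rather than typed by hand. The following is a minimal sketch; `PlaceholderSketch` and its methods are hypothetical helpers, and the table and column names are assumed to come from trusted code, never from user input.

```java
import java.util.StringJoiner;

// Hypothetical helper that builds SQL text with @p1, @p2, ... placeholders.
final class PlaceholderSketch {

  // Returns "@p1, @p2, ..., @pN" for the given parameter count.
  static String placeholders(int count) {
    StringJoiner joiner = new StringJoiner(", ");
    for (int i = 1; i <= count; i++) {
      joiner.add("@p" + i);
    }
    return joiner.toString();
  }

  // Builds an INSERT statement with one placeholder per column.
  static String insertSql(String table, String... columns) {
    return "INSERT INTO " + table
      + " (" + String.join(", ", columns) + ")"
      + " VALUES (" + placeholders(columns.length) + ")";
  }

  public static void main(String[] args) {
    System.out.println(insertSql("users", "first_name", "last_name"));
  }
}
```

The resulting string can be passed to `preparedQuery`, with a tuple that supplies one value per placeholder in the same order.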
Query methods provide an asynchronous RowSet instance that works for SELECT queries:
client
.preparedQuery("SELECT first_name, last_name FROM users")
.execute()
.onComplete(ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
for (Row row : rows) {
System.out.println("User " + row.getString(0) + " " + row.getString(1));
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
or UPDATE/INSERT queries:
client
.preparedQuery("INSERT INTO users (first_name, last_name) VALUES (@p1, @p2)")
.execute(Tuple.of("Julien", "Viet"))
.onComplete(ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println(rows.rowCount());
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
The Row gives you access to your data by index:
System.out.println("User " + row.getString(0) + " " + row.getString(1));
Caution: Column indexes start at 0, not at 1.
Alternatively, data can be retrieved by name:
System.out.println("User " + row.getString("first_name") + " " + row.getString("last_name"));
The client does no magic here: a column is identified by its name in the table, regardless of how your SQL text is written.
You can access a wide variety of types:
String firstName = row.getString("first_name");
Boolean male = row.getBoolean("male");
Integer age = row.getInteger("age");
You can use cached prepared statements to execute one-shot prepared queries:
connectOptions.setCachePreparedStatements(true);
client
.preparedQuery("SELECT * FROM users WHERE id = @p1")
.execute(Tuple.of("julien"))
.onComplete(ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println("Got " + rows.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
You can create a PreparedStatement and manage its lifecycle yourself.
sqlConnection
.prepare("SELECT * FROM users WHERE id = @p1")
.onComplete(ar -> {
if (ar.succeeded()) {
PreparedStatement preparedStatement = ar.result();
preparedStatement.query()
.execute(Tuple.of("julien"))
.onComplete(ar2 -> {
if (ar2.succeeded()) {
RowSet<Row> rows = ar2.result();
System.out.println("Got " + rows.size() + " rows ");
preparedStatement.close();
} else {
System.out.println("Failure: " + ar2.cause().getMessage());
}
});
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
Batches
You can execute a prepared batch:
List<Tuple> batch = new ArrayList<>();
batch.add(Tuple.of("julien", "Julien Viet"));
batch.add(Tuple.of("emad", "Emad Alblueshi"));
// Execute the prepared batch
client
.preparedQuery("INSERT INTO USERS (id, name) VALUES (@p1, @p2)")
.executeBatch(batch)
.onComplete(res -> {
if (res.succeeded()) {
// Process rows
RowSet<Row> rows = res.result();
} else {
System.out.println("Batch failed " + res.cause());
}
});
Working with identity columns
You can retrieve the value of an identity column after inserting new data using the OUTPUT clause:
client
.preparedQuery("INSERT INTO movies (title) OUTPUT INSERTED.id VALUES (@p1)")
.execute(Tuple.of("The Man Who Knew Too Much"))
.onComplete(res -> {
if (res.succeeded()) {
Row row = res.result().iterator().next();
System.out.println(row.getLong("id"));
}
});
Using connections
Getting a connection
When you need to execute sequential queries (without a transaction), you can create a new connection or borrow one from the pool. Remember that between acquiring the connection from the pool and returning it, you should handle the case where the server closes the connection, for example because of an idle timeout.
pool
.getConnection()
.compose(connection ->
connection
.preparedQuery("INSERT INTO Users (first_name,last_name) VALUES (@p1, @p2)")
.executeBatch(Arrays.asList(
Tuple.of("Julien", "Viet"),
Tuple.of("Emad", "Alblueshi")
))
.compose(res -> connection
// Do something with rows
.query("SELECT COUNT(*) FROM Users")
.execute()
.map(rows -> rows.iterator().next().getInteger(0)))
// Return the connection to the pool
.eventually(v -> connection.close())
).onSuccess(count -> {
System.out.println("Insert users, now the number of users is " + count);
});
Prepared queries can be created:
connection
.prepare("SELECT * FROM users WHERE first_name LIKE @p1")
.compose(pq ->
pq.query()
.execute(Tuple.of("Julien"))
.eventually(v -> pq.close())
).onSuccess(rows -> {
// All rows
});
Simplified connection API
When you use a pool, you can call withConnection to pass it a function executed within a connection.
It borrows a connection from the pool and calls the function with this connection.
The function must return a future of an arbitrary result.
After the future completes, the connection is returned to the pool and the overall result is provided.
pool.withConnection(connection ->
connection
.preparedQuery("INSERT INTO Users (first_name,last_name) VALUES (@p1, @p2)")
.executeBatch(Arrays.asList(
Tuple.of("Julien", "Viet"),
Tuple.of("Emad", "Alblueshi")
))
.compose(res -> connection
// Do something with rows
.query("SELECT COUNT(*) FROM Users")
.execute()
.map(rows -> rows.iterator().next().getInteger(0)))
).onSuccess(count -> {
System.out.println("Insert users, now the number of users is " + count);
});
Using transactions
Transactions with connections
You can execute transactions using SQL BEGIN/COMMIT/ROLLBACK; if you do so, you must use a SqlConnection and manage it yourself.
Or you can use the transaction API of SqlConnection:
pool.getConnection()
// Transaction must use a connection
.onSuccess(conn -> {
// Begin the transaction
conn.begin()
.compose(tx -> conn
// Various statements
.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
.execute()
.compose(res2 -> conn
.query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
.execute())
// Commit the transaction
.compose(res3 -> tx.commit()))
// Return the connection to the pool
.eventually(v -> conn.close())
.onSuccess(v -> System.out.println("Transaction succeeded"))
.onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));
});
When the database server reports that the current transaction has failed (e.g. the infamous current transaction is aborted, commands ignored until end of transaction block), the transaction is rolled back and the completion future is failed with a TransactionRollbackException:
tx.completion()
.onFailure(err -> {
System.out.println("Transaction failed => rolled back");
});
Simplified transaction API
When you use a pool, you can call withTransaction to pass it a function executed within a transaction.
It borrows a connection from the pool, begins the transaction and calls the function with a client executing all operations in the scope of this transaction.
The function must return a future of an arbitrary result:
- when the future succeeds the client will commit the transaction
- when the future fails the client will roll back the transaction
After the transaction completes, the connection is returned to the pool and the overall result is provided.
pool.withTransaction(client -> client
.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
.execute()
.flatMap(res -> client
.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
.execute()
// Map to a message result
.map("Users inserted")))
.onSuccess(v -> System.out.println("Transaction succeeded"))
.onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));
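The commit-or-rollback rule described above can be pictured with plain `CompletableFuture` instead of the client's own future type. In this minimal sketch, `TransactionScopeSketch` is a hypothetical stand-in: the list it passes to the function plays the role of the client, and the `BEGIN`, `COMMIT` and `ROLLBACK` entries only record what a real transaction scope would send.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical model of a transaction scope; it records statements instead of running them.
final class TransactionScopeSketch {

  final List<String> log = new ArrayList<>();

  // Begins, runs the function, then commits on success or rolls back on failure.
  <T> CompletableFuture<T> withTransaction(Function<List<String>, CompletableFuture<T>> work) {
    log.add("BEGIN");
    CompletableFuture<T> result;
    try {
      result = work.apply(log);
    } catch (RuntimeException e) {
      // A function that throws is treated like a failed future.
      result = new CompletableFuture<>();
      result.completeExceptionally(e);
    }
    return result.whenComplete((value, error) -> log.add(error == null ? "COMMIT" : "ROLLBACK"));
  }

  public static void main(String[] args) {
    TransactionScopeSketch scope = new TransactionScopeSketch();
    scope.withTransaction(statements -> {
      statements.add("INSERT Julien");
      return CompletableFuture.completedFuture("Users inserted");
    }).join();
    System.out.println(scope.log);
  }
}
```

The caller never commits or rolls back explicitly; the outcome of the returned future decides which of the two happens.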
Cursors and streaming
By default, prepared query execution fetches all rows. You can use a Cursor to control the number of rows you want to read:
connection.prepare("SELECT * FROM users WHERE age > @p1")
.onComplete(ar1 -> {
if (ar1.succeeded()) {
PreparedStatement pq = ar1.result();
// Create a cursor
Cursor cursor = pq.cursor(Tuple.of(18));
// Read 50 rows
cursor
.read(50)
.onComplete(ar2 -> {
if (ar2.succeeded()) {
RowSet<Row> rows = ar2.result();
// Check for more ?
if (cursor.hasMore()) {
// Repeat the process...
} else {
// No more rows - close the cursor
cursor.close();
}
}
});
}
});
Cursors should be closed when they are released prematurely:
cursor
.read(50)
.onComplete(ar2 -> {
if (ar2.succeeded()) {
// Close the cursor
cursor.close();
}
});
A stream API is also available for cursors, which can be more convenient, especially with the Rxified version.
connection
.prepare("SELECT * FROM users WHERE age > @p1")
.onComplete(ar1 -> {
if (ar1.succeeded()) {
PreparedStatement pq = ar1.result();
// Fetch 50 rows at a time
RowStream<Row> stream = pq.createStream(50, Tuple.of(18));
// Use the stream
stream.exceptionHandler(err -> {
System.out.println("Error: " + err.getMessage());
});
stream.endHandler(v -> {
System.out.println("End of stream");
});
stream.handler(row -> {
System.out.println("User: " + row.getString("last_name"));
});
}
});
The stream reads the rows in batches of 50 and streams them; once the rows have been passed to the handler, a new batch of 50 is read, and so on.
The stream can be paused and resumed: while it is paused, the loaded rows remain in memory until they are delivered, and the cursor stops iterating.
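The batch-by-batch behaviour can be modelled over an in-memory collection. This is a minimal synchronous sketch, assuming a hypothetical `BatchCursorSketch` class whose `read` and `hasMore` methods only imitate the shape of the cursor calls shown earlier; the real cursor is asynchronous and reads from the server.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory cursor that hands out rows in fixed-size batches.
final class BatchCursorSketch<T> {

  private final Iterator<T> source;

  BatchCursorSketch(Iterable<T> rows) {
    this.source = rows.iterator();
  }

  // Reads at most count rows; the last batch may be shorter.
  List<T> read(int count) {
    List<T> batch = new ArrayList<>();
    while (batch.size() < count && source.hasNext()) {
      batch.add(source.next());
    }
    return batch;
  }

  boolean hasMore() {
    return source.hasNext();
  }

  // Delivers every row to the handler, one batch at a time, and returns the number of reads.
  static <T> int drain(BatchCursorSketch<T> cursor, int batchSize, Consumer<T> handler) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    int reads = 0;
    do {
      List<T> batch = cursor.read(batchSize);
      reads++;
      batch.forEach(handler);
    } while (cursor.hasMore());
    return reads;
  }

  public static void main(String[] args) {
    List<Integer> rows = new ArrayList<>();
    for (int i = 0; i < 120; i++) {
      rows.add(i);
    }
    int reads = drain(new BatchCursorSketch<>(rows), 50, row -> { });
    System.out.println("Rows delivered in " + reads + " reads");
  }
}
```

With 120 rows and a batch size of 50, the sketch performs three reads of 50, 50 and 20 rows, and at most one batch is held in memory at a time.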
Tracing queries
The SQL client can trace query execution when Vert.x has tracing enabled.
The client reports the following client spans:
- Query operation name
- tags
  - db.user: the database username
  - db.instance: the database instance
  - db.statement: the SQL query
  - db.type: sql
The default tracing policy is PROPAGATE: the client will only create a span when involved in an active trace.
You can change the client policy with setTracingPolicy, e.g. you can set ALWAYS to always report a span:
options.setTracingPolicy(TracingPolicy.ALWAYS);
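The two policies named above reduce to a simple decision, and the span tags to a small map. The following is a minimal sketch; `QuerySpanSketch` and its nested `Policy` enum are hypothetical and model only the PROPAGATE and ALWAYS policies and the four tags listed in this section.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the span decision and tags; not the client's tracing code.
final class QuerySpanSketch {

  enum Policy { PROPAGATE, ALWAYS }

  // PROPAGATE reports a span only inside an active trace; ALWAYS reports one regardless.
  static boolean shouldReportSpan(Policy policy, boolean activeTrace) {
    return policy == Policy.ALWAYS || activeTrace;
  }

  // Builds the tags listed above for a Query span.
  static Map<String, String> tags(String user, String instance, String statement) {
    Map<String, String> tags = new LinkedHashMap<>();
    tags.put("db.user", user);
    tags.put("db.instance", instance);
    tags.put("db.statement", statement);
    tags.put("db.type", "sql");
    return tags;
  }

  public static void main(String[] args) {
    if (shouldReportSpan(Policy.ALWAYS, false)) {
      System.out.println(tags("user", "the-db", "SELECT * FROM users"));
    }
  }
}
```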
Data types supported
Currently, the client supports the following SQL Server types:
- TINYINT (java.lang.Short)
- SMALLINT (java.lang.Short)
- INT (java.lang.Integer)
- BIGINT (java.lang.Long)
- BIT (java.lang.Boolean)
- REAL (java.lang.Float)
- DOUBLE (java.lang.Double)
- NUMERIC/DECIMAL (BigDecimal)
- CHAR/VARCHAR (java.lang.String)
- NCHAR/NVARCHAR (java.lang.String)
- DATE (java.time.LocalDate)
- TIME (java.time.LocalTime)
- SMALLDATETIME (java.time.LocalDateTime)
- DATETIME (java.time.LocalDateTime)
- DATETIME2 (java.time.LocalDateTime)
- DATETIMEOFFSET (java.time.OffsetDateTime)
- BINARY/VARBINARY (io.vertx.core.buffer.Buffer)
- MONEY (BigDecimal)
- SMALLMONEY (BigDecimal)
- GUID (UUID)
Tuple decoding uses the above types when storing values.
Using Java enum types
SQL Server does not have an ENUM data type, but the client can map the retrieved string or numeric value to a Java enum.
You can encode Java enums like this:
client
.preparedQuery("INSERT INTO colors VALUES (@p1)")
.execute(Tuple.of(Color.red))
.onComplete(res -> {
// ...
});
You can decode a Java enum like this:
client
.preparedQuery("SELECT color FROM colors")
.execute()
.onComplete(res -> {
if (res.succeeded()) {
RowSet<Row> rows = res.result();
for (Row row : rows) {
System.out.println(row.get(Color.class, "color"));
}
}
});
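The mapping from a stored string or number to an enum constant can be sketched with the JDK alone. This sketch assumes that strings are matched against constant names and that numbers are treated as ordinals; `EnumMappingSketch` and its `Color` enum are hypothetical and stand in for the conversion the client performs.

```java
// Hypothetical illustration of mapping column values to enum constants.
final class EnumMappingSketch {

  enum Color { red, green, blue }

  // Assumption: strings map by constant name, numbers map by ordinal.
  static <E extends Enum<E>> E toEnum(Class<E> type, Object value) {
    if (value instanceof String) {
      return Enum.valueOf(type, (String) value);
    }
    if (value instanceof Number) {
      return type.getEnumConstants()[((Number) value).intValue()];
    }
    throw new IllegalArgumentException("Cannot map " + value + " to " + type.getSimpleName());
  }

  public static void main(String[] args) {
    System.out.println(toEnum(Color.class, "red"));
    System.out.println(toEnum(Color.class, 2));
  }
}
```

Storing names is usually more robust than storing ordinals, because reordering the enum constants silently changes what each ordinal means.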
Handling NULL
If you modify a Tuple with one of the addXXX methods, null values are handled transparently.
The client can infer the right SQL type when executing a prepared query:
Tuple tuple = Tuple.tuple()
.addInteger(17)
.addString("The Man Who Knew Too Much")
.addString(null);
client
.preparedQuery("INSERT INTO movies (id, title, plot) VALUES (@p1, @p2, @p3)")
.execute(tuple)
.onComplete(res -> {
// ...
});
Otherwise, you should declare the type explicitly using one of the NullValue constants or the NullValue.of method:
Tuple tuple = Tuple.of(17, "The Man Who Knew Too Much", NullValue.String);
client
.preparedQuery("INSERT INTO movies (id, title, plot) VALUES (@p1, @p2, @p3)")
.execute(tuple)
.onComplete(res -> {
// ...
});
Collector queries
You can use Java collectors with the query API:
Collector<Row, ?, Map<Long, String>> collector = Collectors.toMap(
row -> row.getLong("id"),
row -> row.getString("last_name"));
// Run the query with the collector
client.query("SELECT * FROM users")
.collecting(collector)
.execute()
.onComplete(ar -> {
if (ar.succeeded()) {
SqlResult<Map<Long, String>> result = ar.result();
// Get the map created by the collector
Map<Long, String> map = result.value();
System.out.println("Got " + map);
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
The collector processing must not keep a reference to the Row, because a single row instance is used for processing the entire set.
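Why retaining the row is a problem can be shown with a small simulation. In this sketch the `ReusedRow` class and the `collect` driver are hypothetical: they imitate a result set that overwrites one mutable row object for every record, which is the behaviour described above, and compare a collector that copies values out with one that stores the row itself.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;

// Hypothetical simulation of a collector fed by a single, reused row object.
final class RowReuseSketch {

  // Mutable row that is overwritten for each record.
  static final class ReusedRow {
    String lastName;
  }

  // Drives a collector by hand, reusing one row instance for all records.
  static <A, R> R collect(List<String> lastNames, Collector<ReusedRow, A, R> collector) {
    A container = collector.supplier().get();
    ReusedRow row = new ReusedRow();
    for (String name : lastNames) {
      row.lastName = name;
      collector.accumulator().accept(container, row);
    }
    return collector.finisher().apply(container);
  }

  // Safe: the value is copied out of the row while it is being processed.
  static List<String> extractEagerly(List<String> lastNames) {
    Collector<ReusedRow, ?, List<String>> collector =
      Collectors.mapping(row -> row.lastName, Collectors.toList());
    return collect(lastNames, collector);
  }

  // Unsafe: the row itself is retained, so every entry ends up showing the last record.
  static List<String> retainRows(List<String> lastNames) {
    Collector<ReusedRow, ?, List<ReusedRow>> collector = Collectors.toList();
    List<ReusedRow> retained = collect(lastNames, collector);
    return retained.stream().map(row -> row.lastName).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> names = Arrays.asList("Viet", "Alblueshi");
    System.out.println("copied values: " + extractEagerly(names));
    System.out.println("retained rows: " + retainRows(names));
  }
}
```

The first variant yields both names, while the second yields the last name twice, because both retained references point to the same object.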
The Java Collectors class provides many interesting predefined collectors; for example, you can easily create a string directly from the row set:
Collector<Row, ?, String> collector = Collectors.mapping(
row -> row.getString("last_name"),
Collectors.joining(",", "(", ")")
);
// Run the query with the collector
client.query("SELECT * FROM users")
.collecting(collector)
.execute()
.onComplete(ar -> {
if (ar.succeeded()) {
SqlResult<String> result = ar.result();
// Get the string created by the collector
String list = result.value();
System.out.println("Got " + list);
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
Information messages
SQL Server can send information messages to the client as part of responses to queries.
By default, they are logged at WARN level.
You can set a handler on a connection to catch them and do something useful with them.
connection.infoHandler(info -> {
System.out.println("Received info " + info.getSeverity() + " " + info.getMessage());
});
Using SSL/TLS
Encryption level negotiation
When a connection is established, the client and the server negotiate the encryption level.
The negotiated level depends on the client config in MSSQLConnectOptions and the server config:
- no encryption: if ssl is set to false in client options, and the server does not support encryption
- encrypt login packet only: if ssl is set to false in client options, and the server supports encryption
- encrypt entire connection: if ssl is set to true in client options, or the server requires encryption
Note
|
The negotiation fails if |
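The three outcomes in the list can be written as a small decision function. This is a minimal sketch with hypothetical names (`EncryptionNegotiationSketch`, `ServerEncryption`, `Level`); it models the server side as one of three states, which is an assumption. The combination of `ssl` set to `true` with a server that does not support encryption is deliberately left undecided and returns an empty result rather than a guess.

```java
import java.util.Optional;

// Hypothetical model of the negotiation outcomes listed above.
final class EncryptionNegotiationSketch {

  // Assumed server-side states.
  enum ServerEncryption { NOT_SUPPORTED, SUPPORTED, REQUIRED }

  enum Level { NONE, LOGIN_ONLY, ENTIRE_CONNECTION }

  static Optional<Level> negotiate(boolean clientSsl, ServerEncryption server) {
    if (clientSsl && server == ServerEncryption.NOT_SUPPORTED) {
      // Left undecided in this sketch.
      return Optional.empty();
    }
    if (clientSsl || server == ServerEncryption.REQUIRED) {
      return Optional.of(Level.ENTIRE_CONNECTION);
    }
    return Optional.of(server == ServerEncryption.SUPPORTED ? Level.LOGIN_ONLY : Level.NONE);
  }

  public static void main(String[] args) {
    for (ServerEncryption server : ServerEncryption.values()) {
      System.out.println("ssl=false, server=" + server + " -> " + negotiate(false, server));
      System.out.println("ssl=true, server=" + server + " -> " + negotiate(true, server));
    }
  }
}
```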
Configuration
To configure ssl in client options, use the setSsl method.
By default, ssl is set to false.
MSSQLConnectOptions connectOptions = new MSSQLConnectOptions().setSsl(true);
When ssl is set to false, the client trusts all server certificates.
Otherwise, it performs hostname validation.
If ssl is set to true in client options, and the server uses a self-signed certificate, hostname validation can be disabled:
MSSQLConnectOptions connectOptions = new MSSQLConnectOptions()
.setSsl(true)
.setTrustAll(true);
Alternatively, the client can be configured to trust the server certificate with TrustOptions.
For example, PemTrustOptions can be used if a PEM file contains the server certificate:
MSSQLConnectOptions connectOptions = new MSSQLConnectOptions()
.setSsl(true)
.setPemTrustOptions(new PemTrustOptions().addCertPath("/path/to/server-cert.pem"));
For further details about SSL support in Vert.x, please refer to the Vert.x Core documentation.
Pool sharing
You can share a pool between multiple verticles or instances of the same verticle. Such a pool should be created outside a verticle; otherwise it will be closed when the verticle that created it is undeployed:
MSSQLPool pool = MSSQLPool.pool(database, new PoolOptions().setMaxSize(maxSize));
vertx.deployVerticle(() -> new AbstractVerticle() {
@Override
public void start() throws Exception {
// Use the pool
}
}, new DeploymentOptions().setInstances(4));
You can also create a shared pool in each verticle:
vertx.deployVerticle(() -> new AbstractVerticle() {
MSSQLPool pool;
@Override
public void start() {
// Get or create a shared pool
// this actually creates a lease to the pool
// when the verticle is undeployed, the lease will be released automatically
pool = MSSQLPool.pool(database, new PoolOptions()
.setMaxSize(maxSize)
.setShared(true)
.setName("my-pool"));
}
}, new DeploymentOptions().setInstances(4));
The first time a shared pool is created it will create the resources for the pool. Subsequent calls will reuse this pool and create a lease to this pool. The resources are disposed after all leases have been closed.
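The lease mechanism amounts to reference counting per pool name. The following is a minimal sketch of that idea, assuming a hypothetical `SharedPoolRegistrySketch` class in which counters stand in for the creation and disposal of real pool resources; it is not how the client is implemented.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical reference-counting model of shared pool leases.
final class SharedPoolRegistrySketch {

  private final Map<String, Integer> leases = new HashMap<>();
  int created;
  int disposed;

  // Returns a lease; the first lease for a name creates the resources.
  synchronized Runnable acquire(String name) {
    if (leases.merge(name, 1, Integer::sum) == 1) {
      created++;
    }
    boolean[] released = {false};
    return () -> release(name, released);
  }

  // Closing the last open lease for a name disposes the resources.
  private synchronized void release(String name, boolean[] released) {
    if (released[0]) {
      return; // closing a lease twice has no effect
    }
    released[0] = true;
    if (leases.merge(name, -1, Integer::sum) == 0) {
      leases.remove(name);
      disposed++;
    }
  }

  public static void main(String[] args) {
    SharedPoolRegistrySketch registry = new SharedPoolRegistrySketch();
    Runnable first = registry.acquire("my-pool");
    Runnable second = registry.acquire("my-pool");
    first.run();
    second.run();
    System.out.println("created=" + registry.created + ", disposed=" + registry.disposed);
  }
}
```

Two verticles asking for `my-pool` share one set of resources, and undeploying only one of them leaves the pool usable for the other.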
By default, a pool reuses the current event-loop when it needs to create a TCP connection. The shared pool will therefore randomly use event-loops of verticles using it.
You can assign the number of event loops a pool will use, independently of the context using it:
MSSQLPool pool = MSSQLPool.pool(database, new PoolOptions()
.setMaxSize(maxSize)
.setShared(true)
.setName("my-pool")
.setEventLoopSize(4));
Advanced pool configuration
Server load balancing
You can configure the pool with a list of servers instead of a single server.
MSSQLPool pool = MSSQLPool.pool(Arrays.asList(server1, server2, server3), options);
The pool uses a round-robin load balancing when a connection is created to select different servers.
Note: This provides load balancing when the connection is created, not when the connection is borrowed from the pool.
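Round-robin selection at connection-creation time can be sketched as a counter over the server list. `RoundRobinSketch` is a hypothetical class used for illustration; the strings stand in for the connect options of each server.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin selector, consulted once per newly created connection.
final class RoundRobinSketch<T> {

  private final List<T> servers;
  private final AtomicInteger next = new AtomicInteger();

  RoundRobinSketch(List<T> servers) {
    if (servers.isEmpty()) {
      throw new IllegalArgumentException("At least one server is required");
    }
    this.servers = new ArrayList<>(servers);
  }

  // Returns the server to use for the next connection to create.
  T nextServer() {
    // floorMod keeps the index valid even after the counter overflows.
    return servers.get(Math.floorMod(next.getAndIncrement(), servers.size()));
  }

  public static void main(String[] args) {
    RoundRobinSketch<String> balancer =
      new RoundRobinSketch<>(Arrays.asList("server1", "server2", "server3"));
    for (int i = 0; i < 4; i++) {
      System.out.println("connection " + i + " -> " + balancer.nextServer());
    }
  }
}
```

Because the selector runs only when a connection is created, a connection that is borrowed again from the pool stays bound to the server it was originally opened against.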
Pool connection initialization
You can use the connectHandler to interact with a connection after it has been created and before it is inserted in the pool.
pool.connectHandler(conn -> {
conn.query(sql).execute().onSuccess(res -> {
// Release the connection to the pool, ready to be used by the application
conn.close();
});
});
Once you are done with the connection, you should simply close it to signal to the pool that it can be used.
Dynamic connection configuration
You can configure the pool connection details using a Java supplier instead of an instance of SqlConnectOptions.
Since the supplier is asynchronous, it can be used to provide dynamic pool configuration (e.g. password rotation).
MSSQLPool pool = MSSQLPool.pool(vertx, () -> {
Future<MSSQLConnectOptions> connectOptions = retrieveOptions();
return connectOptions;
}, poolOptions);