Reactive PostgreSQL Client
The Reactive PostgreSQL Client is a client for PostgreSQL with a straightforward API focusing on scalability and low overhead.
The client is reactive and non-blocking, which allows it to handle many database connections with a single thread.
- Event driven
- Lightweight
- Built-in connection pooling
- Prepared queries caching
- Publish / subscribe using PostgreSQL NOTIFY/LISTEN
- Batch and cursor
- Row streaming
- Command pipelining
- RxJava API
- Direct memory to object without unnecessary copies
- Java 8 Date and Time
- SSL/TLS
- Unix domain socket
- HTTP/1.x CONNECT, SOCKS4a or SOCKS5 proxy support
Usage
To use the Reactive PostgreSQL Client, add the following dependency to the dependencies section of your build descriptor:
- Maven (in your pom.xml):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-pg-client</artifactId>
<version>4.3.8</version>
</dependency>
- Gradle (in your build.gradle file):
dependencies {
  implementation 'io.vertx:vertx-pg-client:4.3.8'
}
Getting started
Here is the simplest way to connect, query and disconnect:
// Connect options
PgConnectOptions connectOptions = new PgConnectOptions()
  .setPort(5432)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");
// Pool options
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);
// Create the client pool
SqlClient client = PgPool.client(connectOptions, poolOptions);
// A simple query
client
  .query("SELECT * FROM users WHERE id='julien'")
  .execute(ar -> {
    if (ar.succeeded()) {
      RowSet<Row> result = ar.result();
      System.out.println("Got " + result.size() + " rows ");
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
    // Now close the pool
    client.close();
  });
Connecting to PostgreSQL
Most of the time you will use a pool to connect to PostgreSQL:
PgConnectOptions connectOptions = new PgConnectOptions()
.setPort(5432)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool options
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// Create the pooled client
SqlClient client = PgPool.client(connectOptions, poolOptions);
The pooled client uses a connection pool: each operation borrows a connection from the pool, executes on it, and releases it back to the pool.
If you are running with Vert.x you can pass it your Vertx instance:
PgConnectOptions connectOptions = new PgConnectOptions()
.setPort(5432)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool options
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// Create the pooled client
SqlClient client = PgPool.client(vertx, connectOptions, poolOptions);
You need to release the client when you don’t need it anymore:
client.close();
When you need to execute several operations on the same connection, you need to acquire a connection from a pool.
You can easily get one from the pool:
PgConnectOptions connectOptions = new PgConnectOptions()
.setPort(5432)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool options
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// Create the pool
PgPool pool = PgPool.pool(vertx, connectOptions, poolOptions);
// Get a connection from the pool
pool.getConnection().compose(conn -> {
System.out.println("Got a connection from the pool");
// All operations execute on the same connection
return conn
.query("SELECT * FROM users WHERE id='julien'")
.execute()
.compose(res -> conn
.query("SELECT * FROM users WHERE id='emad'")
.execute())
.onComplete(ar -> {
// Release the connection to the pool
conn.close();
});
}).onComplete(ar -> {
if (ar.succeeded()) {
System.out.println("Done");
} else {
System.out.println("Something went wrong " + ar.cause().getMessage());
}
});
Once you are done with the connection you must close it to release it to the pool, so it can be reused.
Command pipelining
In some use cases, command pipelining can improve database access performance.
You can configure the client to use pipelining:
PgPool pool = PgPool.pool(vertx, connectOptions.setPipeliningLimit(16), poolOptions);
The default pipelining limit is 256.
You can set this value to 1 to disable pipelining.
Pool versus pooled client
The PgPool allows you to create a pool or a pooled client:
SqlClient client = PgPool.client(vertx, connectOptions, poolOptions);
// Pipelined
Future<RowSet<Row>> res1 = client.query(sql).execute();
// Connection pool
PgPool pool = PgPool.pool(vertx, connectOptions, poolOptions);
// Not pipelined
Future<RowSet<Row>> res2 = pool.query(sql).execute();
- pool operations are not pipelined; only connections acquired from the pool are pipelined
- pooled client operations are pipelined; you cannot acquire a connection from a pooled client
Pool sharing
You can share a pool between multiple verticles or instances of the same verticle. Such a pool should be created outside a verticle, otherwise it will be closed when the verticle that created it is undeployed:
PgPool pool = PgPool.pool(database, new PoolOptions().setMaxSize(maxSize));
vertx.deployVerticle(() -> new AbstractVerticle() {
@Override
public void start() throws Exception {
// Use the pool
}
}, new DeploymentOptions().setInstances(4));
You can also create a shared pool in each verticle:
vertx.deployVerticle(() -> new AbstractVerticle() {
PgPool pool;
@Override
public void start() {
// Get or create a shared pool
// this actually creates a lease to the pool
// when the verticle is undeployed, the lease will be released automatically
pool = PgPool.pool(database, new PoolOptions()
.setMaxSize(maxSize)
.setShared(true)
.setName("my-pool"));
}
}, new DeploymentOptions().setInstances(4));
The first time a shared pool is created it will create the resources for the pool. Subsequent calls will reuse this pool and create a lease to this pool. The resources are disposed after all leases have been closed.
By default, a pool reuses the current event-loop when it needs to create a TCP connection. The shared pool will therefore randomly use event-loops of verticles using it.
You can assign the number of event loops a pool will use, independently of the context using it:
PgPool pool = PgPool.pool(database, new PoolOptions()
.setMaxSize(maxSize)
.setShared(true)
.setName("my-pool")
.setEventLoopSize(4));
Unix domain sockets
Sometimes you want to improve performance by connecting through a Unix domain socket. This is achieved with Vert.x native transports.
Make sure you have added the required netty-transport-native dependency to your classpath and enabled the Unix domain socket option.
PgConnectOptions connectOptions = new PgConnectOptions()
.setHost("/var/run/postgresql")
.setPort(5432)
.setDatabase("the-db");
// Pool options
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// Create the pooled client
PgPool client = PgPool.pool(connectOptions, poolOptions);
// Create the pooled client with a vertx instance
// Make sure the vertx instance has enabled native transports
PgPool client2 = PgPool.pool(vertx, connectOptions, poolOptions);
More information can be found in the Vert.x documentation.
Connect retries
You can configure the client to retry when a connection fails to be established.
options
.setReconnectAttempts(2)
.setReconnectInterval(1000);
Configuration
There are several alternatives for you to configure the client.
data object
A simple way to configure the client is to specify a PgConnectOptions data object.
PgConnectOptions connectOptions = new PgConnectOptions()
.setPort(5432)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool Options
PoolOptions poolOptions = new PoolOptions().setMaxSize(5);
// Create the pool from the data object
PgPool pool = PgPool.pool(vertx, connectOptions, poolOptions);
pool.getConnection(ar -> {
// Handling your connection
});
You can also configure the generic properties with the setProperties or addProperty methods. Note that setProperties will override the default client properties.
Note: When using this client with CockroachDB DBaaS, the cluster option needs to be included using addProperty("options", "--cluster=<cluster-id>") or in the URL …&options=--cluster%3D<cluster-id>
For example, you can set a default schema for the connection by adding a search_path property.
PgConnectOptions connectOptions = new PgConnectOptions();
// Set the default schema
Map<String, String> props = new HashMap<>();
props.put("search_path", "myschema");
connectOptions.setProperties(props);
More information about the available properties can be found in the PostgreSQL Manuals.
Connection URI
Apart from configuring with a PgConnectOptions data object, you can also connect with a connection URI:
String connectionUri = "postgresql://dbuser:secretpassword@database.server.com:5432/mydb";
// Create the pool from the connection URI
PgPool pool = PgPool.pool(connectionUri);
// Create the connection from the connection URI
PgConnection.connect(vertx, connectionUri, res -> {
// Handling your connection
});
More information about connection string formats can be found in the PostgreSQL Manuals.
Currently, the client supports the following parameter keys:
- host
- hostaddr
- port
- user
- password
- dbname
- sslmode
- additional properties, including:
  - application_name
  - fallback_application_name
  - search_path
  - options
Note: Configuring parameters in the connection URI will override the default properties.
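As a rough illustration of how the parts of a connection URI relate to the parameter keys listed above, the following sketch splits a URI with the JDK's java.net.URI class. It is not the client's own parser: the class ConnectionUriSketch and its parse method are hypothetical names introduced for this example, and the host, credentials and cluster id are made up.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, for illustration only: not part of the Vert.x API.
class ConnectionUriSketch {

  // Splits a postgresql:// URI into the parameter keys used in this section.
  static Map<String, String> parse(String connectionUri) {
    URI uri = URI.create(connectionUri);
    Map<String, String> params = new LinkedHashMap<>();
    params.put("host", uri.getHost());
    if (uri.getPort() != -1) {
      params.put("port", String.valueOf(uri.getPort()));
    }
    // The user info part has the form user[:password]
    String userInfo = uri.getUserInfo();
    if (userInfo != null) {
      String[] parts = userInfo.split(":", 2);
      params.put("user", parts[0]);
      if (parts.length == 2) {
        params.put("password", parts[1]);
      }
    }
    // The path, without its leading slash, is the database name
    String path = uri.getPath();
    if (path != null && path.length() > 1) {
      params.put("dbname", path.substring(1));
    }
    // getQuery() returns the percent-decoded query, so %3D is already '='
    String query = uri.getQuery();
    if (query != null) {
      for (String pair : query.split("&")) {
        String[] kv = pair.split("=", 2);
        params.put(kv[0], kv.length == 2 ? kv[1] : "");
      }
    }
    return params;
  }

  public static void main(String[] args) {
    System.out.println(parse(
      "postgresql://dbuser:secret@localhost:5432/mydb?sslmode=disable&options=--cluster%3Dabc"));
  }
}
```

Note how the percent-encoded `%3D` in the options value decodes to `=`, which is why the cluster option has to be encoded when it is passed in the URL.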
environment variables
You can also use environment variables to set default connection setting values. This is useful when you want to avoid hard-coding database connection information. You can refer to the official documentation for more details. The following parameters are supported:
- PGHOST
- PGHOSTADDR
- PGPORT
- PGDATABASE
- PGUSER
- PGPASSWORD
- PGSSLMODE
If you don’t specify a data object or a connection URI string when connecting, the client uses the environment variables instead.
$ PGUSER=user \
PGHOST=the-host \
PGPASSWORD=secret \
PGDATABASE=the-db \
PGPORT=5432 \
PGSSLMODE=DISABLE
PgPool pool = PgPool.pool();
// Create the connection from the environment variables
PgConnection.connect(vertx, res -> {
// Handling your connection
});
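The general idea of environment-provided defaults can be sketched with plain JDK code. This is only an illustration of the lookup pattern, not the client's implementation: the class EnvDefaultsSketch, the resolve method and the fallback values ("localhost", "5432") are assumptions made for this example.

```java
import java.util.Map;

// Hypothetical helper, for illustration only: not part of the Vert.x API.
class EnvDefaultsSketch {

  // Returns the variable's value, or the fallback when it is unset or empty.
  static String resolve(Map<String, String> env, String name, String fallback) {
    String value = env.get(name);
    return (value == null || value.isEmpty()) ? fallback : value;
  }

  public static void main(String[] args) {
    Map<String, String> env = System.getenv();
    // The fallback values below are assumptions chosen for this sketch
    String host = resolve(env, "PGHOST", "localhost");
    int port = Integer.parseInt(resolve(env, "PGPORT", "5432"));
    System.out.println("Would connect to " + host + ":" + port);
  }
}
```

Passing the environment as a Map keeps the lookup easy to exercise with fixed values instead of the real process environment.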
SASL SCRAM-SHA-256 authentication mechanism
To use the SASL SCRAM-SHA-256 authentication, add the following dependency to the dependencies section of your build descriptor:
- Maven (in your pom.xml):
<dependency>
<groupId>com.ongres.scram</groupId>
<artifactId>client</artifactId>
<version>2.1</version>
</dependency>
- Gradle (in your build.gradle file):
dependencies {
  implementation 'com.ongres.scram:client:2.1'
}
Note: SCRAM-SHA-256-PLUS (added in PostgreSQL 11) is not supported.
Running queries
When you don’t need a transaction or only run single queries, you can run queries directly on the pool; the pool will use one of its connections to run the query and return the result to you.
Here is how to run simple queries:
client
.query("SELECT * FROM users WHERE id='julien'")
.execute(ar -> {
if (ar.succeeded()) {
RowSet<Row> result = ar.result();
System.out.println("Got " + result.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
Prepared queries
You can do the same with prepared queries.
The SQL string can refer to parameters by position, using the database syntax `$1`, `$2`, etc…
client
.preparedQuery("SELECT * FROM users WHERE id=$1")
.execute(Tuple.of("julien"), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println("Got " + rows.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
Query methods provide an asynchronous RowSet instance that works for SELECT queries:
client
.preparedQuery("SELECT first_name, last_name FROM users")
.execute(ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
for (Row row : rows) {
System.out.println("User " + row.getString(0) + " " + row.getString(1));
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
or UPDATE/INSERT queries:
client
.preparedQuery("INSERT INTO users (first_name, last_name) VALUES ($1, $2)")
.execute(Tuple.of("Julien", "Viet"), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println(rows.rowCount());
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
The Row gives you access to your data by index:
System.out.println("User " + row.getString(0) + " " + row.getString(1));
or by name:
System.out.println("User " + row.getString("first_name") + " " + row.getString("last_name"));
The client does not do any magic here: a column is identified by its name in the table, regardless of how your SQL text is written.
You can access a wide variety of types:
String firstName = row.getString("first_name");
Boolean male = row.getBoolean("male");
Integer age = row.getInteger("age");
You can use cached prepared statements to execute one-shot prepared queries:
connectOptions.setCachePreparedStatements(true);
client
.preparedQuery("SELECT * FROM users WHERE id = $1")
.execute(Tuple.of("julien"), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println("Got " + rows.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
You can create a PreparedStatement and manage its lifecycle yourself:
sqlConnection
.prepare("SELECT * FROM users WHERE id = $1", ar -> {
if (ar.succeeded()) {
PreparedStatement preparedStatement = ar.result();
preparedStatement.query()
.execute(Tuple.of("julien"), ar2 -> {
if (ar2.succeeded()) {
RowSet<Row> rows = ar2.result();
System.out.println("Got " + rows.size() + " rows ");
preparedStatement.close();
} else {
System.out.println("Failure: " + ar2.cause().getMessage());
}
});
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
Batches
You can execute a prepared batch:
List<Tuple> batch = new ArrayList<>();
batch.add(Tuple.of("julien", "Julien Viet"));
batch.add(Tuple.of("emad", "Emad Alblueshi"));
// Execute the prepared batch
client
.preparedQuery("INSERT INTO USERS (id, name) VALUES ($1, $2)")
.executeBatch(batch, res -> {
if (res.succeeded()) {
// Process rows
RowSet<Row> rows = res.result();
} else {
System.out.println("Batch failed " + res.cause());
}
});
Returning clauses
You can fetch generated keys with a 'RETURNING' clause in your query:
client
.preparedQuery("INSERT INTO color (color_name) VALUES ($1), ($2), ($3) RETURNING color_id")
.execute(Tuple.of("white", "red", "blue"))
.onSuccess(rows -> {
for (Row row : rows) {
System.out.println("generated key: " + row.getInteger("color_id"));
}
});
This works with any SQL as long as there is a RETURNING clause:
client
.query("DELETE FROM color RETURNING color_name")
.execute()
.onSuccess(rows -> {
for (Row row : rows) {
System.out.println("deleted color: " + row.getString("color_name"));
}
});
A batch query with a RETURNING clause creates a RowSet containing a single row for each element of the batch:
client
.preparedQuery("INSERT INTO color (color_name) VALUES ($1) RETURNING color_id")
.executeBatch(Arrays.asList(Tuple.of("white"), Tuple.of("red"), Tuple.of("blue")))
.onSuccess(res -> {
for (RowSet<Row> rows = res; rows != null; rows = rows.next()) {
Integer colorId = rows.iterator().next().getInteger("color_id");
System.out.println("generated key: " + colorId);
}
});
Using connections
Getting a connection
When you need to execute sequential queries (without a transaction), you can create a new connection or borrow one from the pool. Remember that between acquiring the connection from the pool and returning it to the pool, you should take care of the connection, because the server might close it for some reason such as an idle timeout.
pool
.getConnection()
.compose(connection ->
connection
.preparedQuery("INSERT INTO Users (first_name,last_name) VALUES ($1, $2)")
.executeBatch(Arrays.asList(
Tuple.of("Julien", "Viet"),
Tuple.of("Emad", "Alblueshi")
))
.compose(res -> connection
// Do something with rows
.query("SELECT COUNT(*) FROM Users")
.execute()
.map(rows -> rows.iterator().next().getInteger(0)))
// Return the connection to the pool
.eventually(v -> connection.close())
).onSuccess(count -> {
System.out.println("Insert users, now the number of users is " + count);
});
Prepared queries can be created:
connection
.prepare("SELECT * FROM users WHERE first_name LIKE $1")
.compose(pq ->
pq.query()
.execute(Tuple.of("Julien"))
.eventually(v -> pq.close())
).onSuccess(rows -> {
// All rows
});
Simplified connection API
When you use a pool, you can call withConnection to pass it a function executed within a connection.
It borrows a connection from the pool and calls the function with this connection.
The function must return a future of an arbitrary result.
After the future completes, the connection is returned to the pool and the overall result is provided.
pool.withConnection(connection ->
connection
.preparedQuery("INSERT INTO Users (first_name,last_name) VALUES ($1, $2)")
.executeBatch(Arrays.asList(
Tuple.of("Julien", "Viet"),
Tuple.of("Emad", "Alblueshi")
))
.compose(res -> connection
// Do something with rows
.query("SELECT COUNT(*) FROM Users")
.execute()
.map(rows -> rows.iterator().next().getInteger(0)))
).onSuccess(count -> {
System.out.println("Insert users, now the number of users is " + count);
});
Using transactions
Transactions with connections
You can execute a transaction using SQL BEGIN/COMMIT/ROLLBACK. If you do so, you must use a SqlConnection and manage it yourself.
Or you can use the transaction API of SqlConnection:
pool.getConnection()
// Transaction must use a connection
.onSuccess(conn -> {
// Begin the transaction
conn.begin()
.compose(tx -> conn
// Various statements
.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
.execute()
.compose(res2 -> conn
.query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
.execute())
// Commit the transaction
.compose(res3 -> tx.commit()))
// Return the connection to the pool
.eventually(v -> conn.close())
.onSuccess(v -> System.out.println("Transaction succeeded"))
.onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));
});
When the database server reports that the current transaction has failed (e.g. the infamous current transaction is aborted, commands ignored until end of transaction block), the transaction is rolled back and the completion future is failed with a TransactionRollbackException:
tx.completion()
.onFailure(err -> {
System.out.println("Transaction failed => rolled back");
});
Simplified transaction API
When you use a pool, you can call withTransaction to pass it a function executed within a transaction.
It borrows a connection from the pool, begins the transaction and calls the function with a client executing all operations in the scope of this transaction.
The function must return a future of an arbitrary result:
- when the future succeeds, the client will commit the transaction
- when the future fails, the client will roll back the transaction
After the transaction completes, the connection is returned to the pool and the overall result is provided.
pool.withTransaction(client -> client
.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
.execute()
.flatMap(res -> client
.query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
.execute()
// Map to a message result
.map("Users inserted")))
.onSuccess(v -> System.out.println("Transaction succeeded"))
.onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));
Cursors and streaming
By default, prepared query execution fetches all rows. You can use a Cursor to control the number of rows you want to read:
connection.prepare("SELECT * FROM users WHERE first_name LIKE $1", ar0 -> {
  if (ar0.succeeded()) {
    PreparedStatement pq = ar0.result();
    // Cursors must run within a transaction
    connection.begin(ar1 -> {
      if (ar1.succeeded()) {
        Transaction tx = ar1.result();
        // Create a cursor
        Cursor cursor = pq.cursor(Tuple.of("julien"));
        // Read 50 rows
        cursor.read(50, ar2 -> {
          if (ar2.succeeded()) {
            RowSet<Row> rows = ar2.result();
            // Check for more ?
            if (cursor.hasMore()) {
              // Repeat the process...
            } else {
              // No more rows - commit the transaction
              tx.commit();
            }
          }
        });
      }
    });
  }
});
Cursors should be closed when they are released prematurely:
cursor.read(50, ar2 -> {
if (ar2.succeeded()) {
// Close the cursor
cursor.close();
}
});
A stream API is also available for cursors, which can be more convenient, especially with the Rxified version.
connection.prepare("SELECT * FROM users WHERE first_name LIKE $1", ar0 -> {
  if (ar0.succeeded()) {
    PreparedStatement pq = ar0.result();
    // Streams must run within a transaction
    connection.begin(ar1 -> {
      if (ar1.succeeded()) {
        Transaction tx = ar1.result();
        // Fetch 50 rows at a time
        RowStream<Row> stream = pq.createStream(50, Tuple.of("julien"));
        // Use the stream
        stream.exceptionHandler(err -> {
          System.out.println("Error: " + err.getMessage());
        });
        stream.endHandler(v -> {
          // Close the stream to release the resources in the database
          stream.close(closed -> {
            tx.commit(committed -> {
              System.out.println("End of stream");
            });
          });
        });
        stream.handler(row -> {
          System.out.println("User: " + row.getString("last_name"));
        });
      }
    });
  }
});
The stream reads the rows in batches of 50 and streams them. When the rows have been passed to the handler, a new batch of 50 is read, and so on.
The stream can be resumed or paused. The loaded rows will remain in memory until they are delivered and the cursor will stop iterating.
Note: PostgreSQL destroys cursors at the end of a transaction, so the cursor API shall be used within a transaction, otherwise you will likely get the 34000 PostgreSQL error.
Tracing queries
The SQL client can trace query execution when Vert.x has tracing enabled.
The client reports the following client spans:
- Query operation name
- tags
  - db.user: the database username
  - db.instance: the database instance
  - db.statement: the SQL query
  - db.type: sql
The default tracing policy is PROPAGATE: the client will only create a span when involved in an active trace.
You can change the client policy with setTracingPolicy, e.g. you can set ALWAYS to always report a span:
options.setTracingPolicy(TracingPolicy.ALWAYS);
PostgreSQL type mapping
Currently the client supports the following PostgreSQL types:
- BOOLEAN (java.lang.Boolean)
- INT2 (java.lang.Short)
- INT4 (java.lang.Integer)
- INT8 (java.lang.Long)
- FLOAT4 (java.lang.Float)
- FLOAT8 (java.lang.Double)
- CHAR (java.lang.String)
- VARCHAR (java.lang.String)
- TEXT (java.lang.String)
- ENUM (java.lang.String)
- NAME (java.lang.String)
- SERIAL2 (java.lang.Short)
- SERIAL4 (java.lang.Integer)
- SERIAL8 (java.lang.Long)
- NUMERIC (io.vertx.sqlclient.data.Numeric)
- UUID (java.util.UUID)
- DATE (java.time.LocalDate)
- TIME (java.time.LocalTime)
- TIMETZ (java.time.OffsetTime)
- TIMESTAMP (java.time.LocalDateTime)
- TIMESTAMPTZ (java.time.OffsetDateTime)
- INTERVAL (io.vertx.pgclient.data.Interval)
- BYTEA (io.vertx.core.buffer.Buffer)
- JSON (io.vertx.core.json.JsonObject, io.vertx.core.json.JsonArray, Number, Boolean, String, io.vertx.sqlclient.Tuple#JSON_NULL)
- JSONB (io.vertx.core.json.JsonObject, io.vertx.core.json.JsonArray, Number, Boolean, String, io.vertx.sqlclient.Tuple#JSON_NULL)
- POINT (io.vertx.pgclient.data.Point)
- LINE (io.vertx.pgclient.data.Line)
- LSEG (io.vertx.pgclient.data.LineSegment)
- BOX (io.vertx.pgclient.data.Box)
- PATH (io.vertx.pgclient.data.Path)
- POLYGON (io.vertx.pgclient.data.Polygon)
- CIRCLE (io.vertx.pgclient.data.Circle)
- TSVECTOR (java.lang.String)
- TSQUERY (java.lang.String)
- INET (io.vertx.pgclient.data.Inet)
- MONEY (io.vertx.pgclient.data.Money)
Tuple decoding uses the above types when storing values. It also performs on-the-fly conversion of the actual value when possible:
pool
.query("SELECT 1::BIGINT \"VAL\"")
.execute(ar -> {
RowSet<Row> rowSet = ar.result();
Row row = rowSet.iterator().next();
// Stored as java.lang.Long
Object value = row.getValue(0);
// Convert to java.lang.Integer
Integer intValue = row.getInteger(0);
});
Tuple encoding uses the above type mapping for encoding, unless the type is numeric, in which case java.lang.Number is used instead:
pool
.query("SELECT 1::BIGINT \"VAL\"")
.execute(ar -> {
RowSet<Row> rowSet = ar.result();
Row row = rowSet.iterator().next();
// Stored as java.lang.Long
Object value = row.getValue(0);
// Convert to java.lang.Integer
Integer intValue = row.getInteger(0);
});
Arrays of these types are supported.
Handling JSON
PostgreSQL JSON and JSONB types are represented by the following Java types:
- String
- Number
- Boolean
- io.vertx.core.json.JsonObject
- io.vertx.core.json.JsonArray
- io.vertx.sqlclient.Tuple#JSON_NULL for representing the JSON null literal
Tuple tuple = Tuple.of(
Tuple.JSON_NULL,
new JsonObject().put("foo", "bar"),
3);
// Retrieving json
Object value = tuple.getValue(0); // Expect JSON_NULL
//
value = tuple.get(JsonObject.class, 1); // Expect JSON object
//
value = tuple.get(Integer.class, 2); // Expect 3
value = tuple.getInteger(2); // Expect 3
Handling NUMERIC
The Numeric Java type is used to represent the PostgreSQL NUMERIC type.
Numeric numeric = row.get(Numeric.class, 0);
if (numeric.isNaN()) {
// Handle NaN
} else {
BigDecimal value = numeric.bigDecimalValue();
}
Handling Date/Time infinity
PostgreSQL defines special values to represent infinity.
The MAX/MIN constants of the corresponding Java type represent these special values:
- OffsetDateTime.MAX / OffsetDateTime.MIN
- LocalDateTime.MAX / LocalDateTime.MIN
- LocalDate.MAX / LocalDate.MIN
client
.query("SELECT 'infinity'::DATE \"LocalDate\"")
.execute(ar -> {
if (ar.succeeded()) {
Row row = ar.result().iterator().next();
System.out.println(row.getLocalDate("LocalDate").equals(LocalDate.MAX));
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
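Because infinity arrives as an ordinary LocalDate constant, application code usually has to check for it before formatting or doing date arithmetic. A minimal sketch of such a check, using only the JDK, could look like this; DateInfinitySketch and describe are hypothetical names introduced for the example.

```java
import java.time.LocalDate;

// Hypothetical helper, for illustration only: not part of the Vert.x API.
class DateInfinitySketch {

  // Renders a date, mapping the MAX/MIN constants back to the PostgreSQL spelling.
  static String describe(LocalDate date) {
    if (LocalDate.MAX.equals(date)) {
      return "infinity";
    }
    if (LocalDate.MIN.equals(date)) {
      return "-infinity";
    }
    // Regular dates use the ISO-8601 form, e.g. 2020-01-02
    return date.toString();
  }

  public static void main(String[] args) {
    System.out.println(describe(LocalDate.MAX));
    System.out.println(describe(LocalDate.of(2020, 1, 2)));
  }
}
```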
Handling custom types
Strings are used to represent custom types, both sent to and returned from Postgres.
You can read from PostgreSQL and get the custom type as a string
client
.preparedQuery("SELECT address, (address).city FROM address_book WHERE id=$1")
.execute(Tuple.of(3), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
for (Row row : rows) {
System.out.println("Full Address " + row.getString(0) + ", City " + row.getString(1));
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
You can also write to PostgreSQL by providing a string
client
.preparedQuery("INSERT INTO address_book (id, address) VALUES ($1, $2)")
.execute(Tuple.of(3, "('Anytown', 'Second Ave', false)"), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println(rows.rowCount());
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
Handling text search
Text search is handled using Java String:
client
.preparedQuery("SELECT to_tsvector( $1 ) @@ to_tsquery( $2 )")
.execute(Tuple.of("fat cats ate fat rats", "fat & rat"), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
for (Row row : rows) {
System.out.println("Match : " + row.getBoolean(0));
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
tsvector and tsquery can be fetched from the database using Java String:
client
.preparedQuery("SELECT to_tsvector( $1 ), to_tsquery( $2 )")
.execute(Tuple.of("fat cats ate fat rats", "fat & rat"), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
for (Row row : rows) {
System.out.println("Vector : " + row.getString(0) + ", query : "+row.getString(1));
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
Handling enumerated types
PostgreSQL enumerated types are mapped to Java strings.
client
.preparedQuery("INSERT INTO colors VALUES ($1)")
.execute(Tuple.of("red"), res -> {
// ...
});
Using Java enum types
You can map Java enum types to these column types:
- Strings (VARCHAR, TEXT)
- PostgreSQL enumerated types
- Numbers (INT2, INT4, INT8)
client
.preparedQuery("INSERT INTO colors VALUES ($1)")
.execute(Tuple.of(Color.red))
.flatMap(res ->
client
.preparedQuery("SELECT color FROM colors")
.execute()
).onComplete(res -> {
if (res.succeeded()) {
RowSet<Row> rows = res.result();
for (Row row : rows) {
System.out.println(row.get(Color.class, "color"));
}
}
});
String and PostgreSQL enumerated types are matched with the Java enum’s name returned by the name() method.
Number types are matched with the Java enum’s ordinal returned by the ordinal() method.
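The two matching rules can be seen in isolation with plain Java. The sketch below assumes a Color enum with three constants (only Color.red appears in the example above; green and blue are made up) and uses hypothetical helper names; it shows what name() and ordinal() yield, not how the client performs the conversion internally.

```java
// Hypothetical example, for illustration only: not part of the Vert.x API.
class EnumMappingSketch {

  // Assumed enum; only the red constant appears in the documentation example
  enum Color { red, green, blue }

  // Value used for string and PostgreSQL enumerated columns
  static String toText(Color color) {
    return color.name();
  }

  // Value used for numeric columns
  static int toNumber(Color color) {
    return color.ordinal();
  }

  static Color fromText(String text) {
    return Color.valueOf(text);
  }

  static Color fromNumber(int number) {
    return Color.values()[number];
  }

  public static void main(String[] args) {
    System.out.println(toText(Color.red) + " / " + toNumber(Color.red));
  }
}
```

One consequence of ordinal-based matching is that reordering the constants of the Java enum changes the numbers they map to, so numeric columns are more fragile than string or enumerated columns in that respect.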
Collector queries
You can use Java collectors with the query API:
Collector<Row, ?, Map<Long, String>> collector = Collectors.toMap(
row -> row.getLong("id"),
row -> row.getString("last_name"));
// Run the query with the collector
client.query("SELECT * FROM users")
.collecting(collector)
.execute(ar -> {
if (ar.succeeded()) {
SqlResult<Map<Long, String>> result = ar.result();
// Get the map created by the collector
Map<Long, String> map = result.value();
System.out.println("Got " + map);
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
The collector processing must not keep a reference on the Row, as there is a single row used for processing the entire set.
The Java Collectors class provides many interesting predefined collectors. For example, you can easily create a string directly from the row set:
Collector<Row, ?, String> collector = Collectors.mapping(
row -> row.getString("last_name"),
Collectors.joining(",", "(", ")")
);
// Run the query with the collector
client.query("SELECT * FROM users").collecting(collector).execute(ar -> {
if (ar.succeeded()) {
SqlResult<String> result = ar.result();
// Get the string created by the collector
String list = result.value();
System.out.println("Got " + list);
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
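To see what such a collector produces without a database, the same mapping/joining combination can be run over an in-memory list. In this sketch a Map<String, String> stands in for a Row, which is an assumption made purely for illustration; JoiningCollectorSketch and join are hypothetical names.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collector;
import java.util.stream.Collectors;

// Hypothetical example, for illustration only: a Map stands in for a Row.
class JoiningCollectorSketch {

  // Same shape as the collector above: extract last_name, then join with delimiters
  static final Collector<Map<String, String>, ?, String> COLLECTOR = Collectors.mapping(
    row -> row.get("last_name"),
    Collectors.joining(",", "(", ")"));

  static String join(List<Map<String, String>> rows) {
    return rows.stream().collect(COLLECTOR);
  }

  public static void main(String[] args) {
    System.out.println(join(List.of(
      Map.of("last_name", "Viet"),
      Map.of("last_name", "Alblueshi"))));
  }
}
```

The mapping function extracts the String immediately and does not retain the row object, which is consistent with the rule that a collector must not keep a reference on the Row.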
Pub/sub
PostgreSQL supports pub/sub communication channels.
You can set a notificationHandler to receive PostgreSQL notifications:
connection.notificationHandler(notification -> {
System.out.println("Received " + notification.getPayload() + " on channel " + notification.getChannel());
});
connection
.query("LISTEN \"some-channel\"")
.execute(ar -> {
System.out.println("Subscribed to channel");
});
The PgSubscriber is a channel manager managing a single connection that provides per-channel subscription:
PgSubscriber subscriber = PgSubscriber.subscriber(vertx, new PgConnectOptions()
.setPort(5432)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret")
);
// You can set the channel before connect
subscriber.channel("channel1").handler(payload -> {
System.out.println("Received " + payload);
});
subscriber.connect(ar -> {
if (ar.succeeded()) {
// Or you can set the channel after connect
subscriber.channel("channel2").handler(payload -> {
System.out.println("Received " + payload);
});
}
});
The channel name that is given to the channel method will be the exact name of the channel as held by PostgreSQL for sending notifications. Note this is different from the representation of the channel name in SQL; internally, PgSubscriber will prepare the submitted channel name as a quoted identifier:
PgSubscriber subscriber = PgSubscriber.subscriber(vertx, new PgConnectOptions()
.setPort(5432)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret")
);
subscriber.connect(ar -> {
if (ar.succeeded()) {
// Complex channel name - name in PostgreSQL requires a quoted ID
subscriber.channel("Complex.Channel.Name").handler(payload -> {
System.out.println("Received " + payload);
});
subscriber.channel("Complex.Channel.Name").subscribeHandler(subscribed -> {
subscriber.actualConnection()
.query("NOTIFY \"Complex.Channel.Name\", 'msg'")
.execute(notified -> {
System.out.println("Notified \"Complex.Channel.Name\"");
});
});
// PostgreSQL simple ID's are forced lower-case
subscriber.channel("simple_channel").handler(payload -> {
System.out.println("Received " + payload);
});
subscriber.channel("simple_channel").subscribeHandler(subscribed -> {
// The following simple channel identifier is forced to lower case
subscriber.actualConnection()
.query("NOTIFY Simple_CHANNEL, 'msg'")
.execute(notified -> {
System.out.println("Notified simple_channel");
});
});
// The following channel name is longer than the current
// (NAMEDATALEN = 64) - 1 == 63 character limit and will be truncated
subscriber.channel("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbb")
.handler(payload -> {
System.out.println("Received " + payload);
});
}
});
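As a rough illustration of what writing a channel name as a quoted identifier involves, the sketch below wraps a name in double quotes, doubling any embedded quote, and separately shows the 63-character truncation mentioned in the example above. ChannelNames and its methods are hypothetical helpers written for this illustration; they are not part of the client API and do not claim to mirror the PgSubscriber internals. The sketch assumes ASCII names, so that characters and bytes coincide.

```java
final class ChannelNames {

  // Default PostgreSQL identifier limit: NAMEDATALEN - 1 = 63
  static final int MAX_LENGTH = 63;

  // Returns the name as the server would keep it after truncation
  // (assumption: ASCII only, so one char is one byte)
  static String truncate(String channel) {
    return channel.length() > MAX_LENGTH ? channel.substring(0, MAX_LENGTH) : channel;
  }

  // Wraps the name in double quotes and doubles embedded double quotes,
  // which is how SQL quoted identifiers are written
  static String quote(String channel) {
    return "\"" + channel.replace("\"", "\"\"") + "\"";
  }

  // Builds the SQL text that subscribes to the exact channel name
  static String listenStatement(String channel) {
    return "LISTEN " + quote(channel);
  }
}
```

With these helpers, ChannelNames.listenStatement("Complex.Channel.Name") produces LISTEN "Complex.Channel.Name", which preserves the case and the dots of the name.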
You can provide a reconnect policy as a function that takes the number of retries as its argument and returns an amountOfTime value:
- when amountOfTime < 0: the subscriber is closed and there is no retry
- when amountOfTime = 0: the subscriber retries to connect immediately
- when amountOfTime > 0: the subscriber retries after amountOfTime milliseconds
PgSubscriber subscriber = PgSubscriber.subscriber(vertx, new PgConnectOptions()
.setPort(5432)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret")
);
// Reconnect at most 10 times after 100 ms each
subscriber.reconnectPolicy(retries -> {
if (retries < 10) {
return 100L;
} else {
return -1L;
}
});
The default policy is to not reconnect.
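The fixed-delay policy shown above can be generalised. The following is a minimal sketch of an exponential backoff policy written as a plain Function<Integer, Long>, the same shape as the lambda in the example. ReconnectPolicies and exponentialBackoff are hypothetical names introduced here. The sketch assumes that the retry count starts at 0 and that the configured delays are small enough not to overflow a long.

```java
import java.util.function.Function;

final class ReconnectPolicies {

  // Doubles the delay on each retry, capped at maxDelayMs,
  // and gives up (negative value) once maxRetries is reached
  static Function<Integer, Long> exponentialBackoff(int maxRetries, long baseDelayMs, long maxDelayMs) {
    return retries -> {
      if (retries >= maxRetries) {
        // A negative amount of time means: close the subscriber, no retry
        return -1L;
      }
      long delay = baseDelayMs;
      // Stop doubling as soon as the cap is reached
      for (int i = 0; i < retries && delay < maxDelayMs; i++) {
        delay *= 2;
      }
      return Math.min(delay, maxDelayMs);
    };
  }
}
```

Assuming reconnectPolicy accepts such a function, it could be installed with subscriber.reconnectPolicy(ReconnectPolicies.exponentialBackoff(10, 100, 5000)), which would wait 100 ms, 200 ms, 400 ms and so on up to 5 seconds, then stop after 10 retries.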
Notice messages
PostgreSQL can send notice messages during the lifetime of a connection.
By default, such messages are logged on the console as warnings.
You can set a handler on a connection to catch them and do something useful with them:
connection.noticeHandler(notice -> {
System.out.println("Received notice " + notice.getSeverity() + " " + notice.getMessage());
});
Cancelling a request
PostgreSQL supports cancellation of requests in progress. You can cancel in-flight requests using cancelRequest. Cancelling a request opens a new connection to the server, cancels the request, and then closes that connection.
connection
.query("SELECT pg_sleep(20)")
.execute(ar -> {
if (ar.succeeded()) {
// imagine this is a long query and is still running
System.out.println("Query success");
} else {
// the server will abort the current query after cancelling request
System.out.println("Failed to query due to " + ar.cause().getMessage());
}
});
connection.cancelRequest(ar -> {
if (ar.succeeded()) {
System.out.println("Cancelling request has been sent");
} else {
System.out.println("Failed to send cancelling request");
}
});
The cancellation signal might or might not have any effect. For example, if it arrives after the backend has finished processing the query, it has no effect. If the cancellation is effective, the current command is terminated early with an error message.
More information can be found in the official documentation.
Using SSL/TLS
To configure the client to use an SSL connection, you can configure the PgConnectOptions like a Vert.x NetClient.
All SSL modes are supported and you can configure sslmode. The client is in DISABLE SSL mode by default.
The ssl parameter is kept as a mere shortcut for setting sslmode: setSsl(true) is equivalent to setSslMode(VERIFY_CA) and setSsl(false) is equivalent to setSslMode(DISABLE).
PgConnectOptions options = new PgConnectOptions()
.setPort(5432)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret")
.setSslMode(SslMode.VERIFY_CA)
.setPemTrustOptions(new PemTrustOptions().addCertPath("/path/to/cert.pem"));
PgConnection.connect(vertx, options, res -> {
if (res.succeeded()) {
// Connected with SSL
} else {
System.out.println("Could not connect " + res.cause());
}
});
More information can be found in the Vert.x documentation.
Using a proxy
You can also configure the client to use an HTTP/1.x CONNECT, SOCKS4a or SOCKS5 proxy.
More information can be found in the Vert.x documentation.
Advanced pool configuration
Server load balancing
You can configure the pool with a list of servers instead of a single server.
PgPool pool = PgPool.pool(Arrays.asList(server1, server2, server3), options);
The pool uses round-robin load balancing to select a server each time a connection is created.
Note: this provides load balancing when the connection is created, not when the connection is borrowed from the pool.
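To make the round-robin behaviour concrete, here is a small, self-contained model of a selector that hands out servers in rotation each time a new connection would be created. RoundRobin is a hypothetical class written for this illustration; it is not the pool's implementation and only models the selection step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobin<T> {

  private final List<T> servers;
  private final AtomicInteger next = new AtomicInteger();

  RoundRobin(List<T> servers) {
    if (servers.isEmpty()) {
      throw new IllegalArgumentException("at least one server is required");
    }
    // Defensive copy so later changes to the caller's list have no effect
    this.servers = new ArrayList<>(servers);
  }

  // Called once per connection creation, not once per borrow
  T select() {
    // floorMod keeps the index valid even after the counter overflows
    int index = Math.floorMod(next.getAndIncrement(), servers.size());
    return servers.get(index);
  }
}
```

In this model, three servers are selected in the order first, second, third, then first again. A connection that is already pooled keeps the server it was created with, which is why borrowing does not rebalance.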
Pool connection initialization
You can use the connectHandler to interact with a connection after it has been created and before it is inserted in the pool.
pool.connectHandler(conn -> {
conn.query(sql).execute().onSuccess(res -> {
// Release the connection to the pool, ready to be used by the application
conn.close();
});
});
Once you are done with the connection, you should simply close it to signal the pool to use it.
RxJava 3 API
The client provides an Rxified version of the original API. The following examples use RxJava 3.
Simple query
Single<RowSet<Row>> single = pool.query("SELECT * FROM users WHERE id='julien'").rxExecute();
// Execute the query
single.subscribe(result -> {
System.out.println("Got " + result.size() + " rows ");
}, err -> {
System.out.println("Failure: " + err.getMessage());
});
Connection
The simplified connection API makes it easy to use a connection: the withConnection method borrows a connection from the pool and returns it for you:
Maybe<RowSet<Row>> maybe = pool.withConnection(conn ->
conn
.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
.rxExecute()
.flatMap(result -> conn
.query("SELECT * FROM Users")
.rxExecute())
.toMaybe());
maybe.subscribe(rows -> {
// Success
}, err -> {
// Failed
});
Transaction
The simplified transaction API makes it easy to write transactional asynchronous flows: the withTransaction method starts and commits a transaction for you:
Completable completable = pool.withTransaction(conn ->
conn
.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
.rxExecute()
.flatMap(result -> conn
.query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
.rxExecute())
.toMaybe())
.ignoreElement();
completable.subscribe(() -> {
// Transaction succeeded
}, err -> {
// Transaction failed
});
Streaming
RxJava supports the Observable and Flowable types. These are exposed through the RowStream that you can get from a PreparedQuery:
Observable<Row> observable = pool.rxGetConnection().flatMapObservable(conn -> conn
.rxBegin()
.flatMapObservable(tx ->
conn
.rxPrepare("SELECT * FROM users WHERE first_name LIKE $1")
.flatMapObservable(preparedQuery -> {
// Fetch 50 rows at a time
RowStream<Row> stream = preparedQuery.createStream(50, Tuple.of("julien"));
return stream.toObservable();
})
.doAfterTerminate(tx::commit)));
// Then subscribe
observable.subscribe(row -> {
System.out.println("User: " + row.getString("last_name"));
}, err -> {
System.out.println("Error: " + err.getMessage());
}, () -> {
System.out.println("End of stream");
});
The same example using Flowable:
Flowable<Row> flowable = pool.rxGetConnection().flatMapPublisher(conn -> conn
.rxBegin()
.flatMapPublisher(tx ->
conn
.rxPrepare("SELECT * FROM users WHERE first_name LIKE $1")
.flatMapPublisher(preparedQuery -> {
// Fetch 50 rows at a time
RowStream<Row> stream = preparedQuery.createStream(50, Tuple.of("julien"));
return stream.toFlowable();
})
.doAfterTerminate(tx::commit)));
// Then subscribe
flowable.subscribe(new Subscriber<Row>() {
private Subscription sub;
@Override
public void onSubscribe(Subscription subscription) {
sub = subscription;
subscription.request(1);
}
@Override
public void onNext(Row row) {
sub.request(1);
System.out.println("User: " + row.getString("last_name"));
}
@Override
public void onError(Throwable err) {
System.out.println("Error: " + err.getMessage());
}
@Override
public void onComplete() {
System.out.println("End of stream");
}
});