Rust client for QuestDB
The QuestDB Rust client ingests and queries data over
QWP, a columnar binary
protocol carried over WebSocket. The single entry point is a connection pool,
QuestDb. Open it once, then borrow:
- a column-major writer for bulk/columnar ingestion (
borrow_column_sender), - a row-major writer for event-at-a-time ingestion (
borrow_row_sender), - a reader for SQL queries (
borrow_reader).
QuestDb and the three Borrowed* handles are re-exported at the crate root:
use questdb::QuestDb;.
Cargo setup
cargo add questdb-rs
[dependencies]
# Writing works with default features. Querying needs `sync-reader-qwp-ws`.
questdb-rs = { version = "7", features = ["sync-reader-qwp-ws"] }
| Feature | Default | Enables |
|---|---|---|
sync-sender | ✅ | The ILP/TCP, ILP/HTTP and QWP/WebSocket senders — including the pool's column-major and row-major writers. (QWP/UDP is best-effort and opt-in via sync-sender-qwp-udp; the pool does not use it.) |
sync-reader-qwp-ws | ❌ | The query reader (QuestDb::borrow_reader, Reader, Cursor). Required for reads. |
sync-reader-zstd | ❌ | Decompress zstd-compressed result batches on the read path. |
ndarray | ❌ | Buffer::column_arr from ndarray views (row-major arrays). |
rust_decimal / bigdecimal | ❌ | Buffer::column_dec* from those decimal types (string decimals work without). |
chrono-timestamp | ❌ | Build timestamps from chrono::DateTime. |
arrow (arrow-ingress / arrow-egress) | ❌ | Apache Arrow: db.flush_arrow_batch (ingest) and Cursor::*_arrow (read). The umbrella arrow enables both directions; pick one direction to stay lean. |
polars (polars-ingress / polars-egress) | ❌ | polars: db.flush_polars_dataframe (ingest) and Cursor::*_polars (read). Umbrella enables both; directional features available. |
tls-native-certs | ❌ | Validate TLS against the OS certificate store. |
Direction tip: ingest methods (flush_*) need the *-ingress feature; query
methods (Cursor::*) need the *-egress feature. The arrow / polars umbrellas
enable both directions — enable a single directional feature (e.g. arrow-egress)
to keep a read-only or write-only build lean.
The pre-rename names sync-reader-ws, compression-zstd, and chrono_timestamp
still work as deprecated aliases but will be removed in the next major — prefer
the names above.
Add features = ["sync-reader-qwp-ws"]. Without it, QuestDb::borrow_reader and
the egress module do not exist and reads will not compile.
Quick start
use questdb::{QuestDb, ingress::column_sender::{Chunk, AckLevel}};
use std::time::Duration;
fn main() -> questdb::Result<()> {
let db = QuestDb::connect("ws::addr=localhost:9000;")?;
// write a batch of trades (column-major)
let mut sender = db.borrow_column_sender()?;
let mut chunk = Chunk::new("trades");
chunk.column_f64("price", &[2615.54, 2616.00], None)?;
chunk.designated_timestamp_nanos(&[1_700_000_000_000_000_000, 1_700_000_000_001_000_000])?;
sender.flush(&mut chunk)?; // queued for delivery (a background thread sends it)
sender.wait(AckLevel::Ok, Duration::ZERO)?; // optional: block until the server has it
Ok(())
}
That is the whole write path: open the pool, borrow a sender, flush a chunk. To
read the data back, add features = ["sync-reader-qwp-ws"] and use a reader —
see Querying data.
API overview
| Concern | Type | Obtained via |
|---|---|---|
| Connection pool | QuestDb | QuestDb::connect(conf) |
| Column-major writer | SfColumnSender (store-and-forward) | db.borrow_column_sender() |
| Row-major writer | BorrowedRowSender (derefs to Sender) | db.borrow_row_sender() |
| Query reader | BorrowedReader (derefs to Reader) | db.borrow_reader() |
| Column batch | Chunk | Chunk::new(table) |
| Row buffer | Buffer | sender.new_buffer() |
| Query builder | ReaderQuery | reader.prepare(sql) |
| Streaming result | Cursor → BatchView → ColumnView | reader.prepare(sql).execute() |
Each kind of borrow draws from its own pool within the same QuestDb, capped
independently by pool_max, so heavy ingest can't starve queries. All borrows recycle on Drop and are !Send — each belongs to the
thread that took it. The pool handle is Send/Sync.
QuestDb methods:
fn connect(conf: &str) -> questdb::Result<QuestDb>
fn borrow_column_sender(&self) -> questdb::Result<SfColumnSender<'_>>
fn borrow_row_sender(&self) -> questdb::Result<BorrowedRowSender<'_>>
fn borrow_reader(&self) -> questdb::egress::error::Result<BorrowedReader<'_>> // needs `sync-reader-qwp-ws`
fn flush_arrow_batch<T>(&self, table: T, batch: &RecordBatch, ts_col: Option<ColumnName>, overrides: &[ArrowColumnOverride], ack: Option<AckLevel>) -> Result<()> // feature `arrow-ingress`
fn flush_polars_dataframe<T>(&self, table: T, df: &DataFrame, opts: &PolarsIngestOptions) -> Result<()> // feature `polars-ingress`
fn reap_idle(&self) -> usize
fn close(self)
Connecting
use questdb::QuestDb;
let db = QuestDb::connect("ws::addr=localhost:9000;")?;
Use ws (plain) or wss (TLS); qwpws / qwpwss are accepted aliases. These
are the only supported schemes. For the full connect-string grammar, see the
connect string reference.
QuestDb::connect is the only constructor: every option — pool sizing, auth,
TLS, store-and-forward, failover, compression — is a connect-string key, so one
string fully configures the client. There is no separate builder API.
connect is lazy: it validates the connect string and starts the pool but
opens no connection. The first borrow (borrow_column_sender /
borrow_row_sender / borrow_reader, or a db.flush_* DataFrame call) opens the
connection, so an unreachable server, TLS, or auth failure surfaces there — not
at connect. By default that first attempt fails fast; set
initial_connect_retry=on to retry it within the reconnect budget instead.
Pool keys
| Key | Default | Meaning |
|---|---|---|
pool_size | 1 | Warm minimum the reaper keeps once connections exist. Opened lazily on first borrow, not at connect(). |
pool_max | 64 | Hard cap on auto-grow. Borrowing at the cap returns an error. |
pool_idle_timeout_ms | 60000 | Idle connections above pool_size are closed after this long. |
pool_reap | auto | auto runs a background reaper; manual requires calling reap_idle(). |
initial_connect_retry | off | How the first borrow's connect handles failure: off fails fast; on retries within the reconnect budget; async retries in the background. |
Authentication and TLS
Pass auth/TLS in the connect string:
// Basic auth over TLS
let db = QuestDb::connect("wss::addr=db:9000;username=admin;password=quest;")?;
// Bearer token (Enterprise)
let db = QuestDb::connect("wss::addr=db:9000;token=your_bearer_token;")?;
tls_ca selects the root store: webpki_roots (default), os_roots,
webpki_and_os_roots; tls_roots=/path/ca.pem loads a PEM; tls_verify=unsafe_off
disables verification (testing only).
Both the column-major and row-major senders are always store-and-forward:
flush() lands in a local durable queue instead of blocking on the server, and a
background thread delivers it for you — reconnecting, rotating endpoints, and
replaying as needed. By default the queue is in-memory; set sf_dir to make it
disk-backed, so it survives a client restart. In disk-backed mode the column
pool is single-borrower — an explicit pool_size > 1 or pool_max > 1 is
rejected, and an omitted pool_max is treated as 1. sender_id and the other
sf_* keys require an explicit sf_dir.
Sending data: column-major
Borrow a column sender, build a Chunk of columns (each a slice), set the
designated timestamp, then flush. Each flush lands in a durable
store-and-forward queue and returns immediately — there is no server round-trip
on the hot path. A background thread delivers the queue for you, so you normally
never call wait; reach for it only when your own code must block until the data
has reached the server.
use questdb::{QuestDb, ingress::column_sender::{Chunk, AckLevel}};
use std::time::Duration;
fn main() -> questdb::Result<()> {
let db = QuestDb::connect("ws::addr=localhost:9000;")?;
let mut sender = db.borrow_column_sender()?;
let price = [2615.54_f64, 2616.00, 2617.25];
let amount = [0.00044_f64, 0.00050, 0.00021];
let ts_ns = [1_700_000_000_000_000_000_i64, 1_700_000_000_001_000_000, 1_700_000_000_002_000_000];
let mut chunk = Chunk::new("trades");
chunk.column_f64("price", &price, None)?; // (name, &[T], validity)
chunk.column_f64("amount", &amount, None)?;
chunk.designated_timestamp_nanos(&ts_ns)?;
sender.flush(&mut chunk)?; // publish to the durable queue; chunk cleared on success
sender.wait(AckLevel::Ok, Duration::ZERO)?; // optional: block until the server has received it (ZERO = wait forever)
Ok(())
}
borrow_column_sender() returns an SfColumnSender — a store-and-forward
handle with a deliberately small surface:
fn flush(&mut self, chunk: &mut Chunk) -> Result<()> // publish to the durable queue; auto-clears the chunk; no round-trip
fn wait(&mut self, ack_level: AckLevel, timeout: Duration) -> Result<()> // block until `ack_level` (or `timeout` of no ack progress)
fn must_close(&self) -> bool
fn mark_must_close(&mut self)
AckLevel::Ok waits for the server to accept every published frame;
AckLevel::Durable waits for durable storage and requires the pool to have
been opened with request_durable_ack=on (Enterprise). For whole-RecordBatch
Arrow or polars DataFrame ingestion, prefer the pool-level db.flush_arrow_batch
/ db.flush_polars_dataframe (see DataFrame and Arrow ingestion below).
wait() is rarely neededflush() does not round-trip — it appends the batch to a client-side
store-and-forward queue and returns. A background thread delivers the queue
for you: it reconnects, rotates across endpoints, and replays across client
restarts when the pool is disk-backed (sf_dir). A successful flush()
already means "safely queued for delivery" — you do not call wait() to make
delivery happen.
Reach for wait() only when your own application logic cannot proceed until the
data has reached the server:
wait(AckLevel::Ok, timeout)blocks until the server has accepted every frame published so far.wait(AckLevel::Durable, timeout)blocks until those frames are durably stored (Enterprise, needsrequest_durable_ack=on).
timeout is a no-progress deadline — it fires only if the ack watermark
stops advancing for that long (Duration::ZERO waits indefinitely); on expiry
wait() returns ErrorCode::FailoverRetry and the queued frames are kept for
replay.
wait() does not yet guarantee query visibility. It confirms the server has
received the data, not that it is queryable. A read-your-write guarantee is
planned but not implemented — do not rely on wait() to make rows immediately
selectable.
Throughput. flush() never round-trips, so a few rows per flush is fine —
but each flush is still a wire frame and a queue append. If your source trickles
tiny batches, accumulate rows into larger chunks and flush less often to amortise
the per-frame overhead, then wait() once per batch if you need confirmation.
The only calls that ever block are wait() and backpressure when you sustainably
outrun the server.
Reuse the chunk across flushes
Reuse one Chunk — do not allocate per batch. On a successful flush it is
cleared but keeps its capacity; on failure it is left untouched. The chunk
borrows your slices, so they only need to outlive each flush.
let mut chunk = Chunk::new("trades");
for batch in incoming_batches() {
chunk.column_f64("price", &batch.price, None)?;
chunk.column_f64("amount", &batch.amount, None)?;
chunk.designated_timestamp_nanos(&batch.ts_ns)?;
sender.flush(&mut chunk)?; // clears the chunk
}
sender.wait(AckLevel::Ok, Duration::ZERO)?; // confirm the whole run (optional)
Chunk column setters
Every setter is fn(&mut self, name: &str, data: &[T], validity: Option<&Validity>) -> Result<&mut Self>
unless noted, and all columns plus the timestamp must share the same row count.
validity = None means no nulls.
| QuestDB type | Chunk method | data slice |
|---|---|---|
BYTE/SHORT/INT/LONG | column_i8 / column_i16 / column_i32 / column_i64 | &[i8/i16/i32/i64] |
FLOAT / DOUBLE | column_f32 / column_f64 | &[f32] / &[f64] |
BOOLEAN | column_bool(name, data, row_count, validity) | bit-packed &[u8] (LSB-first) + explicit row_count |
TIMESTAMP (micros) / timestamp_ns | column_ts_micros / column_ts_nanos | &[i64] |
DATE | column_date_millis | &[i64] |
UUID | column_uuid | &[[u8; 16]] |
LONG256 | column_long256 | &[[u8; 32]] |
IPV4 | column_ipv4 | &[u32] |
VARCHAR | column_varchar(name, offsets: &[i32], bytes: &[u8], validity) | Arrow Utf8: row_count+1 offsets + UTF-8 bytes (column_varchar_large for &[i64] offsets) |
BINARY | column_binary(name, offsets: &[i32], bytes: &[u8], validity) | same offset+byte layout |
SYMBOL | symbol_dict_i32 (or _i8/_i16, _large_*) | codes: &[i32] + dict_offsets: &[i32] + dict_bytes: &[u8] |
| designated timestamp | designated_timestamp_nanos / _micros / _millis / _seconds | &[i64] (no validity) |
Other Chunk methods: new(table), table(), row_count(), is_empty(),
clear().
Sending data: row-major
For event-at-a-time ingestion, borrow a row sender. BorrowedRowSender derefs to
Sender, so the classic Buffer API works unchanged.
use questdb::{QuestDb, ingress::timestamp::TimestampNanos};
fn main() -> questdb::Result<()> {
let db = QuestDb::connect("ws::addr=localhost:9000;")?;
let mut sender = db.borrow_row_sender()?;
let mut buffer = sender.new_buffer();
for (sym, price, amount) in [("ETH-USD", 2615.54, 0.00044), ("BTC-USD", 65432.10, 0.00120)] {
buffer
.table("trades")?
.symbol("symbol", sym)?
.column_f64("price", price)?
.column_f64("amount", amount)?
.at(TimestampNanos::now())?; // .at_now() = server-assigned timestamp
}
sender.flush(&mut buffer)?; // sends + clears the buffer
Ok(())
}
Tables/columns are auto-created. A row is committed to the buffer only on
at/at_now. flush publishes every completed row to the durable queue and
clears the buffer (flush_and_keep retains it). Reuse the Buffer across
flushes — it keeps its capacity.
The row sender is store-and-forward too: flush() publishes to the local queue
and a background thread delivers it — you do not call wait() for delivery to
happen. Use the same wait() the column sender uses only when your code must
block until the data has reached the server:
use questdb::ingress::AckLevel;
use std::time::Duration;
sender.flush(&mut buffer)?; // publish to the durable queue
sender.wait(AckLevel::Ok, Duration::ZERO)?; // optional: block until the server has received it
wait() blocks until every frame published so far reaches ack_level
(Ok = accepted by the server, Durable = durably stored); timeout is a
no-progress deadline (Duration::ZERO waits indefinitely). As on the column
sender, wait() confirms receipt, not query visibility (a future guarantee).
For non-blocking progress instead of a barrier, use flush_and_get_fsn()
(publish + return this frame's FSN), then compare published_fsn() (highest
sent) against acked_fsn() (highest server-confirmed).
Buffer column setters
Each takes the column name first and returns Result<&mut Buffer> so calls chain.
Every setter has an _opt variant taking Option<T> that is a no-op on None
(how you leave a value null on a row).
| QuestDB type | Buffer setter |
|---|---|
SYMBOL | symbol(name, &str) |
BOOLEAN | column_bool(name, bool) |
BYTE/SHORT/INT/LONG | column_i8 / column_i16 / column_i32 / column_i64 |
FLOAT / DOUBLE | column_f32(name, f32) / column_f64(name, f64) |
VARCHAR | column_str(name, &str) |
DECIMAL (≤256-bit) | column_dec / column_dec64 / column_dec128 (&str, or rust_decimal/bigdecimal with those features) |
CHAR | column_char(name, u16) (UTF-16 code unit) |
UUID | column_uuid(name, lo: u64, hi: u64) |
LONG256 | column_long256(name, &[u8; 32]) |
IPV4 | column_ipv4(name, Ipv4Addr) |
DATE | column_date(name, millis: i64) |
BINARY | column_binary(name, &[u8]) |
GEOHASH | column_geohash(name, bits: u64, precision_bits: u8) |
DOUBLE[] (arrays) | column_arr(name, &view) — slices/vecs (≤3D) and ndarray views (needs ndarray) |
TIMESTAMP (non-designated) | column_ts(name, TimestampMicros / TimestampNanos) |
| designated timestamp | at(TimestampMicros / TimestampNanos) or at_now() |
Useful Sender methods (via deref): new_buffer(), flush(&mut Buffer),
flush_and_keep(&Buffer), wait(ack_level, timeout), flush_and_get_fsn,
published_fsn/acked_fsn, poll_qwp_ws_error(), must_close().
DataFrame and Arrow ingestion
With the arrow or polars feature, ingest a whole Arrow RecordBatch or
polars DataFrame in one call on the pool — no sender, no chunk, no buffer.
These helpers borrow a connection internally, drive the frame, and return it.
table accepts anything convertible into a TableName (a bare &str works).
Unlike the store-and-forward row- and column-major senders, these two calls are synchronous and blocking and do not use the on-client queue: they drive a direct connection and return only once the data has been delivered to the server.
use questdb::ingress::AckLevel;
// feature = "arrow-ingress" (or the `arrow` umbrella).
// ts_col: Some(col) sources the designated timestamp from that column; None lets
// the server stamp each row. overrides: &[ArrowColumnOverride] (&[] = none).
// ack: None uses the pool default (Durable if request_durable_ack=on, else Ok).
db.flush_arrow_batch("trades", &record_batch, None, &[], None)?;
// feature = "polars-ingress" (or the `polars` umbrella)
use questdb::ingress::polars::PolarsIngestOptions;
db.flush_polars_dataframe("trades", &df, &PolarsIngestOptions::new().max_rows(10_000))?;
PolarsIngestOptions is a builder (all default off): .max_rows(n) slices the
frame into batches, .timestamp_column(name) picks the designated timestamp,
.overrides(&[…]) remaps column interpretation (e.g. treat a Utf8 column as
SYMBOL).
Because they block until delivery, each owns its failover handling inline:
flush_polars_dataframe re-drives the uncommitted tail onto a live endpoint
across a transient failure, while flush_arrow_batch surfaces the error for you
to re-call (the batch is yours to resend). Delivery is at-least-once — a replayed
tail can duplicate rows, so cover the table with
deduplication upsert keys.
Querying data
Get a reader, prepare SQL (optionally binding parameters), execute to a
Cursor, then pull BatchViews and read typed columns.
use questdb::QuestDb;
use questdb::egress::column::ColumnView;
fn main() -> Result<(), Box<dyn std::error::Error>> {
let db = QuestDb::connect("ws::addr=localhost:9000;")?;
let mut reader = db.borrow_reader()?; // BorrowedReader, derefs to Reader
let mut cursor = reader
.prepare("SELECT ts, price, symbol FROM trades WHERE price > $1")
.bind_f64(2615.0)
.execute()?;
while let Some(batch) = cursor.next_batch()? {
let ColumnView::Timestamp(ts) = batch.column(0)? else { unreachable!() };
let ColumnView::Double(price) = batch.column(1)? else { unreachable!() };
let ColumnView::Symbol(symbol) = batch.column(2)? else { unreachable!() };
for r in 0..batch.row_count() {
if price.is_null(r) { continue; }
println!("{} {} {:?}", ts.value(r), price.value(r), symbol.resolve(r));
}
}
Ok(())
}
Reader → Cursor methods
// Reader (and BorrowedReader via deref)
fn prepare(&mut self, sql: impl AsRef<str>) -> ReaderQuery<'_> // build a parametrised query
fn execute(&mut self, sql: impl AsRef<str>) -> Result<Cursor<'_>> // shortcut, no binds
fn server_info(&self) -> Option<&ServerInfo>
fn server_version(&self) -> Result<u8>
// Cursor
fn next_batch(&mut self) -> Result<Option<BatchView<'_>>> // None = end of stream
fn terminal(&self) -> Option<&Terminal> // Some after RESULT_END/EXEC_DONE
fn cancel(&mut self) -> Result<()>
fn request_id(&self) -> i64
// with feature = "polars-egress" (or the `polars` umbrella):
fn next_polars(&mut self) -> Result<Option<DataFrame>> // one batch as a DataFrame
fn iter_polars(&mut self) -> Result<CursorPolarsIter<'_, '_>> // drift-checked iterator of per-batch DataFrames
fn fetch_all_polars(&mut self) -> Result<DataFrame> // whole result as one DataFrame
// with feature = "arrow-egress" (or the `arrow` umbrella):
fn next_arrow_batch(&mut self) -> Result<Option<RecordBatch>>
fn fetch_all_arrow(&mut self) -> Result<(SchemaRef, Vec<RecordBatch>)>
Parameter binds
reader.prepare(sql) returns a ReaderQuery; chain bind_* (positional, in
order, matching $1, $2, …) then .execute():
let mut cursor = reader
.prepare("SELECT * FROM trades WHERE symbol = $1 AND ts >= $2")
.bind_varchar("ETH-USD")
.bind_timestamp_micros(1_700_000_000_000_000)
.execute()?;
Available binds: bind_bool, bind_i8/i16/i32/i64, bind_f32/f64,
bind_varchar(&str), bind_binary(&[u8]), bind_timestamp_micros/nanos(i64),
bind_date_millis(i64), bind_uuid([u8;16]), bind_long256([u8;32]),
bind_char(u16), bind_ipv4(Ipv4Addr), bind_decimal64(i64, scale),
bind_decimal128(i128, scale), bind_decimal256([u8;32], scale),
bind_geohash(u64, precision_bits), and bind_null_* variants. initial_credit(u64),
on_failover_reset/on_failover_progress configure streaming/failover.
Reading columns
batch.column(idx) returns a typed ColumnView. Match it to the concrete column,
then index per row. Every column type exposes len(), is_null(row), and
validity(); ColumnView itself also has kind(), len(), and is_null(row).
ColumnView variant | Inner type | Read a value |
|---|---|---|
Boolean/Byte/Short/Int/Long/Float/Double | FixedColumn<T> (u8/i8/i16/i32/i64/f32/f64) | col.value(row) -> T |
Timestamp/TimestampNanos/Date | FixedColumn<i64> | col.value(row) -> i64 |
Char | FixedColumn<u16> | col.value(row) -> u16 |
Ipv4 | FixedColumn<u32> | col.value(row) -> u32 (host-order) |
Symbol | SymbolColumn | col.resolve(row) -> Option<&str> |
Varchar | VarcharColumn | col.value(row) -> Option<&str> |
Binary | BinaryColumn | col.value(row) -> Option<&[u8]> |
Uuid | UuidColumn | col.value(row) -> &[u8; 16] |
Long256 | Long256Column | col.value(row) -> &[u8; 32] |
Decimal64/Decimal128/Decimal256 | Decimal*Column | see column docs (value + scale) |
Geohash | GeohashColumn | bits + precision |
DoubleArray/LongArray | DoubleArrayColumn/LongArrayColumn | per-row array slices |
FixedColumn<T>::value(row) returns the raw T even on null rows — always check
is_null(row) first (or use col.iter()). SymbolColumn::resolve,
VarcharColumn::value, and BinaryColumn::value return Option, which is None
on a null row.
With the polars (or polars-egress) feature you can pull a SQL result straight
into a polars DataFrame, skipping per-column handling:
// whole result in one DataFrame
let df = reader.prepare(sql).execute()?.fetch_all_polars()?;
// or stream it batch-by-batch (schema-drift-checked)
let mut cursor = reader.prepare(sql).execute()?;
for frame in cursor.iter_polars()? {
let df = frame?;
// ... process each DataFrame ...
}
fetch_all_polars() materialises the entire result (and replays transparently
through a mid-query failover); iter_polars() yields one DataFrame per batch
with schema-drift detection; next_polars() is the low-level per-batch form.
These live on the Cursor — there is no pool-level shortcut on the read side.
Running DDL and DML
execute / prepare also run non-SELECT statements (CREATE, INSERT, UPDATE,
ALTER, DROP, TRUNCATE). They return no rows: next_batch() yields None
immediately and the outcome lands in the cursor's terminal() as
Terminal::ExecDone { rows_affected, .. }.
use questdb::egress::Terminal;
let mut cursor =
reader.execute("UPDATE trades SET price = price * 1.01 WHERE symbol = 'ETH-USD'")?;
while cursor.next_batch()?.is_some() {} // drain (a DDL/DML statement yields no rows)
if let Some(Terminal::ExecDone { rows_affected, .. }) = cursor.terminal() {
println!("updated {rows_affected} rows");
}
The Cursor is #[must_use]: always drain it with next_batch() (or
cancel()), even when the statement returns nothing.
Cancellation, flow control, and compression
Cancel a running query at any time with cursor.cancel() — it sends a CANCEL
frame and drains to the server's terminal.
For large results, bound how far the server streams ahead with byte credits.
initial_credit(n) caps the in-flight bytes (0 = unbounded); grant more as you
consume with Cursor::add_credit(bytes), and read the running total with
credit_granted_total():
let mut cursor = reader
.prepare("SELECT * FROM trades")
.initial_credit(1 << 20) // cap at 1 MiB in flight
.execute()?;
while let Some(batch) = cursor.next_batch()? {
// ... process `batch` ...
cursor.add_credit(1 << 20)?; // top up as you drain
}
Enable zstd compression on the read path with the sync-reader-zstd feature
plus a connect-string key: compression=zstd (or compression=auto to let the
server choose), optionally tuned with compression_level=N (1–22, default 1).
Error handling
Every write call returns questdb::Result<T> (alias for
Result<T, questdb::Error>); questdb::Error carries an
ErrorCode (err.code()) and a message (err.msg()). The reader uses a parallel
questdb::egress::error::{Error, ErrorCode, Result}. To mix both in one function,
return Result<(), Box<dyn std::error::Error>> (both ? cleanly).
match sender.flush(&mut chunk) {
Ok(()) => {}
Err(e) => {
eprintln!("flush failed [{:?}]: {}", e.code(), e.msg());
if sender.must_close() { /* drop the borrow; the pool opens a fresh conn next time */ }
}
}
A borrow that has latched terminal (must_close() is true) is dropped — not
recycled — when it returns to the pool. Call mark_must_close() to force this
after an error you suspect damaged the connection.
Failover and high availability
Point the pool at several QuestDB nodes and it rotates to a live one on every
reconnect. List the endpoints in the connect string — comma-separated in one
addr=, or as repeated addr= params:
// rotate across three nodes; pin to the primary (Enterprise primary/replica)
let db = QuestDb::connect("ws::addr=node-a:9000,node-b:9000,node-c:9000;target=primary;")?;
target=primary / target=replica / target=any and zone= select which node
role the pool connects to; see the
connect string reference for the full
grammar. If every endpoint completes its handshake but none matches target=,
the borrow fails with a role-reject error (distinct from "all endpoints
unreachable").
Reconnect budget
When a connection drops, the pool dials the endpoint set with centered-jittered backoff until one answers or the budget expires. Three QWP/WebSocket-only keys tune it:
| Key | Default | Meaning |
|---|---|---|
reconnect_max_duration_millis | 300000 (5 min) | Total failover budget before the borrow gives up. |
reconnect_initial_backoff_millis | 100 | First backoff between dials; grows toward the max. |
reconnect_max_backoff_millis | 5000 | Backoff ceiling between dials. |
Auth failures and protocol-version mismatches are terminal — they are never retried, whatever the budget.
Ingestion failover
Ingestion failover is automatic — you do not write a retry loop. The column
sender publishes every flush() into its store-and-forward queue, and a
background runner forwards that queue to the server, reconnecting and rotating
across the endpoints above on its own. When the pool is disk-backed (sf_dir)
the queue is durable, so unacknowledged batches replay after a client restart
or a server failover — a batch that flush() accepted is not lost to a dropped
connection.
The only thing you react to is a terminal condition — an auth or
protocol-version rejection the runner cannot retry. The handle reports it via
must_close(); drop the borrow and the next one opens a fresh connection.
Because the durable queue replays unacknowledged batches, a delivery that was in doubt when a connection dropped may be re-sent. Enable table-level deduplication or upsert keys so replays are idempotent.
The row sender shares this model: its background runner absorbs reconnects
across the endpoint set automatically. Surface any async transport error with
poll_qwp_ws_error(), confirm the tail with wait(), and check must_close()
after a terminal error.
Query failover
A transport error mid-query fails the cursor over to another endpoint and
replays the query from the start there (a fresh request_id, batches from
zero). Because already-consumed rows arrive again, you must opt in and discard
them — install on_failover_reset:
let mut cursor = reader
.prepare("SELECT ts, price FROM trades WHERE price > $1")
.bind_f64(2615.0)
.on_failover_reset(|ev| {
// query restarts on ev.new_addr — drop rows kept from the dead node
eprintln!("failover {}:{} -> {}:{} ({} attempts)",
ev.failed_addr.host, ev.failed_addr.port,
ev.new_addr.host, ev.new_addr.port, ev.attempts);
})
.execute()?;
Without on_failover_reset (or on_failover_progress), a mid-stream failover
surfaces as an error instead of silently replaying. on_failover_progress
reports each phase — Disconnected, Retrying, Reset, GaveUp (budget
exhausted; the cursor is terminal and the error is in final_error). Both
callbacks run synchronously on the cursor's drive thread: do not call back into
the reader/cursor, block, or panic inside them.
Concurrency
QuestDb is the pool, and it is the only thread-safe handle in the client.
It is Send + Sync: create one per process and share it across every worker
thread — behind an Arc, since the pool itself is not Clone. The pool guards
its connections internally.
The borrowed handles are the opposite. SfColumnSender, BorrowedRowSender, and
BorrowedReader — and the Chunk / Buffer you build — are not thread-safe
(!Send + !Sync). A borrow belongs to the thread that took it; the compiler
stops you moving one to another thread. The model is always one pool, many
short-lived borrows: each thread borrows what it needs, uses it, and drops it.
use std::sync::Arc;
let db = Arc::new(QuestDb::connect("ws::addr=localhost:9000;pool_size=4;")?);
let workers: Vec<_> = (0..4).map(|_| {
let db = Arc::clone(&db);
std::thread::spawn(move || -> questdb::Result<()> {
let mut sender = db.borrow_column_sender()?; // this thread's own sender
// ... build and flush chunks ...
Ok(())
}) // `sender` drops here → its connection returns to the pool
}).collect();
Borrows return to the pool on Drop — you never return one by hand. A handle
holds a pool slot for as long as it is alive, so keep borrows short: borrow, use,
drop, rather than parking one open across idle periods. A returned connection is
recycled for the next borrow (one that latched must_close is discarded instead,
and the next borrow opens a fresh connection).
Borrowing is fail-fast at the cap. The column, row, and reader pools are each
capped independently by pool_max; when every slot is in use the next borrow
returns ErrorCode::InvalidApiCall rather than blocking. Size pool_max to your
peak concurrency, and set pool_size to your steady worker count so the reaper
keeps that many connections warm once they have opened.
Closing
Dropping QuestDb closes the pool (stops the reaper, drops idle connections).
Call db.close() for an explicit shutdown, or db.reap_idle() under
pool_reap=manual to trim idle connections yourself.
Complete example: write then read
use questdb::QuestDb;
use questdb::ingress::column_sender::{Chunk, AckLevel};
use questdb::egress::column::ColumnView;
use std::time::Duration;
// Cargo.toml: questdb-rs = { version = "7", features = ["sync-reader-qwp-ws"] }
fn main() -> Result<(), Box<dyn std::error::Error>> {
let db = QuestDb::connect("ws::addr=localhost:9000;")?;
// --- write (column-major) ---
{
let mut sender = db.borrow_column_sender()?;
let price = [2615.54_f64, 2616.00, 2617.25];
let ts_ns = [1_700_000_000_000_000_000_i64, 1_700_000_000_001_000_000, 1_700_000_000_002_000_000];
let mut chunk = Chunk::new("trades");
chunk.column_f64("price", &price, None)?;
chunk.designated_timestamp_nanos(&ts_ns)?;
sender.flush(&mut chunk)?;
sender.wait(AckLevel::Ok, Duration::ZERO)?;
} // sender returns to the pool here
// --- read ---
// NOTE: wait() confirms receipt, not query visibility. In a real
// write-then-read flow a freshly written row may not be selectable
// immediately — see the visibility note above.
{
let mut reader = db.borrow_reader()?;
let mut cursor = reader.prepare("SELECT ts, price FROM trades ORDER BY ts").execute()?;
while let Some(batch) = cursor.next_batch()? {
let ColumnView::Timestamp(ts) = batch.column(0)? else { unreachable!() };
let ColumnView::Double(price) = batch.column(1)? else { unreachable!() };
for r in 0..batch.row_count() {
println!("{} -> {}", ts.value(r), price.value(r));
}
}
}
db.close();
Ok(())
}
Next steps
- Connect string reference
- QWP protocol
- C/C++ pooled API (the same pool, in C/C++)
- Query overview