Go client for QuestDB

The QuestDB Go client connects to QuestDB over QWP — QuestDB Wire Protocol — a columnar binary protocol carried over WebSocket. It supports high-throughput data ingestion and streaming SQL queries on the same transport.

Key capabilities:

  • Ingestion: column-oriented batched writes with automatic table creation, schema evolution, and optional store-and-forward durability.
  • Querying: streaming SQL result sets, DDL and DML execution, bind parameters, and byte-credit flow control.
  • Failover: multi-endpoint connections with automatic reconnect across rolling upgrades and primary migrations.
Legacy transports

The client also supports ILP ingestion over HTTP and TCP for backward compatibility. This page documents the recommended WebSocket (QWP) path. For ILP transport details, see the ILP overview.

Quick start

The client requires Go 1.23 or later. Add it to your module:

go get github.com/questdb/go-questdb-client/v4

Ingest data

package main

import (
"context"

qdb "github.com/questdb/go-questdb-client/v4"
)

func main() {
ctx := context.TODO()

sender, err := qdb.LineSenderFromConf(ctx, "ws::addr=localhost:9000;")
if err != nil {
panic(err)
}
defer sender.Close(ctx)

err = sender.Table("trades").
Symbol("symbol", "ETH-USD").
Symbol("side", "sell").
Float64Column("price", 2615.54).
Float64Column("amount", 0.00044).
AtNow(ctx)
if err != nil {
panic(err)
}

if err := sender.Flush(ctx); err != nil {
panic(err)
}
}

Query data

package main

import (
"context"
"fmt"

qdb "github.com/questdb/go-questdb-client/v4"
)

func main() {
ctx := context.TODO()

client, err := qdb.NewQwpQueryClient(ctx,
qdb.WithQwpQueryAddress("localhost:9000"))
if err != nil {
panic(err)
}
defer client.Close(ctx)

q := client.Query(ctx,
"SELECT symbol, price FROM trades WHERE symbol = 'ETH-USD' LIMIT 10")
defer q.Close()

for batch, err := range q.Batches() {
if err != nil {
panic(err)
}
for row := 0; row < batch.RowCount(); row++ {
fmt.Println(batch.String(0, row), batch.Float64(1, row))
}
}
}
Read before building on these snippets

The two snippets above are deliberately minimal. Three behaviors will cause data loss, corruption, or panics if you carry the minimal form into real code:

  • Ingestion errors are asynchronous. Flush returning nil does not mean the server accepted the rows. Schema, parse, and write rejections are delivered out of band. Register an error handler. See Ingestion errors.
  • A sender or query client is not safe for concurrent use. Use one per goroutine. See Concurrency.
  • A query batch is valid only inside its loop iteration. Some accessors alias the network buffer. Copy out anything you keep. See Reading result batches.

Building with multi-host failover? It adds exactly three rules on top of the single-host code, listed up front in Failover and high availability. Single-host applications can ignore them.

Authentication and TLS

Authentication happens at the HTTP level during the WebSocket upgrade, before any binary frames are exchanged. The same mechanisms work for both the LineSender (ingestion) and the QwpQueryClient (querying).

HTTP basic auth

// Ingestion
sender, err := qdb.LineSenderFromConf(ctx,
"wss::addr=db.example.com:9000;username=admin;password=quest;")

// Querying
client, err := qdb.QwpQueryClientFromConf(ctx,
"wss::addr=db.example.com:9000;username=admin;password=quest;")

The options API exposes the same settings:

sender, err := qdb.NewLineSender(ctx,
qdb.WithQwp(),
qdb.WithAddress("db.example.com:9000"),
qdb.WithTls(),
qdb.WithBasicAuth("admin", "quest"))

Token authentication avoids the per-request overhead of basic auth and is the recommended path for Enterprise deployments.

sender, err := qdb.LineSenderFromConf(ctx,
"wss::addr=db.example.com:9000;token=your_bearer_token;")

client, err := qdb.NewQwpQueryClient(ctx,
qdb.WithQwpQueryAddress("db.example.com:9000"),
qdb.WithQwpQueryTls(),
qdb.WithQwpQueryBearerToken("your_bearer_token"))

The token is a static credential: the client sends exactly the string you pass and never refreshes or renews it. Acquire it out of band — QuestDB Enterprise issues bearer tokens through its OpenID Connect flow — and manage its lifetime yourself. There is no token-refresh callback: when the token expires or is rotated, construct a new sender or query client with the new token. An expired or rejected token surfaces as an authentication failure (see Connection-level errors).

Production example (TLS + token + multi-host)

A realistic Enterprise deployment combines wss, token auth, and a multi-host addr list. The target key controls which server roles the client will connect to: primary for the authoritative write node, replica for read-only replicas, or any (default) for either.

// Ingestion: connect to any writeable node
sender, err := qdb.LineSenderFromConf(ctx,
"wss::addr=db-1.example.com:9000,db-2.example.com:9000;"+
"token=your_bearer_token;")

// Querying: prefer a replica to offload the primary
client, err := qdb.QwpQueryClientFromConf(ctx,
"wss::addr=db-1.example.com:9000,db-2.example.com:9000;"+
"token=your_bearer_token;target=replica;")

TLS trust store

TLS is enabled by the wss schema (or qdb.WithTls()). The Go client verifies the server certificate against the operating-system trust store. It does not support a custom trust store: the tls_roots / tls_roots_password connect-string keys (a Java-keystore feature) are rejected by the Go connect-string parser. To trust a private CA, install it in the host trust store. For test-only certificate-verification bypass, see tls_verify in the TLS section of the connect string reference.

Creating the client

From a connect string

The connect string format is <schema>::<key>=<value>;<key>=<value>;...;. Use ws for plain WebSocket or wss for TLS:

sender, err := qdb.LineSenderFromConf(ctx, "ws::addr=localhost:9000;")

client, err := qdb.QwpQueryClientFromConf(ctx, "ws::addr=localhost:9000;")

For the full list of connect-string keys, see the connect string reference.

From an environment variable

Set QDB_CLIENT_CONF to avoid hard-coding credentials:

export QDB_CLIENT_CONF="wss::addr=db.example.com:9000;username=admin;password=quest;"
sender, err := qdb.LineSenderFromEnv(ctx)

Using the options API

The options API exposes the same options as the connect string, with type-safe Go signatures (e.g., sf_append_deadline_millis becomes qdb.WithSfAppendDeadline(30*time.Second)). For the full list of keys, see the connect string reference.

NewLineSender requires exactly one transport option (qdb.WithQwp() here); LineSenderFromConf infers the transport from the ws/wss schema instead. An error handler can only be set through the options API:

sender, err := qdb.NewLineSender(ctx,
qdb.WithQwp(),
qdb.WithAddress("localhost:9000"),
qdb.WithAutoFlushRows(500),
qdb.WithAutoFlushInterval(50*time.Millisecond),
qdb.WithErrorHandler(func(e *qdb.SenderError) { /* see Error handling */ }))

client, err := qdb.NewQwpQueryClient(ctx,
qdb.WithQwpQueryAddress("localhost:9000"),
qdb.WithQwpQueryInitialCredit(256*1024))

Data ingestion

Concurrency

A LineSender owns a single connection and is not safe for concurrent use. Sharing one across goroutines corrupts the buffer and interleaves rows. Create one sender per goroutine, or hand rows to a single dedicated writer goroutine through a channel.

Connection pooling (LineSenderPool) targets the stateless HTTP transport and is not available for QWP, so it is not the answer to QWP concurrency.

General usage pattern

  1. Create a sender via qdb.LineSenderFromConf() or qdb.NewLineSender().
  2. Call Table(name) to select a table.
  3. Call column methods to add values:
    • Symbol(name, value)
    • StringColumn(name, value), BoolColumn(name, value)
    • Int64Column(name, value), Float64Column(name, value)
    • TimestampColumn(name, time.Time) for non-designated timestamps
    • Long256Column(name, *big.Int)
    • Float64Array1DColumn / 2D / 3D / NDColumn (see Ingest arrays)
    • DecimalColumn, DecimalColumnFromString (see Decimal columns)
  4. Call At(ctx, time.Time) or AtNow(ctx) to finalize the row.
  5. Repeat from step 2, or call Flush(ctx) to send buffered data.
  6. Call Close(ctx) when done.

The call order is fixed: Table, then Symbols, then column setters, then At/AtNow. The fluent methods do not return errors; the first error is latched and surfaces from At, AtNow, or Flush, so always check that return value.

The error from At/AtNow/Flush is only the local error

It reports a client-side problem: a bad value, wrong call order, or store-and-forward backpressure. Server-side rejections (schema mismatch, parse error, write error) are asynchronous and are delivered to the error handler, never returned here. A nil return does not mean the server accepted the data. See Ingestion errors.

Tables and columns are created automatically if they do not exist. The full runnable example registers an error handler, the minimum correct shape for a QWP producer:

/*+*****************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2026 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/

// Demonstrates the minimum correct QWP (WebSocket) ingestion idiom for a
// single-host application without failover.
//
// QWP ingestion is asynchronous: the error returned by At/AtNow/Flush is the
// local, latched error (bad value, buffer state, backpressure). Server-side
// rejections (schema mismatch, parse error, ...) arrive out of band on the
// SenderErrorHandler, NOT from the Flush that sent the data. Registering a
// handler is therefore part of the baseline idiom, not an advanced option.
package main

import (
"context"
"log"
"time"

qdb "github.com/questdb/go-questdb-client/v4"
)

func main() {
ctx := context.TODO()

// WithQwp() selects the QWP binary protocol over a plain WebSocket
// (use qdb.WithTls() for wss). A LineSender is not safe for
// concurrent use: create one per goroutine.
sender, err := qdb.NewLineSender(ctx,
qdb.WithQwp(),
qdb.WithAddress("localhost:9000"),
qdb.WithErrorHandler(func(e *qdb.SenderError) {
// Dead-letter / alert here. This runs on a dedicated
// goroutine, never the producer goroutine.
log.Printf("server rejected fsn=[%d,%d] table=%s category=%s: %s",
e.FromFsn, e.ToFsn, e.TableName, e.Category, e.ServerMessage)
}),
)
if err != nil {
log.Fatal(err)
}
defer func() {
// Close flushes and drains, but a failed close can mean
// unacked data was not delivered. Always check it.
if err := sender.Close(ctx); err != nil {
log.Fatal(err)
}
}()

tradedTs, _ := time.Parse(time.RFC3339, "2022-08-06T15:04:05.123456Z")
for i := 0; i < 1000; i++ {
// Call order is fixed: Table, then Symbol(s), then columns,
// then At/AtNow. A latched fluent error surfaces here.
err := sender.
Table("trades").
Symbol("symbol", "ETH-USD").
Symbol("side", "sell").
Float64Column("price", 2615.54).
Float64Column("amount", 0.00044).
At(ctx, tradedTs)
if err != nil {
log.Fatal(err)
}
}

// Publish everything buffered so far. Flush returns once the batch
// is published to the cursor engine; it does NOT wait for the
// server ACK (rejections arrive on the handler above). Batch many
// rows per Flush rather than flushing per row. For server-ack
// confirmation, use FlushAndGetSequence paired with AwaitAckedFsn.
if err := sender.Flush(ctx); err != nil {
log.Fatal(err)
}
}

The QWP transport exposes column types that are not part of ILP. Type-assert the sender to qdb.QwpSender with the comma-ok form (only ws/wss senders implement it; an HTTP or TCP sender does not):

sender, err := qdb.LineSenderFromConf(ctx, "ws::addr=localhost:9000;")
qs, ok := sender.(qdb.QwpSender)
if !ok {
panic("not a QWP sender")
}

err = qs.Table("trades").
Symbol("symbol", "ETH-USD").
Int32Column("venue_id", 7).
CharColumn("side", 'S').
UuidColumn("order_id", hi, lo).
AtNano(ctx, time.Now())

QwpSender adds ByteColumn, ShortColumn, Int32Column, Float32Column, CharColumn, DateColumn, TimestampNanosColumn, UuidColumn, GeohashColumn, Int64Array1DColumn / 2D / 3D, the decimal columns, and AtNano for nanosecond designated timestamps.

Null values

The client has no null setter. To store a null for a column in a given row, omit that column's setter before At/AtNow/AtNano. On row commit, every column not set in the row is gap-filled with a null, so omitting a column and writing an "explicit null" are the same operation.

The buffered column set is the union across the batch: a column first used on a later row is backfilled with null for every earlier row still in the send buffer.

Ingest arrays

For 1D, 2D, and 3D double arrays, pass a Go slice directly:

prices := []float64{1.0842, 1.0843, 1.0841}
err = sender.Table("book").Float64Array1DColumn("levels", prices).AtNow(ctx)

For higher-dimensional arrays, build an NdArray once and reuse it:

arr, err := qdb.NewNDArray[float64](3, 3, 3)
if err != nil {
panic(err)
}
arr.Fill(1.5)
err = sender.Table("book").Float64ArrayNDColumn("cube", arr).AtNow(ctx)

Values are stored in row-major order: the last dimension varies fastest. Use Set(value, positions...) to write at specific coordinates, Append(value) for sequential fills, and Reshape(shape...) to change the shape without reallocating.

Designated timestamp

The designated timestamp column controls time-based partitioning and ordering:

// User-assigned (recommended for deduplication and exactly-once delivery)
err = sender.Table("trades").
Symbol("symbol", "EURUSD").
Float64Column("price", 1.0842).
At(ctx, time.Now())

// Nanosecond precision (creates a timestamp_ns column); QwpSender only
err = qs.Table("ticks").
Symbol("symbol", "EURUSD").
Float64Column("price", 1.0842).
AtNano(ctx, time.Now())

// Server-assigned (server uses its wall-clock time)
err = sender.Table("trades").
Symbol("symbol", "EURUSD").
Float64Column("price", 1.0842).
AtNow(ctx)
caution

A table's designated timestamp resolution is fixed by its first row. Mixing At (microseconds) and AtNano (nanoseconds) on rows of the same table within one flush returns a type-conflict error. Pick one resolution per table.

note

QuestDB works best when data arrives in chronological order, sorted by timestamp.

Decimal columns

caution

Decimal values require QuestDB 9.2.0 or later. Create decimal columns ahead of time with DECIMAL(precision, scale) so QuestDB ingests values with the expected precision. See the decimal data type page for details.

Construct a qdb.Decimal from an int64, a *big.Int, or a raw two's complement big-endian payload:

price := qdb.NewDecimalFromInt64(12345, 2) // 123.45, scale 2
commission, err := qdb.NewDecimal(big.NewInt(-750), 4)
if err != nil {
panic(err)
}

err = qs.Table("trade_fees").
Symbol("symbol", "ETH-USD").
Decimal128Column("settled_price", price).
Decimal128Column("commission", commission).
AtNow(ctx)

DecimalColumn serializes a 256-bit value, while Decimal64Column, Decimal128Column, and Decimal256Column (on QwpSender) target the matching column width. DecimalColumnFromString lets the server parse a validated literal, and DecimalColumnShopspring accepts github.com/shopspring/decimal values.

Flushing

The client accumulates rows in an internal buffer and sends them in batches.

Auto-flush (default) flushes when either threshold is reached:

TriggerWebSocket defaultHTTP default
Row count1,000 rows75,000 rows
Time100 ms1,000 ms

Customize via the connect string or the options API:

ws::addr=localhost:9000;auto_flush_rows=500;auto_flush_interval=50;

Flush(ctx) sends buffered data immediately. It returns once the rows are published into the cursor engine (in memory, or on disk when sf_dir is set) — it does not wait for the server to acknowledge them. Delivery and acknowledgement happen asynchronously on the send loop; a server-side rejection surfaces on the error handler, never as a Flush error (see Ingestion errors). For explicit server-ack confirmation, pair FlushAndGetSequence with AwaitAckedFsn (below). Write many rows per Flush; calling it after every row collapses throughput.

caution

If you disable auto-flush (auto_flush=off or qdb.WithAutoFlushDisabled()), nothing is sent until you call Flush yourself. Close does a final flush, but it is best-effort, bounded by close_flush_timeout_millis, and not retried on failure. An app that disables auto-flush and never calls Flush loses everything it buffered.

QwpSender.FlushAndGetSequence(ctx) returns the published frame sequence number (FSN), and AwaitAckedFsn(ctx, target) blocks until the server has acknowledged up to a given FSN. Use the FSN to correlate a publish with any later SenderError.

Store-and-forward

With store-and-forward enabled, unacknowledged data is persisted to disk and replayed after reconnection, surviving sender process restarts:

ws::addr=localhost:9000;sf_dir=/var/lib/questdb/sf;sender_id=ingest-1;

When multiple senders share the same sf_dir, each must have a distinct sender_id. Slots are exclusive: two senders with the same ID will collide. Allowed characters: A-Za-z0-9_-.

Without sf_dir, unacknowledged data lives in process memory and is lost if the sender process dies. The reconnect loop still spans transient server outages, but the RAM buffer caps how much data can accumulate.

Replay is at-least-once — enable DEDUP

After a reconnect or a sender restart, the client replays frames the server may have accepted but not yet acknowledged. Without DEDUP on the target table, replay produces duplicate rows. Tables ingested over a reconnecting or multi-host connection must declare DEDUP UPSERT KEYS(...) covering row identity. See Delivery semantics for the full at-least-once / exactly-once model.

With store-and-forward enabled, At/AtNow/Flush can block when the buffer hits its cap. The producer blocks until the wire path drains enough capacity, then returns a deadline error (sf_append_deadline_millis) if it does not drain in time. Treat a blocking call as a signal that the server is unreachable or slow, not as a reason to retry in a tight loop.

Terminal rejections (schema, parse, or security errors) latch a terminal error. The next producer call returns it as a typed *SenderError; the sender will not drain further. Close it and create a new sender to continue.

For concepts, sizing, and recovery, see store-and-forward and the store-and-forward keys of the connect string reference.

Durable acknowledgement

Enterprise

Durable acknowledgement requires QuestDB Enterprise with primary replication configured.

By default, the server confirms a batch when it is committed to the local WAL. Durable acknowledgement instead waits until the batch has been durably uploaded to object storage. See the durable ACK keys.

Not yet implemented in the Go client

Durable-ack mode is a deferred follow-up in this client. Passing request_durable_ack=on; (or =true) in the connect string is rejected at construction with an InvalidConfigStr error; the only accepted value today is request_durable_ack=off (the default). Until the feature ships, the sender confirms on the transport-level OK ACK and ignores STATUS_DURABLE_ACK frames.

Querying and SQL execution

The QwpQueryClient sends SQL statements over the QWP egress endpoint. Query returns a streaming cursor for SELECT statements; Exec runs DDL and DML and returns an ExecResult. Both block until the statement completes, so you can sequence operations safely:

/*+*****************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2026 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package main

import (
"context"
"fmt"
"log"
"strings"
"time"

qdb "github.com/questdb/go-questdb-client/v4"
)

const (
tableName = "qwp_query_example"
rowCount = 1000
)

func main() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

client, err := qdb.NewQwpQueryClient(ctx,
qdb.WithQwpQueryAddress("localhost:9000"),
)
if err != nil {
log.Fatalf("connect: %v", err)
}
defer func() {
if err := client.Close(ctx); err != nil {
log.Printf("close: %v", err)
}
}()

if _, err := client.Exec(ctx, fmt.Sprintf("DROP TABLE IF EXISTS '%s'", tableName)); err != nil {
log.Fatalf("drop: %v", err)
}
createSQL := fmt.Sprintf(
"CREATE TABLE '%s' (ts TIMESTAMP, v LONG) TIMESTAMP(ts)",
tableName)
if _, err := client.Exec(ctx, createSQL); err != nil {
log.Fatalf("create: %v", err)
}

insertSQL := buildBulkInsert(tableName, rowCount)
res, err := client.Exec(ctx, insertSQL)
if err != nil {
log.Fatalf("insert: %v", err)
}
fmt.Printf("inserted %d rows\n", res.RowsAffected)

expected := expectedSum(rowCount)
fmt.Printf("expected sum: %d\n", expected)
fmt.Printf("per-row sum: %d\n", sumPerRow(ctx, client))
fmt.Printf("bulk sum: %d\n", sumBulk(ctx, client))
}

// sumPerRow demonstrates the zero-allocation, per-row idiom.
//
// QwpColumn caches the column's layout pointer once per batch, so every
// Int64(r) call reads straight out of the QWP buffer — no intermediate
// slice. Best for ad-hoc consumers and when you also need per-row
// branching (null checks, mixed-column row builders).
func sumPerRow(ctx context.Context, client *qdb.QwpQueryClient) int64 {
q := client.Query(ctx, fmt.Sprintf("SELECT ts, v FROM '%s'", tableName))
defer q.Close()

var sum int64
for batch, err := range q.Batches() {
if err != nil {
log.Fatalf("per-row query: %v", err)
}
vCol := batch.Column(1) // column 1 is `v` (LONG)
n := vCol.RowCount()
for r := 0; r < n; r++ {
sum += vCol.Int64(r)
}
}
return sum
}

// sumBulk demonstrates the bulk-decode idiom for a tight column sweep.
//
// Int64Range decodes a row range into a caller-owned []int64 in one
// shot. On a no-null column it lowers to a single memmove out of the
// QWP buffer, after which the inner sum is a branch-free range loop the
// compiler can vectorize. Reuse the buffer across batches with [:0] —
// allocation happens once for the whole query.
func sumBulk(ctx context.Context, client *qdb.QwpQueryClient) int64 {
q := client.Query(ctx, fmt.Sprintf("SELECT ts, v FROM '%s'", tableName))
defer q.Close()

var (
sum int64
buf = make([]int64, 0, rowCount)
)
for batch, err := range q.Batches() {
if err != nil {
log.Fatalf("bulk query: %v", err)
}
buf = batch.Column(1).Int64Range(0, batch.RowCount(), buf[:0])
for _, v := range buf {
sum += v
}
}
return sum
}

func buildBulkInsert(table string, n int) string {
base := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
var sb strings.Builder
fmt.Fprintf(&sb, "INSERT INTO '%s' (ts, v) VALUES ", table)
for i := 0; i < n; i++ {
if i > 0 {
sb.WriteByte(',')
}
// QuestDB TIMESTAMP literals are microseconds since epoch.
ts := base.Add(time.Duration(i) * time.Second).UnixMicro()
fmt.Fprintf(&sb, "(%d,%d)", ts, int64(i))
}
return sb.String()
}

func expectedSum(n int) int64 {
return int64(n) * int64(n-1) / 2
}

A QwpQueryClient is not safe for concurrent Query or Exec calls, and it runs one query at a time (the protocol is single-in-flight in this release). Use one client per query-issuing goroutine. Cancel (on a *QwpQuery) and Close are safe to call from other goroutines. A *QwpQuery is single-use: once its Batches() range ends, do not iterate it again.

Results stream as a sequence of batches. Process each batch as it arrives rather than collecting an entire large result set in memory. For big result sets, bound how fast the server pushes with flow control.

Executing SELECT queries

The simple, single-host idiom is to treat any non-nil error from the iteration as terminal. This is always safe, including under failover:

type Trade struct {
TsMicros int64
Symbol string
Price float64
}

var trades []Trade
q := client.Query(ctx, "SELECT ts, symbol, price FROM trades LIMIT 1000")
defer q.Close()

for batch, err := range q.Batches() {
if err != nil {
return err // simple apps: any error is terminal
}
for row := 0; row < batch.RowCount(); row++ {
trades = append(trades, Trade{
TsMicros: batch.Int64(0, row),
Symbol: batch.String(1, row),
Price: batch.Float64(2, row),
})
}
}
Copy aliasing values out before the iteration ends

A *QwpColumnBatch is valid only during its iteration of the loop. Never store the batch itself; use batch.CopyAll() for a retainable snapshot. Which accessors alias the receive buffer and which return caller-owned data:

  • Alias the buffer (copy with bytes.Clone before the loop advances if you keep them): Str(col, row) and Binary(col, row).
  • Safe to retain: String(col, row) returns a freshly allocated Go string. Float64Array, Int64Array, the *Into accessors, and the QwpColumn *Range accessors return caller-owned slices (freshly allocated, or appended into a buffer you supply).
  • The fixed-width scalar accessors (Int64, Float64, …) return values, not views.

For tight loops over a single column, batch.Column(i) returns a QwpColumn that caches the column layout once, and Int64Range / Float64Range decode a row range into a caller-owned slice in one shot:

buf := make([]int64, 0, 4096)
for batch, err := range q.Batches() {
if err != nil {
return err
}
buf = batch.Column(1).Int64Range(0, batch.RowCount(), buf[:0])
for _, v := range buf {
// ...
}
}

q.Cancel() aborts the query and is safe to call from another goroutine. q.TotalRows() reports the row count once the cursor completes.

Reading result batches

QwpColumnBatch and QwpColumn provide typed accessors for every QuestDB column type. QwpColumnBatch accessors take (col, row); the cached QwpColumn accessors take (row).

AccessorColumn types
BoolBOOLEAN
Int8BYTE
Int16SHORT
CharCHAR
Int32INT, IPv4
Int64LONG, TIMESTAMP, timestamp_ns, DATE
Float32FLOAT
Float64DOUBLE
String / StrVARCHAR, SYMBOL (String allocates)
BinaryBINARY
UuidHi / UuidLoUUID (64-bit halves)
Decimal128Hi / Decimal128LoDECIMAL128 (two int64 words)
Long256WordLONG256 (per 64-bit word)
Float64Array / Int64ArrayDOUBLE_ARRAY, LONG_ARRAY (flattened)
ArrayNDims / ArrayDimarray dimensionality and shape
DecimalScaleDECIMAL scale metadata (per column)
GeohashPrecisionBitsGEOHASH precision metadata (per column)
IsNullall types

Representations to be aware of:

  • TIMESTAMP and timestamp_ns and DATE come back as int64, not time.Time: microseconds, nanoseconds, and milliseconds since epoch respectively. Convert with time.UnixMicro / time.Unix(0, ns) as needed.
  • UUID is two int64 halves (UuidHi / UuidLo); reassemble client-side.
  • Decimals come back as the unscaled integer plus the per-column DecimalScale(col): read DECIMAL64 with Int64, DECIMAL128 with Decimal128Hi/Decimal128Lo, and DECIMAL256 with Long256Word (words 0–3); apply the scale yourself.
  • GEOHASH result columns expose only metadata in this release (GeohashPrecisionBits(col)); there is no public value accessor for a GEOHASH cell. Cast it to a string or long in SQL if you need the value.
  • A typed accessor on a NULL cell returns the zero value (0, false, "", nil), which is indistinguishable from a real zero. Call IsNull(col, row) first whenever NULL is meaningful.

Column metadata is available via ColumnName(col), ColumnType(col), and ColumnCount().

DDL and DML statements

Non-SELECT statements run through Exec, which returns an ExecResult:

res, err := client.Exec(ctx,
"CREATE TABLE trades (ts TIMESTAMP, symbol SYMBOL, side SYMBOL, "+
"price DOUBLE, amount DOUBLE) TIMESTAMP(ts) PARTITION BY DAY WAL")
if err != nil {
return err
}
fmt.Println(res.OpType, res.RowsAffected)

RowsAffected reports the count for INSERT, UPDATE, and DELETE. Pure DDL reports 0. OpType is the server's statement discriminator, useful for distinguishing INSERT from UPDATE from pure DDL.

Exec is not retried across a reconnect by default

If the connection drops mid-statement, Exec returns a *QwpFailoverReset. This means the statement was interrupted and not confirmed, not that it succeeded. For a non-idempotent INSERT, re-issuing it may double-apply, so decide per statement whether replay is safe. To make Exec retry transparently (only for idempotent statements), construct the client with qdb.WithQwpQueryReplayExec(true).

Bind parameters

Parameterized queries use typed bind values, avoiding SQL injection and enabling server-side factory cache reuse. Pass a QwpBindFunc via qdb.WithQueryBinds:

sql := "SELECT ts, symbol, price FROM trades " +
"WHERE symbol = $1 AND price >= $2 LIMIT 1000"

for _, symbol := range []string{"EURUSD", "GBPUSD", "USDJPY"} {
q := client.Query(ctx, sql, qdb.WithQueryBinds(func(b *qdb.QwpBinds) {
b.VarcharBind(0, symbol).DoubleBind(1, 1.0)
}))
for batch, err := range q.Batches() {
if err != nil {
break
}
// ...
}
q.Close()
}

Bind indices are 0-based and must be set in strictly ascending order; index 0 maps to $1. Setters include BooleanBind, ByteBind, ShortBind, IntBind, LongBind, FloatBind, DoubleBind, CharBind, DateBind, TimestampMicrosBind, TimestampNanosBind, VarcharBind, UuidBind, Long256Bind, GeohashBind, DecimalBind (and Decimal64/128/256Bind), plus a Null...Bind variant for each type. There is no symbol bind: use VarcharBind for symbol parameters. Not bindable: BINARY (no setter); ARRAY / DOUBLE[] / LONG[] (bind frames carry no array shape — pass a SQL array literal in the statement instead); IPv4 (bind it as INT with IntBind). A gap, a duplicate index, or any out-of-order call latches an error that surfaces from Query or Exec.

Flow control

For large result sets, byte-credit flow control prevents the server from overwhelming the client:

client, err := qdb.NewQwpQueryClient(ctx,
qdb.WithQwpQueryAddress("localhost:9000"),
qdb.WithQwpQueryInitialCredit(256*1024))

The server pauses after streaming the granted budget and replenishes after each batch. A credit of 0 (the default) means unbounded: the server streams as fast as the network allows, so set a credit when consuming a large result set on a memory-constrained client.

Compression

Negotiate zstd compression to reduce bandwidth for large result sets:

client, err := qdb.QwpQueryClientFromConf(ctx,
"ws::addr=localhost:9000;compression=zstd;compression_level=3;")

Batches are decompressed automatically.

Error handling

Ingestion errors

WebSocket ingestion uses an asynchronous error model. Batch rejections are not returned from Flush. They are delivered to a SenderErrorHandler callback. If you do not register one, a built-in handler logs them, but your application is not notified and cannot dead-letter or alert, so register one in any non-trivial producer:

sender, err := qdb.NewLineSender(ctx,
qdb.WithQwp(),
qdb.WithAddress("localhost:9000"),
qdb.WithErrorHandler(func(e *qdb.SenderError) {
log.Printf("rejected: category=%s table=%s msg=%s fsn=[%d,%d]",
e.Category, e.TableName, e.ServerMessage, e.FromFsn, e.ToFsn)
}))

Full SenderError field set, for logging, alerting, and support correlation:

FieldTypeUse
CategoryCategoryStable named class (CategorySchemaMismatch, CategoryParseError, CategoryInternalError, CategorySecurityError, CategoryWriteError, CategoryProtocolViolation, CategoryUnknown). The recommended switch target.
ServerStatusByteintNumeric wire status (e.g. 0x03). NoStatusByte (-1) for CategoryProtocolViolation.
AppliedPolicyPolicyPolicyHalt or PolicyDropAndContinue — what the send loop did.
ServerMessagestringHuman-readable server text. ≤ 1024 UTF-8 bytes, English, may be empty. Safe to log; not a stable pattern-match key (switch on Category / ServerStatusByte). May echo table / column names — sanitise before forwarding to third-party error trackers.
TableNamestringRejected table; empty for unknown or multi-table batches.
FromFsn,ToFsnint64Inclusive FSN span; join to FlushAndGetSequence to identify the rejected rows.
MessageSequenceint64Server's per-frame wire sequence for the rejection frame. Resets on reconnect — only meaningful within one connection; round-trips verbatim against that connection's server-side logs. Not a standalone correlation key (see below). NoMessageSequence (-1) for protocol violations.
DetectedAttime.TimeClient-side receipt time, for ops timelines (not for correlation).

The protocol does not surface a server-issued request or connection identifier. The closest correlation handle is the (MessageSequence, FromFsn, ToFsn) tuple plus the connection start time from your application logs — MessageSequence resets on reconnect, so it only disambiguates frames within a single connection. The client sends an X-QWP-Client-Id header (default go/<version>) on the upgrade. When filing a support ticket, include the connection start time and the (MessageSequence, FromFsn, ToFsn) triple.

The per-category policy is configurable. Resolution precedence is the policy resolver, then the per-category policy, then the connect-string on_*_error keys, then the spec defaults. CategoryProtocolViolation and CategoryUnknown are always PolicyHalt:

qdb.WithErrorPolicy(qdb.CategorySchemaMismatch, qdb.PolicyDropAndContinue)
qdb.WithErrorPolicyResolver(func(c qdb.Category) qdb.Policy { ... })
qdb.WithErrorInboxCapacity(512)

After a PolicyHalt rejection, the sender stops draining and the next producer call returns the same payload as a typed error. Unwrap it with errors.As, then Close and rebuild the sender to continue:

if err := sender.Flush(ctx); err != nil {
var se *qdb.SenderError
if errors.As(err, &se) {
// se.Category, se.ServerMessage, se.FromFsn, se.ToFsn
}
}

The handler runs on a dedicated dispatcher goroutine, never on the producer goroutine. If the bounded inbox fills, surplus notifications are dropped and counted by QwpSender.DroppedErrorNotifications().

Query errors

Server-side query failures surface as a *QwpQueryError from the Batches() iteration or the Exec return value:

for batch, err := range q.Batches() {
if err != nil {
var qe *qdb.QwpQueryError
if errors.As(err, &qe) {
log.Printf("query %d failed: 0x%02X %s",
qe.RequestId, qe.Status, qe.Message)
}
break
}
// ...
}
CodeNameDescription
0x03SCHEMA_MISMATCHBind parameter type incompatible with placeholder
0x05PARSE_ERRORSQL syntax error or malformed message
0x06INTERNAL_ERRORServer-side execution failure
0x08SECURITY_ERRORAuthorization failure
0x09WRITE_ERRORWrite failure (e.g. table not accepting writes; DML)
0x0ACANCELLEDQuery terminated by Cancel
0x0BLIMIT_EXCEEDEDProtocol limit hit

QwpQueryError also carries RequestId (the client-assigned query id — the correlation key for support tickets and server-log matching) and Message (server-supplied UTF-8, English, may be empty; safe to log, but switch on Status, not on message text). Errors can arrive before any data or mid-stream. Once an error is yielded, no further batches arrive for that query.

Connection-level errors

  • Authentication failure: a 401 or 403 response before the WebSocket upgrade completes. Terminal across all endpoints.
  • Role mismatch: *QwpRoleMismatchError from NewQwpQueryClient when no configured endpoint satisfies the target= filter. It reports the endpoints tried, the last observed server role, and the last transport error.

Failover and high availability

Enterprise

Multi-host failover with automatic reconnect requires QuestDB Enterprise.

Single-host applications need nothing from this section. The simple loops shown earlier are already correct: treating any iteration error as terminal is always safe, including when a reconnect happens.

If you connect to multiple hosts for failover, a correct application must do exactly three things beyond the single-host code. This is the whole list:

  1. Ingestion: no loop changes. Configure multiple endpoints and a reconnect policy; reconnection is transparent to the producer. You still need the universal asynchronous error handling from Ingestion errors. Details: Ingestion failover.
  2. Querying: handle *QwpFailoverReset, but only if you accumulate rows. If you build up rows across batches, discard them on a reset and continue iterating. If you process each batch and keep nothing, the simple terminal-on-error loop is already correct. Pattern: Query failover.
  3. DDL/DML: Exec is not retried by default. A *QwpFailoverReset from Exec means the statement was not confirmed, not that it succeeded. Re-issue it only if it is idempotent, or opt into qdb.WithQwpQueryReplayExec(true). Details: the Exec caution.

Everything below is the detail behind these three points.

Multiple endpoints

Specify comma-separated addresses in the connect string, or pass them to the options API:

ws::addr=db-primary:9000,db-replica-1:9000,db-replica-2:9000;
client, err := qdb.NewQwpQueryClient(ctx,
qdb.WithQwpQueryEndpoints("db-primary:9000", "db-replica-1:9000"))

The client tries endpoints in order and walks the list to find the next healthy one on connection loss.

Ingestion failover

The ingestion sender uses a reconnect loop with exponential backoff. Configure it via the connect string or qdb.WithReconnectPolicy(maxDuration, initialBackoff, maxBackoff):

KeyDefaultDescription
reconnect_max_duration_millis300000Total outage budget before giving up
reconnect_initial_backoff_millis100First post-failure sleep
reconnect_max_backoff_millis5000Cap on per-attempt sleep
initial_connect_retryoffRetry on first connect

qdb.WithInitialConnectMode selects InitialConnectOff (default), InitialConnectSync (block the constructor while retrying), or InitialConnectAsync (return immediately and buffer rows until connected). Ingress is zone-blind: it pins QWP v1 and ignores the zone= key, so a connect string shared with query clients works unchanged. Reconnect is transparent to the producer; you do not change the ingestion loop for it.

Query failover

The query client drives a per-query reconnect loop. On a mid-stream transport error it reconnects and replays the query.

KeyDefaultDescription
failoveronMaster switch for reconnect
failover_max_attempts8Max reconnect attempts per query
failover_backoff_initial_ms50First post-failure sleep
failover_backoff_max_ms1000Cap on per-attempt sleep
failover_max_duration_ms30000Total wall-clock failover budget per query (0 = unbounded)
targetanyRole filter: any, primary, replica

The matching options are qdb.WithQwpQueryFailover, qdb.WithQwpQueryFailoverMaxAttempts, qdb.WithQwpQueryFailoverBackoff, qdb.WithQwpQueryFailoverMaxDuration, and qdb.WithQwpQueryTarget.

You only need the pattern below if you accumulate rows across batches and want the query to continue transparently across a reconnect. When failover occurs mid-stream, Batches() yields a non-fatal *QwpFailoverReset before the replayed batches arrive. Detect it with errors.As, discard the rows you accumulated from the prior connection (the server replays from the beginning), and continue iterating:

for batch, err := range q.Batches() {
if err != nil {
var reset *qdb.QwpFailoverReset
if errors.As(err, &reset) {
results = results[:0] // server replays from the beginning
continue
}
return err // any other error is terminal
}
// ...
}
Without the reset branch, accumulated rows are duplicated

If you accumulate rows across batches and do not handle *QwpFailoverReset, the rows you kept from the prior connection stay in your buffer while the server replays the entire result set from the beginning after the reconnect. The replayed rows are appended to the ones you already have, so every pre-failover row ends up in your result set twice. Either clear the accumulator on the reset (as shown above), or use the simple terminal-on-error loop, which discards everything on any error and so cannot duplicate.

If you do not need transparent continuation, the simple loop is correct: returning on any error treats a reset as terminal, which the client supports explicitly. When the failover budget is consumed, Batches() (and Exec) return *QwpFailoverExhaustedError.

After failover exhaustion or a total outage (all endpoints down), the query client enters a terminal state and returns errors on every subsequent call. Close it and create a new one. This differs from ingestion, where the LineSender has a continuous reconnect loop (reconnect_max_duration_millis, default 5 minutes) that spans full outages transparently. The query client reconnects only within the scope of a single query.

Failover requires multiple endpoints

Failover rotates across endpoints. With a single addr, there is no other host to try, and the loop exhausts after one attempt regardless of failover_max_attempts. For failover to be useful, provide at least two addresses.

Observability

QwpSender exposes counters for dashboards: TotalReconnectAttempts, TotalReconnectsSucceeded, TotalFramesReplayed, TotalBackpressureStalls, TotalServerErrors, and LastTerminalError. With drain_orphans=on, BackgroundDrainers() snapshots the goroutines adopting unacked data from crashed sibling senders. The query client exposes ServerInfo() and CurrentEndpoint(); QwpServerInfo.RoleName() returns the bound node's role.

There is no per-transition connection callback: connect, disconnect, reconnect, and failover are not delivered as events. Observe reconnect and failover through these counters, and terminal failures through the ingestion error handler. Poll the counters from a background goroutine:

go func() {
t := time.NewTicker(10 * time.Second)
defer t.Stop()
for range t.C {
log.Printf("qwp: reconnects=%d/%d replayed=%d stalls=%d",
qs.TotalReconnectsSucceeded(), qs.TotalReconnectAttempts(),
qs.TotalFramesReplayed(), qs.TotalBackpressureStalls())
if e := qs.LastTerminalError(); e != nil {
// Page on-call: the sender has stopped draining.
log.Printf("qwp TERMINAL: %s", e)
}
}
}()

where qs is the qdb.QwpSender from the type assertion shown earlier.

For background and worked configurations, see client failover concepts, client failover configuration, and the multi-host failover and reconnect keys of the connect string reference.

Concurrency and parallel queries

Phase 1 limitation

The current implementation supports a single in-flight query per connection. Multi-query support is planned for a future release.

Neither the LineSender nor the QwpQueryClient is safe for concurrent use. For multi-threaded workloads, use one instance per goroutine. To run queries in parallel, create separate QwpQueryClient instances, one per goroutine. Cancel (on a *QwpQuery) and Close are safe to call from other goroutines, which is how you cancel an in-flight query or shut down cleanly.

Configuration reference

For the full list of connect-string keys and their defaults, see the connect string reference.

Common WebSocket-specific keys:

KeyDefaultDescription
auto_flush_rows1000Rows before auto-flush
auto_flush_interval100Milliseconds before auto-flush
sf_dirunsetStore-and-forward directory
sender_iddefaultSender slot identity for SF
request_durable_ackoffRequest durable upload ACK (Enterprise)
reconnect_max_duration_millis300000Ingress reconnect budget
failoveronQuery per-query reconnect switch
compressionrawQuery batch compression (raw, zstd)

Migration from ILP (HTTP/TCP)

The row-building API is unchanged across transports. The main differences:

AspectHTTP (ILP)WebSocket (QWP)
Connect string schemahttp:: / https::ws:: / wss::
Options transportqdb.WithHttp()qdb.WithQwp()
Auto-flush rows75,0001,000
Auto-flush interval1,000 ms100 ms
Error modelSynchronousAsync SenderErrorHandler
Store-and-forwardNot availableAvailable (sf_dir)
Multi-endpoint failoverLimitedFull reconnect loop
QueryingNot availableQwpQueryClient

The biggest behavioral change is the error model: on HTTP, Flush returns the rejection synchronously; on QWP it does not. To migrate, change the connect string from http:: to ws:: (or https:: to wss::), register a SenderErrorHandler, and adjust auto-flush settings if needed. QwpSender is a superset of LineSender, so existing ingestion code keeps working.

Full example: ingestion and querying with failover

This example combines ingestion with store-and-forward and connection observability, then queries the data back with the recreate-on-failure pattern for egress.

package main

import (
"context"
"errors"
"fmt"
"math/rand"
"time"

qdb "github.com/questdb/go-questdb-client/v4"
)

// ─── Ingestion (options API with store-and-forward) ─────────────────

// Multi-host with store-and-forward for failover durability.
// Without sf_dir, data buffered during an outage lives in process memory
// and is lost if the sender process dies. With sf_dir, unacknowledged
// frames are persisted to disk and replayed after reconnection.

func ingestExample() {
ctx := context.Background()

sender, err := qdb.NewLineSender(ctx,
qdb.WithQwp(),
qdb.WithAddress("db-primary:9000"), // Enterprise: multi-host
qdb.WithAddress("db-replica:9000"), // Enterprise: multi-host
qdb.WithTls(), // Enterprise: wss (TLS)
qdb.WithBearerToken("your_bearer_token"), // Enterprise: token auth
qdb.WithSfDir("/var/lib/myapp/qdb-sf"), // durability across outages
qdb.WithSenderId("ingest-1"), // unique per sender process
qdb.WithReconnectPolicy(
5*time.Minute, // max outage budget
100*time.Millisecond, // initial backoff
5*time.Second), // max backoff
qdb.WithErrorHandler(func(e *qdb.SenderError) {
fmt.Printf("batch rejected: category=%s table=%s msg=%s\n",
e.Category, e.TableName, e.ServerMessage)
}))
if err != nil {
panic(err)
}
defer sender.Close(ctx)

for i := 0; i < 100; i++ {
price := 1.0842 + (rand.Float64()-0.5)*0.002
err = sender.Table("book").
Symbol("ticker", "EURUSD").
Float64Column("price", price).
Float64Column("size", 100000+rand.Float64()*900000).
At(ctx, time.Now())
if err != nil {
fmt.Printf("row error: %s\n", err)
}
}
if err := sender.Flush(ctx); err != nil {
fmt.Printf("flush error: %s\n", err)
}
}

// With sf_dir set, unacknowledged frames are persisted to disk during
// the outage and replayed when the new primary becomes reachable.
// Without sf_dir, the reconnect loop still works but data is lost if
// the sender process dies.
//
// Observability (no per-event callback in Go):
// qs := sender.(qdb.QwpSender)
// qs.TotalReconnectAttempts()
// qs.TotalReconnectsSucceeded()
// qs.TotalFramesReplayed()
// qs.LastTerminalError()


// ─── Querying (connect string, with reconnect-on-failure) ───────────

// The QwpQueryClient becomes permanently dead after a total outage
// exhausts the failover budget. The application must close the dead
// client and create a new one. This pattern handles that:

func queryExample() {
ctx := context.Background()

connString :=
"wss::addr=db-primary:9000,db-replica:9000,db-replica2:9000;" + // Enterprise: wss, multi-host
"token=your_bearer_token;" + // Enterprise: token auth
"tls_verify=unsafe_off;" + // test only!
"failover=on;" + // Enterprise: failover
"failover_max_attempts=8;" +
"failover_max_duration_ms=30000;"

var client *qdb.QwpQueryClient

for {
// Reconnect if the client is dead
if client == nil {
var err error
client, err = qdb.QwpQueryClientFromConf(ctx, connString)
if err != nil {
fmt.Printf("connect failed: %s\n", err)
time.Sleep(2 * time.Second)
continue
}
}

q := client.Query(ctx,
"SELECT ts, ticker, price FROM book ORDER BY ts DESC LIMIT 10")

rowCount := 0
for batch, err := range q.Batches() {
if err != nil {
var reset *qdb.QwpFailoverReset
if errors.As(err, &reset) {
// Fires only when failover happens mid-query.
// Clear any accumulated partial results here.
fmt.Println("failover, clearing partial results")
rowCount = 0
continue
}
// Any other error is terminal for this client
fmt.Printf("query failed: %s\n", err)
q.Close()
client.Close(ctx)
client = nil
fmt.Println("(will reconnect on next query)")
break
}
for row := 0; row < batch.RowCount(); row++ {
ts := time.UnixMicro(batch.Int64(0, row))
ticker := batch.String(1, row)
price := batch.Float64(2, row)
fmt.Printf("%s %s price=%.5f\n",
ts.Format("2006-01-02T15:04:05.000Z"), ticker, price)
rowCount++
}
}
if client != nil {
q.Close()
fmt.Printf("(%d rows)\n", rowCount)
}

time.Sleep(2 * time.Second)
}
}