Skip to content

feat(ilp): binary wire protocol#7

Open
mtopolnik wants to merge 152 commits intomainfrom
jh_experiment_new_ilp
Open

feat(ilp): binary wire protocol#7
mtopolnik wants to merge 152 commits intomainfrom
jh_experiment_new_ilp

Conversation

@mtopolnik
Copy link

@mtopolnik mtopolnik commented Feb 25, 2026

TODO in run_tests_pipeline.yaml! Change before merging!

# TODO: remove branch once jh_experiment_new_ilp is merged
- script: git clone --depth 1 -b jh_experiment_new_ilp https://github.com/questdb/questdb.git ./questdb

Change to

- script: git clone --depth 1 https://github.com/questdb/questdb.git ./questdb

Summary

This PR adds a new WebSocket-based ingestion path to the Java client using QWP (QuestDB Wire Protocol), a binary protocol that replaces text-based ILP for higher-throughput data ingestion. The existing HTTP and TCP ILP senders remain unchanged.

Users select the new transport via Sender.builder(Transport.WEBSOCKET). The builder accepts WebSocket-specific options such as asyncMode, autoFlushBytes, autoFlushIntervalMillis, and inFlightWindowSize.

Architecture

The implementation follows a layered design:

Protocol layer (cutlass/qwp/protocol/)

  • QwpTableBuffer stores rows in columnar format using off-heap memory (zero-GC on the data path).
  • QwpSchemaHash computes XXHash64 over column names and types, enabling server-side schema caching. The client sends a full schema on the first batch and a hash reference on subsequent batches if the schema has not changed.
  • QwpGorillaEncoder applies delta-of-delta compression to timestamp columns.
  • QwpBitWriter handles bit-level packing for booleans and null bitmaps.
  • QwpConstants defines the wire format: "QWP1" magic bytes, type codes, feature flags, status codes.

Client layer (cutlass/qwp/client/)

  • QwpWebSocketSender implements the Sender interface. It uses a double-buffering scheme: the user thread writes rows into an active MicrobatchBuffer, which is sealed and handed to an I/O thread when an auto-flush trigger fires (row count, byte size, or time interval).
  • QwpWebSocketEncoder serializes QwpTableBuffer contents into binary QWP frames, including delta symbol dictionaries (only new symbols since the last acknowledged batch).
  • InFlightWindow implements a lock-free sliding window protocol that tracks batches awaiting server ACKs, providing backpressure from the server to the user thread.
  • WebSocketSendQueue runs the dedicated I/O thread, managing frame transmission and ACK/NACK response parsing.
  • GlobalSymbolDictionary assigns sequential integer IDs to symbol strings and supports delta encoding across batches.

WebSocket transport (cutlass/http/client/, cutlass/qwp/websocket/)

  • WebSocketClient is a zero-GC WebSocket implementation with platform-specific subclasses for Linux (epoll), macOS (kqueue), and Windows (select).
  • WebSocketFrameParser and WebSocketFrameWriter handle RFC 6455 frame serialization, including fragmentation, close-frame echo, and ping/pong.
  • WebSocketSendBuffer builds masked WebSocket frames directly in native memory.

Bug fixes and robustness improvements

The PR fixes a number of issues found during development and testing:

  • Fix native memory leaks in WebSocketClient constructor and on allocation failure.
  • Fix sendQueue leak on close when flush fails.
  • Fix integer overflows in buffer growth (WebSocketClient, WebSocketSendBuffer, QwpTableBuffer), array dimension products, and putBlockOfBytes().
  • Fix lone surrogate hash mismatch between schema hashing and wire encoding.
  • Fix receiveFrame() throwing instead of returning false, which masked I/O errors as timeouts.
  • Fix pong/close frames clobbering an in-progress send buffer.
  • Fix delta dictionary corruption on send failure by rolling back symbol IDs.
  • Fix stale array offsets after cancelRow() truncation.
  • Fix case-insensitive header validation in WebSocket handshake.
  • Cap receive buffer growth to prevent OOM.
  • Use SecureRnd (ChaCha20-based CSPRNG) for WebSocket masking keys instead of java.util.Random.
  • Validate table names, column names, WebSocket payload lengths, and UTF-8 low surrogates.

Code cleanup

The PR removes ~11,000 lines of dead code:

  • Delete unused utility classes: ConcurrentHashMap (3,791 lines), ConcurrentIntHashMap (3,612 lines), GenericLexer, Base64Helper, LongObjHashMap, FilesFacade, and others.
  • Remove unused methods from Numbers, Chars, Utf8s, Rnd, and ColumnType.
  • Delete obsolete classes: ParanoiaState, GeoHashes, BorrowedArray, HttpCookie.
  • Modernize code style: enhanced switch expressions, pattern variables in instanceof checks.
  • Upgrade minimum Java version from 11 to 17.

CI changes

  • Add a ClientIntegrationTests CI stage that starts a QuestDB server and runs the client's integration tests against it (both default and authenticated configurations).
  • Cache Maven dependencies in CI to speed up builds.
  • Fix sed portability for macOS CI runners.
  • Enable the HTTP server in CI test configurations (required for WebSocket).

Test plan

  • Unit tests cover all protocol building blocks: bit writer, Gorilla encoder, schema hash, column definitions, constants, table buffer, native buffer writer, off-heap memory
  • Unit tests cover WebSocket frame parsing/writing, send buffer, send queue, in-flight window, microbatch buffer, delta/global symbol dictionaries
  • QwpSenderTest (8,346 lines) exercises the full Sender API surface for all column types, null handling, cancelRow, schema changes, and error paths
  • QwpWebSocketSenderTest tests WebSocket-specific sender behavior including async mode
  • QwpWebSocketEncoderTest validates binary encoding for all column types and encoding modes
  • LineSenderBuilderWebSocketTest covers builder validation and configuration for the WebSocket transport
  • Integration tests run the client against a real QuestDB server in CI (default and authenticated)
  • assertMemoryLeak wrappers added to client tests to detect native memory leaks

bluestreak01 and others added 30 commits February 14, 2026 20:05
sendPongFrame() used the shared sendBuffer, calling reset()
which destroyed any partially-built frame the caller had in
progress via getSendBuffer(). This could happen when a PING
arrived during receiveFrame()/tryReceiveFrame() while the
caller was mid-way through constructing a data frame.

Add a dedicated 256-byte controlFrameBuffer for sending pong
responses. RFC 6455 limits control frame payloads to 125 bytes
plus a 14-byte max header, so 256 bytes is sufficient and never
needs to grow.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
sendCloseFrame() used reason.length() (UTF-16 code units) to
calculate the payload size, but wrote reason.getBytes(UTF_8)
(UTF-8 bytes) into the buffer. For non-ASCII close reasons,
UTF-8 encoding can be longer than the UTF-16 length, causing
writes past the declared payload size. This corrupted the
frame header length, the masking range, and could overrun the
allocated buffer.

Compute the UTF-8 byte array upfront and use its length for
all sizing calculations.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
When receiving a CLOSE frame from the server, the client now echoes
a close frame back before marking the connection as no longer
upgraded. This is required by RFC 6455 Section 5.5.1.

The close code parsing was moved out of the handler-null check so
the code is always available for the echo. The echo uses the
dedicated controlFrameBuffer to avoid clobbering any in-progress
frame in the main send buffer.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Handle CONTINUATION frames (opcode 0x0) in tryParseFrame()
which were previously silently dropped. Fragment payloads are
accumulated in a lazily-allocated native memory buffer and
delivered as a complete message to the handler when the final
FIN=1 frame arrives.

The FIN bit is now checked on TEXT/BINARY frames: FIN=0 starts
fragment accumulation, FIN=1 delivers immediately. Protocol
errors are raised for continuation without an initial fragment
and for overlapping fragmented messages.

The fragment buffer is freed in close() and the fragmentation
state is reset on disconnect().

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add a configurable maximum size for the WebSocket receive buffer,
mirroring the pattern already used by WebSocketSendBuffer. Previously,
growRecvBuffer() doubled the buffer without any upper bound, allowing
a malicious server to trigger out-of-memory by sending arbitrarily
large frames.

Add getMaximumResponseBufferSize() to HttpClientConfiguration
(defaulting to Integer.MAX_VALUE for backwards compatibility) and
enforce the limit in both growRecvBuffer() and
appendToFragmentBuffer(), which had the same unbounded growth issue
for fragmented messages.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Tests that expect connection failure were hardcoding ports
(9000, 19999) which could collide with running services. When
a QuestDB server is running on port 9000, the WebSocket
connection succeeds and the test fails with "Expected
LineSenderException".

Replace hardcoded ports with dynamically allocated ephemeral
ports via ServerSocket(0). The port is bound and immediately
closed, guaranteeing nothing is listening when the test tries
to connect.

Affected tests:
- testBuilderWithWebSocketTransportCreatesCorrectSenderType
- testConnectionRefused
- testWsConfigString
- testWsConfigString_missingAddr_fails
- testWsConfigString_protocolAlreadyConfigured_fails

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The Sec-WebSocket-Accept header validation used case-sensitive
String.contains(), which violates RFC 7230 (HTTP headers are
case-insensitive). A server sending the header in a different
casing (e.g., sec-websocket-accept) would cause the handshake
to fail.

Replace with a containsHeaderValue() helper that uses
String.regionMatches(ignoreCase=true) for the header name
lookup, avoiding both the case-sensitivity bug and unnecessary
string allocation from toLowerCase().

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace byte-by-byte native-heap copies in writeToSocket and
readFromSocket with Unsafe.copyMemory(), using the 5-argument
form that bridges native memory and Java byte arrays via
Unsafe.BYTE_OFFSET.

Add WebSocketChannelTest with a local echo server that verifies
data integrity through the copy paths across various payload
sizes and patterns.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Move maxSentSymbolId and sentSchemaHashes updates to after the
send/enqueue succeeds in both async and sync flush paths. Previously
these were updated before the send, so if sealAndSwapBuffer() threw
(async) or sendBinary()/waitForAck() threw (sync), the next batch's
delta dictionary would omit symbols the server never received,
silently corrupting subsequent data.

Also move sentSchemaHashes.add() inside the messageSize > 0 guard
in the sync path, where it was incorrectly marking schemas as sent
even when no data was produced.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The validate() range check used TYPE_DECIMAL256 (0x15) as the upper
bound, which excluded TYPE_CHAR (0x16). CHAR columns would throw
IllegalArgumentException on validation.

Extend the upper bound to TYPE_CHAR and add tests covering all valid
type codes, nullable CHAR, and invalid type rejection.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace raw AssertionError with LineSenderException when a token
parameter is provided in ws:: or wss:: configuration strings. The
else branch in config string parsing was unreachable when the code
only supported HTTP and TCP, but became reachable after WebSocket
support was added. Users now get a clear "token is not supported
for WebSocket protocol" error instead of a cryptic AssertionError.

Add test assertions for both ws:: and wss:: schemas with token.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
mtopolnik and others added 30 commits March 3, 2026 12:34
Remove classes that lost all production callers after the
previous round of dead-code removal:

- BufferWindowCharSequence (only implementor was GenericLexer)
- ImmutableIterator (only implementor was GenericLexer)
- CharSequenceHashSet (only used in GenericLexer)
- BiIntFunction (only used in ConcurrentIntHashMap)
- GeoHashes (only caller was Rnd.nextGeoHash())

Also remove Rnd.nextGeoHash() which has zero callers, and
delete the CharSequenceHashSetTest.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Remove 16 dead public methods/constants that have zero callers
in non-test code within the java-questdb-client submodule:

Numbers: isPow2(), parseFloat(), parseIPv4Quiet(),
parseInt000Greedy(), parseIntQuiet(), parseIntSafely(),
parseLong000000Greedy()

Unsafe: arrayGetVolatile() (2 overloads), arrayPutOrdered()
(2 overloads), defineAnonymousClass(), getNativeAllocator(),
setRssMemLimit()

Also: Misc.getWorkerAffinity(), Hash.hashLong128_32(),
Hash.hashLong128_64(), IOOperation.HEARTBEAT

Clean up infrastructure that only served the removed methods:
AnonymousClassDefiner interface and its two implementations,
NATIVE_ALLOCATORS array and constructNativeAllocator(),
INT_OFFSET/INT_SCALE fields, and newly unused imports.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Remove dead methods with no production callers from five
classes:

Numbers.java: appendLong256(), appendUuid(),
appendHexPadded(long, int), intToIPv4Sink(), and their
private helpers (appendLong256Four/Three/Two).

Chars.java: contains(), noMatch().

Utf8s.java: stringFromUtf8BytesSafe(), both toString()
overloads, validateUtf8() and its four private helpers.

ColumnType.java: encodeArrayTypeWithWeakDims().

Update test code that referenced deleted methods:
TestUtils replaces Chars.contains() with String.contains()
and inlines ipv4ToString(). TestHttpClient replaces
Utf8s.toString() with a direct null-safe call.
ColumnTypeTest and NumbersTest drop tests for removed
methods.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The client's Unsafe class had getRssMemLimit(),
checkAllocLimit(), and RSS_MEM_LIMIT_ADDR, but no
setRssMemLimit() method. The limit address was always zero,
making the allocation limit check in malloc() and realloc()
dead code. Remove the field, both methods, the static
initializer slot, and the now-stale allocator.rs layout
comment.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
QwpWebSocketEncoder had two buffer fields: ownedBuffer (the
allocated NativeBufferWriter) and buffer (the active write target,
typed as QwpBufferWriter interface). The indirection existed so
callers could inject an external buffer via setBuffer(), but no
code ever called that method.

The split caused a use-after-free: close() freed and nulled
ownedBuffer but left buffer pointing to the closed writer.

Merge the two fields into a single NativeBufferWriter buffer.
Delete setBuffer(), isUsingExternalBuffer(), and the conditional
guard in reset(). close() now nulls the only reference.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
ColumnBuffer constructor leaks allocateStorage() buffers when the
subsequent Unsafe.calloc() for the null bitmap throws. Wrap both
allocateStorage() and calloc in try/catch so close() frees any
partially allocated buffers before rethrowing.

Similarly, allocateStorage() leaks stringOffsets when the stringData
allocation fails for STRING/VARCHAR columns. Guard the second
allocation with try/catch that closes stringOffsets on failure.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
QwpSchemaHash.computeSchemaHash() and computeSchemaHashDirect()
encoded lone surrogates (high surrogate at end of string, or
lone low surrogate) as 3-byte UTF-8, while
OffHeapAppendMemory.putUtf8() replaced them with '?' (1 byte).
This mismatch caused the schema hash to diverge from the actual
wire encoding when a column name contained a lone surrogate.

Add a Character.isSurrogate(c) guard before the 3-byte else
branch in both methods, so lone surrogates hash as '?' —
consistent with putUtf8(). Add tests covering lone high
surrogate at end of string and lone low surrogate for both
computeSchemaHash and computeSchemaHashDirect.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add a Cache@2 task to the Azure Pipelines BuildAndTest matrix
so that the 4 platform jobs (linux, mac-arm, mac-x64, windows)
reuse downloaded Maven dependencies across runs instead of
fetching them from scratch each time.

The cache key incorporates the OS and all pom.xml hashes.
A restoreKeys fallback on just the OS ensures partial cache
hits when dependencies change.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add always() to the condition expressions in questdb_stop.yaml so
the stop steps execute regardless of whether prior steps succeeded.
Without this, Azure DevOps defaults to succeeded() and skips the
cleanup when Maven tests fail, leaving the QuestDB server process
orphaned until the VM is torn down.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The Cache@2 task uses $(HOME)/.m2/repository as the cache path.
On Linux/macOS agents, HOME is a native environment variable that
Azure DevOps imports as a pipeline variable. On Windows, HOME is
not set natively (Windows uses USERPROFILE), so $(HOME) remains
unexpanded as a literal string, causing tar to fail with
"could not chdir to 'D:\a\1\s\$(HOME)\.m2\repository'".

Add a step before the cache task that sets HOME from USERPROFILE
on Windows agents via ##vso[task.setvariable]. The step runs only
when Agent.OS is Windows_NT, leaving Linux/macOS unchanged.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
QwpWebSocketSender.shouldAutoFlush() accepted an
autoFlushBytes parameter but never evaluated it. This
commit implements the byte threshold by querying actual
column buffer sizes rather than maintaining an estimated
counter.

This counts the bytes in column buffers, before
wire-encoding. Encoded size will be less due to Gorilla
compression, bit-packing, etc.

QwpTableBuffer.getBufferedBytes() sums OffHeapAppendMemory
append offsets and array data across all columns.
QwpWebSocketSender.getPendingBytes() aggregates this across
all table buffers. shouldAutoFlush() now checks this sum
against autoFlushBytes between the row-count and interval
checks.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add testSenderBuilderWebSocket() to QwpSenderTest which exercises the
production Sender.builder(Transport.WEBSOCKET) path end-to-end. All
existing E2E tests use QwpWebSocketSender.connect() directly, but
production users would use Sender.builder(). The new test writes rows
with mixed column types (symbol, double, long, boolean, string) and
verifies data arrives correctly via SQL queries.

Also convert string concatenation in assertion expected values to
text blocks throughout QwpSenderTest for improved readability.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
QwpWebSocketSender.table() accepted any CharSequence without
validation, unlike the HTTP sender which validates via
TableUtils. Empty, null, or whitespace-only table names passed
through to the server, causing internal errors instead of
clear client-side diagnostics.

Add validateTableName() in table() and checkedColumnName() in
all column methods. Both use TableUtils.isValidTableName/
isValidColumnName to reject empty, null, too-long, and names
with illegal characters, throwing LineSenderException with a
descriptive message.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
QwpTableBuffer.addDecimal64/128/256 silently rescale values
when scales differ, but when rescaling down would lose
precision, Decimal256.rescale() threw a raw NumericException
that leaked to the caller unwrapped.

Catch NumericException from rescale() and wrap it in
LineSenderException with a clear message naming the column
and the incompatible scales. Add unit tests for the
Decimal64 and Decimal128 precision-loss paths.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
WebSocketClient.upgrade() now accepts an optional Authorization header,
which is included in the HTTP upgrade request. QwpWebSocketSender
connect/connectAsync factory methods accept an authorizationHeader
parameter and pass it through to upgrade(). Sender.Builder builds Basic
or Bearer headers from httpUsernamePassword/httpToken and wires them to
the WebSocket sender. The config-string parser also accepts token and
username/password for ws::/wss:: schemas.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The config string parser accepted the `token` parameter for
WebSocket protocols (ws/wss) and passed it through to httpToken(),
which caused a confusing "Failed to connect" error instead of a
clear validation message. Add an early check for PROTOCOL_WEBSOCKET
in the token parsing branch to throw a descriptive error before
attempting any connection.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants