feat: Add ArrivalOrder to ArrowScan for bounded-memory concurrent reads #3046

sumedhsakdeo wants to merge 30 commits into apache:main
Conversation
Force-pushed from ab8c31b to 7ad9910
Add a `batch_size` parameter to `_task_to_record_batches`, `_record_batches_from_scan_tasks_and_deletes`, `ArrowScan.to_record_batches`, and `DataScan.to_arrow_batch_reader` so users can control the number of rows per RecordBatch returned by PyArrow's Scanner. Partially closes apache#3036.
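At this stage of the PR the intended usage was roughly the following sketch (catalog and table names are placeholders; note that later commits in this PR move `batch_size` onto `ArrivalOrder` instead):

```python
from pyiceberg.catalog import load_catalog

# Placeholder catalog/table names; batch_size forwarding as of this commit.
table = load_catalog("default").load_table("db.events")
reader = table.scan().to_arrow_batch_reader(batch_size=10_000)
for batch in reader:
    assert batch.num_rows <= 10_000  # Scanner caps each RecordBatch at batch_size rows
```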
Force-pushed from 7ad9910 to c86f0be
Force-pushed from c86f0be to 05e07d1
Force-pushed from 05f330d to 240a860
Force-pushed from 65a5007 to 1da7eb6
Introduce `ScanOrder.TASK` (default) and `ScanOrder.ARRIVAL` to control batch ordering. TASK materializes each file before yielding; ARRIVAL yields batches as they are produced, for lower memory usage.
Add `_bounded_concurrent_batches()` with proper lock discipline:

- Queue backpressure caps memory (scan.max-buffered-batches, default 16)
- Semaphore limits concurrent file reads (concurrent_files param)
- Cancel event with timeouts on all blocking ops (no lock held over IO)
- Error propagation and early termination support

When streaming=True and concurrent_files > 1, batches are yielded as they arrive from parallel file reads. File ordering is not guaranteed (documented).
Replace the shared ExecutorFactory + Semaphore with a per-scan `ThreadPoolExecutor(max_workers=concurrent_files)` for deterministic shutdown and simpler concurrency control. Refactor to_record_batches into helpers:

- `_prepare_tasks_and_deletes`: resolve delete files
- `_iter_batches_streaming`: bounded concurrent streaming path
- `_iter_batches_materialized`: executor.map materialization path
- `_apply_limit`: unified row limit logic (was duplicated)
…tests and docs
Setting `mock.call_count = 0` does not actually reset the mock's internal call tracking, causing the second assertion to see accumulated calls from both test phases. Use `reset_mock()` instead.
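A minimal standalone illustration of the difference (not the PR's actual test code):

```python
from unittest.mock import Mock

m = Mock()
m()                        # phase 1
m.call_count = 0           # overwrites the counter, not the recorded history
m()                        # phase 2
assert m.call_count == 1       # the counter looks "reset"...
assert len(m.mock_calls) == 2  # ...but the call history still holds both calls

m.reset_mock()             # actually clears call_count, mock_calls, call_args, etc.
assert m.call_count == 0 and m.mock_calls == []
```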
Add a parametrized benchmark case for the default (executor.map) path with max_workers=4 to compare memory/throughput against unbounded threading. Add TTFR (time to first record) measurement across all configurations.
Add a "which config should I use?" tip box with recommended starting points for common use cases, and clarify that batch_size is an advanced tuning knob.
Remove `@pytest.mark.benchmark` so the read throughput tests are included in the default `make test` filter as parametrize-marked tests.
…and docs
Force-pushed from 1da7eb6 to afb244c
pyiceberg/io/pyarrow.py (outdated)
```python
finally:
    cancel.set()
    # Drain the queue to unblock any workers stuck on put()
    while not batch_queue.empty():
```
This could cause a worker to hang when concurrent_files > 1 and max_buffered_batches=1. Here is an example.
Starting state
max_buffered_batches=1, concurrent_files=3. Queue is full (1 item). Workers A, B, and C are all blocked on batch_queue.put().
Timeline
| Step | Main thread | Workers A, B, C |
|---|---|---|
| 1 | `cancel.set()` | |
| 2 | `get_nowait()` → removes 1 item. Queue: 0. Internally notifies Worker A | Worker A: woken but hasn't run yet |
| 3 | `empty()` → True (queue IS empty because A hasn't put yet). Exits drain loop. | |
| 4 | `executor.__exit__()` → `shutdown(wait=True)`, joins all threads... | Worker A runs, `put()` completes → Queue: 1. Checks cancel → returns. ✓ |
| 5 | DEADLOCK — waiting for B and C to finish | Workers B, C: still blocked on `put()`. Queue is full, nobody will ever drain. |
Fix
In the worker, use `put()` with a timeout so it can periodically check whether the scan has been cancelled.
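A sketch of that suggestion (the helper name and the 0.1s timeout are illustrative, not from the PR):

```python
import queue
import threading
from typing import Any

def put_with_cancel(batch_queue: "queue.Queue[Any]", cancel: threading.Event, item: Any) -> bool:
    # Retry put() with a short timeout so a producer blocked on a full queue
    # periodically re-checks the cancel event instead of blocking forever.
    while not cancel.is_set():
        try:
            batch_queue.put(item, timeout=0.1)
            return True
        except queue.Full:
            continue  # still full: loop back and re-check cancel
    return False  # cancelled before the item could be enqueued
```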
Good catch. I did have a timeout in a previous version of the code, but it caused a performance regression. I'm exploring a few alternatives, such as condition variables: more complex, but they avoid the bug.
We see a 15% regression when the timeout is added. Will need to find another way.
| Config | Throughput (rows/s) | Time | TTFR | Peak Memory |
|---|---|---|---|---|
| default (TASK, all-parallel) | 199M | 0.08s | 60.9ms | 609.5 MB |
| arrival-cf1 | 56.6M | 0.28s | 29.9ms | 10.3 MB |
| arrival-cf2 | 93.3M | 0.17s | 32.6ms | 42.6 MB |
| arrival-cf4 | 159.7M | 0.10s | 32.9ms | 114.7 MB |
| arrival-cf8 | 186.4M | 0.09s | 37.8ms | 276.2 MB |
| arrival-cf16 | 188.9M | 0.09s | 53.4ms | 453.2 MB |
Replace `ScanOrder(str, Enum)` with:

- `ScanOrder(ABC)`: base class for ordering strategies
- `TaskOrder(ScanOrder)`: default behavior, preserves existing API
- `ArrivalOrder(ScanOrder)`: encapsulates concurrent_streams and max_buffered_batches

This addresses reviewer feedback about unused parameters and improves type safety. Parameters are now scoped to their appropriate ordering mode. Rename concurrent_files → concurrent_streams for clarity.
Replace `ScanOrder.TASK`/`ARRIVAL` with `TaskOrder()`/`ArrivalOrder()` instances. Update concurrent_files → concurrent_streams parameter usage. All existing test scenarios preserved with the new type-safe API.
Restructure parameterized benchmark tests to use ScanOrder class instances:

- `TaskOrder()` for default behavior
- `ArrivalOrder(concurrent_streams=N)` for streaming configurations

Simplifies test parameters by eliminating the separate concurrent_files argument.
Replace ScanOrder enum examples with the new class-based API:

- `TaskOrder()` for default behavior
- `ArrivalOrder(concurrent_streams=N)` for streaming
- `ArrivalOrder(concurrent_streams=N, max_buffered_batches=M)` for memory control

Add a configuration guidance table and update ordering semantics. Rename concurrent_files → concurrent_streams throughout the examples.
- Remove ABC inheritance from ScanOrder since no abstract methods are defined
- Remove unused enum.Enum import
- Fix B008 error by moving the `TaskOrder()` call from the function default to inside the function
- Clean up dataclass formatting
- Break long line in the `_iter_batches_arrival` call for better readability
- Fix B008 error by moving the `TaskOrder()` call from the function default to inside the function
- Sort imports alphabetically
- Update all test function calls to use the concurrent_streams parameter
- Fix parameter name mismatch with the `_bounded_concurrent_batches` function signature
- Update variable names and comments to match the new parameter name
- Sort imports alphabetically as required by ruff formatting
- No functional changes
- Add batch_size parameter to the ArrivalOrder class with comprehensive documentation
- Include memory formula: peak memory ≈ concurrent_streams × batch_size × max_buffered_batches × (average row size)
- Update default concurrent_streams from 1 to 8 for better out-of-the-box performance
- Remove batch_size parameter from to_arrow_batch_reader() and to_record_batches() methods
- Simplify the API by putting batch_size where it has direct memory impact (streaming orders)
- TaskOrder uses PyArrow defaults; ArrivalOrder provides full memory control
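Taken together with the refactor commits above, the surface described here looks roughly like this sketch (`frozen=True` and the exact field layout are assumptions; defaults per the PR description):

```python
from dataclasses import dataclass
from typing import Optional

class ScanOrder:
    """Base class for batch-ordering strategies (plain class; ABC was dropped above)."""

@dataclass(frozen=True)
class TaskOrder(ScanOrder):
    """Default: each file fully materialized, in task submission order."""

@dataclass(frozen=True)
class ArrivalOrder(ScanOrder):
    """Stream batches as they arrive across concurrent file reads."""
    concurrent_streams: int = 8        # files read in parallel
    batch_size: Optional[int] = None   # None -> PyArrow's default of 131,072 rows
    max_buffered_batches: int = 16     # bounded queue size (backpressure)
```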
- Update benchmark tests to use the simplified parameter structure
- Remove the separate batch_size parameter from test calls
- Fix concurrent_streams validation error message in unit tests
- Maintain all existing test coverage and functionality
- Update all examples to use the new ArrivalOrder(batch_size=X) syntax
- Add comprehensive memory formula with row size calculation
- Remove backward compatibility references (batch_size is new in this PR)
- Include performance characteristics and use case recommendations
- Provide clear guidance on TaskOrder vs ArrivalOrder memory behavior
- Import TaskOrder from pyiceberg.table in pyarrow.py
- Change the to_record_batches signature to use TaskOrder() as the default instead of None, ensuring consistent default scan ordering behavior
Replace the `TaskOrder()` function call in the argument default with a module-level singleton `_DEFAULT_SCAN_ORDER` to satisfy ruff B008 (no function calls in argument defaults).
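The pattern that commit describes, sketched (assuming the `ScanOrder`/`TaskOrder` exports from this PR):

```python
from pyiceberg.table import ScanOrder, TaskOrder

# ruff B008 forbids function calls in argument defaults, so the default
# instance is hoisted to a module-level singleton evaluated once at import.
_DEFAULT_SCAN_ORDER = TaskOrder()

def to_record_batches(order: ScanOrder = _DEFAULT_SCAN_ORDER):
    ...
```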
When concurrent_files > 1 and max_buffered_batches is small, multiple workers can be blocked on batch_queue.put() at the moment the consumer closes early (e.g. due to a limit). The previous drain loop used get_nowait() + empty(), which had a race: empty() could return True before a just-notified worker had a chance to put, leaving the remaining workers stuck on put() forever while executor.shutdown(wait=True) hung.

Fix: replace the racy drain loop with a blocking drain-until-sentinel loop. Each get() naturally wakes one blocked worker via not_full.notify(); that worker checks cancel and returns, eventually allowing the last worker to put the sentinel. Stopping only on the sentinel guarantees all workers have finished before we exit.

Also move batch_queue.put(_QUEUE_SENTINEL) outside remaining_lock to avoid holding a lock during a potentially blocking call.
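A fragment illustrating the consumer-side cleanup this commit describes (not standalone; `cancel`, `batch_queue`, and `_QUEUE_SENTINEL` are the scan internals named in the message):

```python
# Consumer cleanup on early termination: blocking drain until the sentinel
# proves every worker has exited (no racy empty() check).
cancel.set()
while True:
    item = batch_queue.get()     # each blocking get() wakes one producer stuck on put()
    if item is _QUEUE_SENTINEL:  # only the last finishing worker enqueues the sentinel
        break                    # every worker has exited; executor shutdown is now safe
```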
Summary
Addresses #3036 — `ArrowScan.to_record_batches()` uses `executor.map` + `list()`, which eagerly materializes all record batches per file into memory, causing OOM on large tables.
This PR adds a new `order` parameter to `to_arrow_batch_reader()` with two implementations:

- `TaskOrder` (default) — preserves existing behavior: batches grouped by file in task submission order, each file fully materialized before proceeding to the next.
- `ArrivalOrder` — yields batches as they are produced across files, without materializing entire files into memory. Accepts three sub-parameters:
  - `concurrent_streams: int` — number of files to read concurrently (default: 8). A per-scan `ThreadPoolExecutor(max_workers=concurrent_streams)` bounds concurrency.
  - `batch_size: int | None` — number of rows per batch passed to PyArrow's `ds.Scanner` (default: PyArrow's built-in 131,072).
  - `max_buffered_batches: int` — size of the bounded queue between producers and consumer (default: 16), providing backpressure to cap memory usage.

Problem
The current implementation materializes all batches from each file via `list()` inside `executor.map`, which runs up to `min(32, cpu_count + 4)` files in parallel. For large files this means all batches from ~20 files are held in memory simultaneously before any are yielded to the consumer.
Solution
Before: OOM on large tables
After: bounded memory, tunable parallelism
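The before/after, sketched as user code (catalog and table names are placeholders; the `order` API is as described in this PR):

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.table import ArrivalOrder, TaskOrder

# Placeholder catalog/table names.
table = load_catalog("default").load_table("db.events")

# Before / default path: TaskOrder materializes each file in full.
reader = table.scan().to_arrow_batch_reader(order=TaskOrder())

# After: ArrivalOrder streams batches with bounded memory and tunable parallelism.
reader = table.scan().to_arrow_batch_reader(
    order=ArrivalOrder(concurrent_streams=4, max_buffered_batches=16)
)
for batch in reader:
    ...  # each pyarrow.RecordBatch is handled as it arrives
```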
Default behavior is unchanged — `TaskOrder` preserves the existing executor.map + list() path for backwards compatibility.

Architecture
When `order=ArrivalOrder(...)`, batches flow through `_bounded_concurrent_batches`:

- workers read files in a `ThreadPoolExecutor(max_workers=concurrent_streams)`
- a bounded `Queue(maxsize=max_buffered_batches)` — when full, workers block (backpressure)
- the consumer pulls batches via `queue.get()`

Refactored `to_record_batches` into helpers: `_prepare_tasks_and_deletes`, `_iter_batches_arrival`, `_iter_batches_materialized`, `_apply_limit`.
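A stripped-down sketch of that producer/consumer shape (error propagation, cancellation, and delete handling are omitted; `read_task` stands in for the per-file batch reader):

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Iterable, Iterator, List

_SENTINEL = object()

def bounded_concurrent_batches(
    tasks: List[Any],
    read_task: Callable[[Any], Iterable[Any]],
    concurrent_streams: int = 8,
    max_buffered_batches: int = 16,
) -> Iterator[Any]:
    """Yield batches from all tasks as they are produced, with backpressure."""
    if not tasks:
        return
    batch_queue: "queue.Queue[Any]" = queue.Queue(maxsize=max_buffered_batches)
    remaining = len(tasks)
    remaining_lock = threading.Lock()

    def worker(task: Any) -> None:
        nonlocal remaining
        for batch in read_task(task):
            batch_queue.put(batch)  # blocks while the queue is full: backpressure
        with remaining_lock:
            remaining -= 1
            is_last = remaining == 0
        if is_last:
            batch_queue.put(_SENTINEL)  # outside the lock: put() may block

    with ThreadPoolExecutor(max_workers=concurrent_streams) as executor:
        for task in tasks:
            executor.submit(worker, task)
        while (item := batch_queue.get()) is not _SENTINEL:
            yield item
```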
Ordering semantics

| Configuration | Batch ordering |
|---|---|
| `TaskOrder()` (default) | Grouped by file, in task submission order |
| `ArrivalOrder(concurrent_streams=1)` | Single reader stream, so file order is preserved |
| `ArrivalOrder(concurrent_streams>1)` | Batches interleave across files as they arrive; within-file order is preserved, file order is not guaranteed |
Breakdown of this large PR into smaller PRs:
batch_sizeforwardingTaskOrder/ArrivalOrderenum — stop materializing entire filesconcurrent_streams— bounded concurrent reads in arrival orderBenchmark results
32 files × 500K rows, 5 columns (int64, float64, string, bool, timestamp), batch_size=131,072 (PyArrow default):
TTFR = Time to First Record, cs = concurrent_streams
Note on throughput plateau at cs=8: this benchmark runs against the local filesystem, where Parquet reads are CPU-bound (decompression + decoding). Throughput plateaus once enough threads saturate the available cores. On cloud storage (S3/GCS/ADLS), reads are I/O-bound with 50-200ms per-file latency, so higher `concurrent_streams` values (16-64+) would continue to show throughput gains until network bandwidth saturates. The optimal `concurrent_streams` will be higher for remote storage than this local benchmark suggests.

Positional deletes, row filters, and limit are handled correctly in all modes.
Are these changes tested?

Yes. 25 new unit tests across two test files, plus a micro-benchmark:

- `tests/io/test_pyarrow.py` (16 tests): batch_size controls rows per batch, arrival order yields all rows correctly, arrival order respects limit, within-file ordering preserved, positional deletes applied correctly in all three modes (task order, arrival order, concurrent), positional deletes with limit, concurrent_streams < 1 raises ValueError (sketched below)
- `tests/io/test_bounded_concurrent_batches.py` (10 tests): single/multi-file correctness, incremental streaming, backpressure blocks producers when the queue is full, error propagation from workers to consumer, early termination cancels workers cleanly, no deadlock when concurrent_streams > max_buffered_batches on early termination, concurrency limit enforced, empty task list, ArrowScan integration with limit
- `tests/benchmark/test_read_benchmark.py`: read throughput micro-benchmark across 6 configurations measuring rows/sec, TTFR, and peak Arrow memory
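For illustration, the ValueError case might look like this (hypothetical test code, assuming validation happens at construction time):

```python
import pytest
from pyiceberg.table import ArrivalOrder

def test_concurrent_streams_must_be_positive() -> None:
    with pytest.raises(ValueError):
        ArrivalOrder(concurrent_streams=0)  # values < 1 are rejected
```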
Are there any user-facing changes?

Yes. New `order` parameter on `DataScan.to_arrow_batch_reader()`:

- `order: ScanOrder | None` — controls batch ordering. Pass `TaskOrder()` (default) or `ArrivalOrder(concurrent_streams=N, batch_size=B, max_buffered_batches=M)`.

New public classes `TaskOrder` and `ArrivalOrder` (subclasses of `ScanOrder`) are exported from `pyiceberg.table`.

All parameters are optional with backwards-compatible defaults. Existing code is unaffected.
Documentation updated in `mkdocs/docs/api.md` with usage examples, ordering semantics, and a configuration guidance table.