Skip to content

Commit 3df3157

Browse files
alamb and sdf-jkl authored
[57_maintenance] [regression] Error with adaptive predicate pushdown: "Invalid offset …(#9301) (#9309)
- Part of #9240. - Related to #9239. This is a backport of the following PRs to the 57 line: #9243 from @erratic-pattern (the test) and #9301 from @sdf-jkl (the fix). Co-authored-by: Kosta Tarasov <33369833+sdf-jkl@users.noreply.github.com>
1 parent 9e822e0 commit 3df3157

File tree

2 files changed

+145
-1
lines changed
  • parquet/src/arrow

2 files changed

+145
-1
lines changed

parquet/src/arrow/async_reader/mod.rs

Lines changed: 135 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -782,7 +782,7 @@ mod tests {
782782
use arrow::error::Result as ArrowResult;
783783
use arrow_array::builder::{Float32Builder, ListBuilder, StringBuilder};
784784
use arrow_array::cast::AsArray;
785-
use arrow_array::types::Int32Type;
785+
use arrow_array::types::{Int32Type, TimestampNanosecondType};
786786
use arrow_array::{
787787
Array, ArrayRef, BooleanArray, Int8Array, Int32Array, Int64Array, RecordBatchReader,
788788
Scalar, StringArray, StructArray, UInt64Array,
@@ -2308,4 +2308,138 @@ mod tests {
23082308

23092309
Ok(())
23102310
}
2311+
2312+
/// Regression test for adaptive predicate pushdown attempting to read skipped pages.
2313+
/// Related issue: https://github.com/apache/arrow-rs/issues/9239
2314+
#[tokio::test]
2315+
async fn test_predicate_pushdown_with_skipped_pages() {
2316+
use arrow_array::TimestampNanosecondArray;
2317+
use arrow_schema::TimeUnit;
2318+
2319+
// Time range constants
2320+
const TIME_IN_RANGE_START: i64 = 1_704_092_400_000_000_000;
2321+
const TIME_IN_RANGE_END: i64 = 1_704_110_400_000_000_000;
2322+
const TIME_BEFORE_RANGE: i64 = 1_704_078_000_000_000_000;
2323+
2324+
// Create test data: 2 row groups, 300 rows each
2325+
// "tag" column: 'a', 'b', 'c' (100 rows each, sorted)
2326+
// "time" column: alternating in-range/out-of-range timestamps
2327+
let schema = Arc::new(Schema::new(vec![
2328+
Field::new(
2329+
"time",
2330+
DataType::Timestamp(TimeUnit::Nanosecond, None),
2331+
false,
2332+
),
2333+
Field::new("tag", DataType::Utf8, false),
2334+
]));
2335+
2336+
let props = WriterProperties::builder()
2337+
.set_max_row_group_size(300)
2338+
.set_data_page_row_count_limit(33)
2339+
.build();
2340+
2341+
let mut buffer = Vec::new();
2342+
let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(props)).unwrap();
2343+
2344+
// Write 2 row groups
2345+
for _ in 0..2 {
2346+
for (tag_idx, tag) in ["a", "b", "c"].iter().enumerate() {
2347+
let times: Vec<i64> = (0..100)
2348+
.map(|j| {
2349+
let row_idx = tag_idx * 100 + j;
2350+
if row_idx % 2 == 0 {
2351+
TIME_IN_RANGE_START + (j as i64 * 1_000_000)
2352+
} else {
2353+
TIME_BEFORE_RANGE + (j as i64 * 1_000_000)
2354+
}
2355+
})
2356+
.collect();
2357+
let tags: Vec<&str> = (0..100).map(|_| *tag).collect();
2358+
2359+
let batch = RecordBatch::try_new(
2360+
schema.clone(),
2361+
vec![
2362+
Arc::new(TimestampNanosecondArray::from(times)) as ArrayRef,
2363+
Arc::new(StringArray::from(tags)) as ArrayRef,
2364+
],
2365+
)
2366+
.unwrap();
2367+
writer.write(&batch).unwrap();
2368+
}
2369+
writer.flush().unwrap();
2370+
}
2371+
writer.close().unwrap();
2372+
let buffer = Bytes::from(buffer);
2373+
// Read back with various page index policies, should get the same answer with all
2374+
for policy in [
2375+
PageIndexPolicy::Skip,
2376+
PageIndexPolicy::Optional,
2377+
PageIndexPolicy::Required,
2378+
] {
2379+
println!("Testing with page index policy: {:?}", policy);
2380+
let reader = TestReader::new(buffer.clone());
2381+
let options = ArrowReaderOptions::default().with_page_index_policy(policy);
2382+
let builder = ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
2383+
.await
2384+
.unwrap();
2385+
2386+
let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
2387+
let num_row_groups = builder.metadata().num_row_groups();
2388+
2389+
// Initial selection: skip middle 100 rows (tag='b') per row group
2390+
let mut selectors = Vec::new();
2391+
for _ in 0..num_row_groups {
2392+
selectors.push(RowSelector::select(100));
2393+
selectors.push(RowSelector::skip(100));
2394+
selectors.push(RowSelector::select(100));
2395+
}
2396+
let selection = RowSelection::from(selectors);
2397+
2398+
// Predicate 1: time >= START
2399+
let time_gte_predicate =
2400+
ArrowPredicateFn::new(ProjectionMask::roots(&schema_descr, [0]), |batch| {
2401+
let col = batch.column(0).as_primitive::<TimestampNanosecondType>();
2402+
Ok(BooleanArray::from_iter(
2403+
col.iter().map(|t| t.map(|v| v >= TIME_IN_RANGE_START)),
2404+
))
2405+
});
2406+
2407+
// Predicate 2: time < END
2408+
let time_lt_predicate =
2409+
ArrowPredicateFn::new(ProjectionMask::roots(&schema_descr, [0]), |batch| {
2410+
let col = batch.column(0).as_primitive::<TimestampNanosecondType>();
2411+
Ok(BooleanArray::from_iter(
2412+
col.iter().map(|t| t.map(|v| v < TIME_IN_RANGE_END)),
2413+
))
2414+
});
2415+
2416+
let row_filter = RowFilter::new(vec![
2417+
Box::new(time_gte_predicate),
2418+
Box::new(time_lt_predicate),
2419+
]);
2420+
2421+
// Output projection: Only tag column (time not in output)
2422+
let projection = ProjectionMask::roots(&schema_descr, [1]);
2423+
2424+
let stream = builder
2425+
.with_row_filter(row_filter)
2426+
.with_row_selection(selection)
2427+
.with_projection(projection)
2428+
.build()
2429+
.unwrap();
2430+
2431+
// Stream should complete without error and the same results
2432+
let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
2433+
2434+
let batch = concat_batches(&batches[0].schema(), &batches).unwrap();
2435+
assert_eq!(batch.num_columns(), 1);
2436+
let expected = StringArray::from_iter_values(
2437+
std::iter::repeat_n("a", 50)
2438+
.chain(std::iter::repeat_n("c", 50))
2439+
.chain(std::iter::repeat_n("a", 50))
2440+
.chain(std::iter::repeat_n("c", 50)),
2441+
);
2442+
assert_eq!(batch.column(0).as_string(), &expected);
2443+
}
2444+
}
23112445
}

parquet/src/arrow/push_decoder/reader_builder/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -437,6 +437,16 @@ impl RowGroupReaderBuilder {
437437
.with_parquet_metadata(&self.metadata)
438438
.build_array_reader(self.fields.as_deref(), predicate.projection())?;
439439

440+
// Prepare to evaluate the filter.
441+
// Note: first update the selection strategy to properly handle any pages
442+
// pruned during fetch
443+
plan_builder = override_selector_strategy_if_needed(
444+
plan_builder,
445+
predicate.projection(),
446+
self.row_group_offset_index(row_group_idx),
447+
);
448+
// `with_predicate` actually evaluates the filter
449+
440450
plan_builder =
441451
plan_builder.with_predicate(array_reader, filter_info.current_mut())?;
442452

0 commit comments

Comments
 (0)