Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
120d2b0
Update to pre-release arrow
alamb Dec 16, 2025
40a6a71
Fix clippy warning because UnionFields::new is deprecated
alamb Dec 16, 2025
f50b9c3
Fix real bug in test
alamb Dec 16, 2025
87563d8
Update test for fixed issue upstream
alamb Dec 16, 2025
4afd46f
Update tests for new feature
alamb Dec 18, 2025
70e2841
Update tests for struct casting
alamb Dec 18, 2025
b29b662
Update to latest
alamb Jan 7, 2026
6a2e631
Merge remote-tracking branch 'apache/main' into alamb/update_arrow_57…
alamb Jan 7, 2026
ca11966
Merge branch 'main' into alamb/update_arrow_57.2.0
alamb Jan 8, 2026
63a85eb
Update to pre-release main
alamb Jan 10, 2026
277aac4
Update expected value of make_interval
alamb Jan 10, 2026
bb29b0c
Update expected output
alamb Jan 10, 2026
35b97fa
Update to latest arrow
alamb Jan 10, 2026
d2e8da6
Update rev
Dandandan Jan 16, 2026
1002408
update outputs
Dandandan Jan 16, 2026
c062756
Merge
Dandandan Jan 16, 2026
917a464
Update rev
Dandandan Jan 20, 2026
c1d27d3
Update again
Dandandan Jan 22, 2026
1b979c6
Update rev
Dandandan Jan 23, 2026
c3df7be
Merge branch 'main' into alamb/update_arrow_58
Dandandan Jan 23, 2026
d79a8a0
Merge remote-tracking branch 'apache/main' into alamb/update_arrow_58
alamb Jan 30, 2026
21fabb5
Update to latest
alamb Jan 30, 2026
36cfd5e
Update for deprecated
alamb Jan 30, 2026
c273703
Update to object_store 0.13.0
alamb Jan 30, 2026
855b51b
Update more
alamb Jan 30, 2026
bdd19a6
More updates
alamb Jan 30, 2026
0d0cc96
updates
alamb Jan 30, 2026
ccf7a14
cleanups
alamb Jan 30, 2026
1a65bf7
remove unneeded code
alamb Jan 30, 2026
206de80
Merge remote-tracking branch 'apache/main' into alamb/update_arrow_58
alamb Jan 30, 2026
778ac16
update
alamb Jan 30, 2026
ddd778d
Remove old comments
alamb Jan 30, 2026
afb5ba3
fixup request counting
alamb Jan 30, 2026
0622248
fix delete
alamb Jan 30, 2026
e4449fd
clippy
alamb Jan 30, 2026
987f879
Update tests and for clippy
alamb Jan 30, 2026
70a958a
more clippy
alamb Jan 30, 2026
5452d50
Update test as now utf8->float is supported
alamb Jan 30, 2026
9276a8a
fmt
alamb Jan 30, 2026
cae9716
Update to latest main
alamb Feb 3, 2026
d568137
Merge remote-tracking branch 'apache/main' into alamb/update_arrow_58
alamb Feb 3, 2026
6762cb7
Fix object store upgrade
alamb Feb 3, 2026
14a3601
Update expected error message
alamb Feb 3, 2026
2ad66f1
fix test
alamb Feb 3, 2026
16d34ad
Merge remote-tracking branch 'apache/main' into alamb/update_arrow_58
alamb Feb 3, 2026
8d7580a
Merge remote-tracking branch 'apache/main' into alamb/update_arrow_58
alamb Feb 8, 2026
70d22da
Update to latest
alamb Feb 8, 2026
e313fd7
Merge remote-tracking branch 'apache/main' into alamb/update_arrow_58
alamb Feb 11, 2026
50b7a72
Updated
alamb Feb 11, 2026
4ae0203
Update rev
Dandandan Feb 13, 2026
33710fd
Update rev
Dandandan Feb 13, 2026
7afb8f1
Fix
Dandandan Feb 13, 2026
189f6e0
Merge remote-tracking branch 'apache/main' into alamb/update_arrow_58
alamb Feb 14, 2026
1985cde
Update
alamb Feb 14, 2026
5eb5680
Merge remote-tracking branch 'apache/main' into alamb/update_arrow_58
alamb Feb 23, 2026
ffc49b2
Update to arrow 58 for real
alamb Feb 23, 2026
57ce0fa
Update for file size
alamb Feb 23, 2026
325831d
Add upgrade guide
alamb Feb 23, 2026
bf17d4a
prettier
alamb Feb 23, 2026
dd0464e
Merge branch 'main' into alamb/update_arrow_58
alamb Feb 23, 2026
57d7884
Merge remote-tracking branch 'apache/main' into alamb/update_arrow_58
alamb Feb 25, 2026
258251a
Merge branch 'main' into alamb/update_arrow_58
alamb Feb 26, 2026
f00edb3
Merge branch 'main' into alamb/update_arrow_58
alamb Feb 26, 2026
f12d5cd
Merge branch 'main' into alamb/update_arrow_58
alamb Feb 27, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 71 additions & 79 deletions Cargo.lock

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,19 @@ ahash = { version = "0.8", default-features = false, features = [
"runtime-rng",
] }
apache-avro = { version = "0.21", default-features = false }
arrow = { version = "57.3.0", features = [
arrow = { version = "58.0.0", features = [
"prettyprint",
"chrono-tz",
] }
arrow-buffer = { version = "57.2.0", default-features = false }
arrow-flight = { version = "57.3.0", features = [
arrow-buffer = { version = "58.0.0", default-features = false }
arrow-flight = { version = "58.0.0", features = [
"flight-sql-experimental",
] }
arrow-ipc = { version = "57.2.0", default-features = false, features = [
arrow-ipc = { version = "58.0.0", default-features = false, features = [
"lz4",
] }
arrow-ord = { version = "57.2.0", default-features = false }
arrow-schema = { version = "57.2.0", default-features = false }
arrow-ord = { version = "58.0.0", default-features = false }
arrow-schema = { version = "58.0.0", default-features = false }
async-trait = "0.1.89"
bigdecimal = "0.4.8"
bytes = "1.11"
Expand Down Expand Up @@ -165,9 +165,9 @@ liblzma = { version = "0.4.6", features = ["static"] }
log = "^0.4"
memchr = "2.8.0"
num-traits = { version = "0.2" }
object_store = { version = "0.12.5", default-features = false }
object_store = { version = "0.13.1", default-features = false }
parking_lot = "0.12"
parquet = { version = "57.3.0", default-features = false, features = [
parquet = { version = "58.0.0", default-features = false, features = [
"arrow",
"async",
"object_store",
Expand Down
10 changes: 5 additions & 5 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,7 @@ mod tests {
use datafusion::common::plan_err;

use datafusion::prelude::SessionContext;
use datafusion_common::assert_contains;
use url::Url;

async fn create_external_table_test(location: &str, sql: &str) -> Result<()> {
Expand Down Expand Up @@ -714,25 +715,24 @@ mod tests {
let err = create_external_table_test(location, &sql)
.await
.unwrap_err();
assert!(err.to_string().contains("os error 2"));
assert_contains!(err.to_string(), "os error 2");

// for service_account_key
let sql = format!(
"CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('gcp.service_account_key' '{service_account_key}') LOCATION '{location}'"
);
let err = create_external_table_test(location, &sql)
.await
.unwrap_err()
.to_string();
assert!(err.contains("No RSA key found in pem file"), "{err}");
.unwrap_err();
assert_contains!(err.to_string(), "Error reading pem file: no items found");

// for application_credentials_path
let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET
OPTIONS('gcp.application_credentials_path' '{application_credentials_path}') LOCATION '{location}'");
let err = create_external_table_test(location, &sql)
.await
.unwrap_err();
assert!(err.to_string().contains("os error 2"));
assert_contains!(err.to_string(), "os error 2");

Ok(())
}
Expand Down
12 changes: 6 additions & 6 deletions datafusion-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -617,8 +617,8 @@ mod tests {
| filename | file_size_bytes | metadata_size_bytes | hits | extra |
+-----------------------------------+-----------------+---------------------+------+------------------+
| alltypes_plain.parquet | 1851 | 8882 | 2 | page_index=false |
| alltypes_tiny_pages.parquet | 454233 | 269266 | 2 | page_index=true |
| lz4_raw_compressed_larger.parquet | 380836 | 1347 | 2 | page_index=false |
| alltypes_tiny_pages.parquet | 454233 | 269074 | 2 | page_index=true |
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this reduction in metadata size is a direct consequence of @WaterWhisperer's PR to improve PageEncoding representation

| lz4_raw_compressed_larger.parquet | 380836 | 1339 | 2 | page_index=false |
+-----------------------------------+-----------------+---------------------+------+------------------+
");

Expand Down Expand Up @@ -648,8 +648,8 @@ mod tests {
| filename | file_size_bytes | metadata_size_bytes | hits | extra |
+-----------------------------------+-----------------+---------------------+------+------------------+
| alltypes_plain.parquet | 1851 | 8882 | 5 | page_index=false |
| alltypes_tiny_pages.parquet | 454233 | 269266 | 2 | page_index=true |
| lz4_raw_compressed_larger.parquet | 380836 | 1347 | 3 | page_index=false |
| alltypes_tiny_pages.parquet | 454233 | 269074 | 2 | page_index=true |
| lz4_raw_compressed_larger.parquet | 380836 | 1339 | 3 | page_index=false |
+-----------------------------------+-----------------+---------------------+------+------------------+
");

Expand Down Expand Up @@ -841,8 +841,8 @@ mod tests {
+---------------------+-----------+-----------------+------+
| metadata_size_bytes | filename | file_size_bytes | etag |
+---------------------+-----------+-----------------+------+
| 212 | 0.parquet | 3645 | 0 |
| 212 | 1.parquet | 3645 | 1 |
| 212 | 0.parquet | 3642 | 0 |
| 212 | 1.parquet | 3642 | 1 |
+---------------------+-----------+-----------------+------+
");

Expand Down
120 changes: 59 additions & 61 deletions datafusion-cli/src/object_storage/instrumented.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ use datafusion::{
execution::object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
};
use futures::stream::{BoxStream, Stream};
use futures::{StreamExt, TryStreamExt};
use object_store::{
GetOptions, GetRange, GetResult, ListResult, MultipartUpload, ObjectMeta,
ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
path::Path,
CopyOptions, GetOptions, GetRange, GetResult, ListResult, MultipartUpload,
ObjectMeta, ObjectStore, ObjectStoreExt, PutMultipartOptions, PutOptions, PutPayload,
PutResult, Result, path::Path,
};
use parking_lot::{Mutex, RwLock};
use url::Url;
Expand Down Expand Up @@ -230,40 +231,57 @@ impl InstrumentedObjectStore {
let timestamp = Utc::now();
let range = options.range.clone();

let head = options.head;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A substantial amount of the changes in this PR are due to the upgrade to object_store 0.13 where several of the trait methods are consolidated (e.g. get, get_opts, head, etc) have been consolidated.

You can see the upgrade guide here: https://docs.rs/object_store/latest/object_store/trait.ObjectStore.html#upgrade-guide-for-0130

let start = Instant::now();
let ret = self.inner.get_opts(location, options).await?;
let elapsed = start.elapsed();

let (op, size) = if head {
(Operation::Head, None)
} else {
(
Operation::Get,
Some((ret.range.end - ret.range.start) as usize),
)
};

self.requests.lock().push(RequestDetails {
op: Operation::Get,
op,
path: location.clone(),
timestamp,
duration: Some(elapsed),
size: Some((ret.range.end - ret.range.start) as usize),
size,
range,
extra_display: None,
});

Ok(ret)
}

async fn instrumented_delete(&self, location: &Path) -> Result<()> {
fn instrumented_delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
) -> BoxStream<'static, Result<Path>> {
let requests_captured = Arc::clone(&self.requests);

let timestamp = Utc::now();
let start = Instant::now();
self.inner.delete(location).await?;
let elapsed = start.elapsed();

self.requests.lock().push(RequestDetails {
op: Operation::Delete,
path: location.clone(),
timestamp,
duration: Some(elapsed),
size: None,
range: None,
extra_display: None,
});

Ok(())
self.inner
.delete_stream(locations)
.and_then(move |location| {
let elapsed = start.elapsed();
requests_captured.lock().push(RequestDetails {
op: Operation::Delete,
path: location.clone(),
timestamp,
duration: Some(elapsed),
size: None,
range: None,
extra_display: None,
});
futures::future::ok(location)
})
.boxed()
}

fn instrumented_list(
Expand Down Expand Up @@ -361,25 +379,6 @@ impl InstrumentedObjectStore {

Ok(())
}

async fn instrumented_head(&self, location: &Path) -> Result<ObjectMeta> {
let timestamp = Utc::now();
let start = Instant::now();
let ret = self.inner.head(location).await?;
let elapsed = start.elapsed();

self.requests.lock().push(RequestDetails {
op: Operation::Head,
path: location.clone(),
timestamp,
duration: Some(elapsed),
size: None,
range: None,
extra_display: None,
});

Ok(ret)
}
}

impl fmt::Display for InstrumentedObjectStore {
Expand Down Expand Up @@ -429,12 +428,15 @@ impl ObjectStore for InstrumentedObjectStore {
self.inner.get_opts(location, options).await
}

async fn delete(&self, location: &Path) -> Result<()> {
fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
) -> BoxStream<'static, Result<Path>> {
if self.enabled() {
return self.instrumented_delete(location).await;
return self.instrumented_delete_stream(locations);
}

self.inner.delete(location).await
self.inner.delete_stream(locations)
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
Expand All @@ -453,28 +455,24 @@ impl ObjectStore for InstrumentedObjectStore {
self.inner.list_with_delimiter(prefix).await
}

async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

copy and copy_if_not_exists were consolidated

if self.enabled() {
return self.instrumented_copy(from, to).await;
}

self.inner.copy(from, to).await
}

async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
if self.enabled() {
return self.instrumented_copy_if_not_exists(from, to).await;
}

self.inner.copy_if_not_exists(from, to).await
}

async fn head(&self, location: &Path) -> Result<ObjectMeta> {
async fn copy_opts(
&self,
from: &Path,
to: &Path,
options: CopyOptions,
) -> Result<()> {
if self.enabled() {
return self.instrumented_head(location).await;
return match options.mode {
object_store::CopyMode::Create => {
self.instrumented_copy_if_not_exists(from, to).await
}
object_store::CopyMode::Overwrite => {
self.instrumented_copy(from, to).await
}
};
}

self.inner.head(location).await
self.inner.copy_opts(from, to, options).await
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ use datafusion_proto::protobuf::{
};
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::{ObjectStore, PutPayload};
use object_store::{ObjectStore, ObjectStoreExt, PutPayload};
use serde::{Deserialize, Serialize};

/// Example showing how to preserve custom adapter information during plan serialization.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use datafusion::{
use datafusion::datasource::physical_plan::FileScanConfigBuilder;
use datafusion_examples::utils::datasets::ExampleDataset;
use futures::StreamExt;
use object_store::{ObjectStore, local::LocalFileSystem, memory::InMemory};
use object_store::{ObjectStoreExt, local::LocalFileSystem, memory::InMemory};

/// This example demonstrates using the low level [`FileStream`] / [`FileOpener`] APIs to directly
/// read data from (CSV/JSON) into Arrow RecordBatches.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use datafusion_physical_expr_adapter::{
};
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::{ObjectStore, PutPayload};
use object_store::{ObjectStore, ObjectStoreExt, PutPayload};

// Example showing how to implement custom casting rules to adapt file schemas.
// This example enforces that casts must be strictly widening: if the file type is Int64 and the table type is Int32, it will error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use datafusion_physical_expr_adapter::{
use futures::StreamExt;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::{ObjectStore, PutPayload};
use object_store::{ObjectStore, ObjectStoreExt, PutPayload};

// Metadata key for storing default values in field metadata
const DEFAULT_VALUE_METADATA_KEY: &str = "example.default_value";
Expand Down Expand Up @@ -79,7 +79,7 @@ pub async fn default_column_values() -> Result<()> {
let mut buf = vec![];

let props = WriterProperties::builder()
.set_max_row_group_size(2)
.set_max_row_group_row_count(Some(2))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.build();

let mut writer =
Expand Down
4 changes: 2 additions & 2 deletions datafusion-examples/examples/data_io/json_shredding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use datafusion_physical_expr_adapter::{
};
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::{ObjectStore, PutPayload};
use object_store::{ObjectStoreExt, PutPayload};

// Example showing how to implement custom filter rewriting for JSON shredding.
//
Expand Down Expand Up @@ -76,7 +76,7 @@ pub async fn json_shredding() -> Result<()> {
let mut buf = vec![];

let props = WriterProperties::builder()
.set_max_row_group_size(2)
.set_max_row_group_row_count(Some(2))
.build();

let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), Some(props))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use datafusion::parquet::arrow::arrow_reader::{
ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection, RowSelector,
};
use datafusion::parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
use datafusion::parquet::file::metadata::ParquetMetaData;
use datafusion::parquet::file::metadata::{PageIndexPolicy, ParquetMetaData};
use datafusion::parquet::file::properties::{EnabledStatistics, WriterProperties};
use datafusion::parquet::schema::types::ColumnPath;
use datafusion::physical_expr::PhysicalExpr;
Expand Down Expand Up @@ -410,7 +410,7 @@ impl IndexedFile {
let options = ArrowReaderOptions::new()
// Load the page index when reading metadata to cache
// so it is available to interpret row selections
.with_page_index(true);
.with_page_index_policy(PageIndexPolicy::Required);
let reader =
ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?;
let metadata = reader.metadata().clone();
Expand Down Expand Up @@ -567,7 +567,7 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
.object_meta
.location
.parts()
.last()
.next_back()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clippy told me to do this -- I am not sure why it doesn't do so on main

.expect("No path in location")
.as_ref()
.to_string();
Expand Down Expand Up @@ -659,7 +659,7 @@ fn make_demo_file(path: impl AsRef<Path>, value_range: Range<i32>) -> Result<()>
// enable page statistics for the tag column,
// for everything else.
let props = WriterProperties::builder()
.set_max_row_group_size(100)
.set_max_row_group_row_count(Some(100))
// compute column chunk (per row group) statistics by default
.set_statistics_enabled(EnabledStatistics::Chunk)
// compute column page statistics for the tag column
Expand Down
2 changes: 1 addition & 1 deletion datafusion/common/src/file_options/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ mod tests {
.build();

// Verify the expected options propagated down to parquet crate WriterProperties struct
assert_eq!(properties.max_row_group_size(), 123);
assert_eq!(properties.max_row_group_row_count(), Some(123));
assert_eq!(properties.data_page_size_limit(), 123);
assert_eq!(properties.write_batch_size(), 123);
assert_eq!(properties.writer_version(), WriterVersion::PARQUET_2_0);
Expand Down
Loading