-
Notifications
You must be signed in to change notification settings - Fork 2k
Upgrade DataFusion to arrow-rs/parquet 58.0.0 / object_store 0.13.0
#19728
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
120d2b0
40a6a71
f50b9c3
87563d8
4afd46f
70e2841
b29b662
6a2e631
ca11966
63a85eb
277aac4
bb29b0c
35b97fa
d2e8da6
1002408
c062756
917a464
c1d27d3
1b979c6
c3df7be
d79a8a0
21fabb5
36cfd5e
c273703
855b51b
bdd19a6
0d0cc96
ccf7a14
1a65bf7
206de80
778ac16
ddd778d
afb5ba3
0622248
e4449fd
987f879
70a958a
5452d50
9276a8a
cae9716
d568137
6762cb7
14a3601
2ad66f1
16d34ad
8d7580a
70d22da
e313fd7
50b7a72
4ae0203
33710fd
7afb8f1
189f6e0
1985cde
5eb5680
ffc49b2
57ce0fa
325831d
bf17d4a
dd0464e
57d7884
258251a
f00edb3
f12d5cd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -36,10 +36,11 @@ use datafusion::{ | |
| execution::object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry}, | ||
| }; | ||
| use futures::stream::{BoxStream, Stream}; | ||
| use futures::{StreamExt, TryStreamExt}; | ||
| use object_store::{ | ||
| GetOptions, GetRange, GetResult, ListResult, MultipartUpload, ObjectMeta, | ||
| ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, | ||
| path::Path, | ||
| CopyOptions, GetOptions, GetRange, GetResult, ListResult, MultipartUpload, | ||
| ObjectMeta, ObjectStore, ObjectStoreExt, PutMultipartOptions, PutOptions, PutPayload, | ||
| PutResult, Result, path::Path, | ||
| }; | ||
| use parking_lot::{Mutex, RwLock}; | ||
| use url::Url; | ||
|
|
@@ -230,40 +231,57 @@ impl InstrumentedObjectStore { | |
| let timestamp = Utc::now(); | ||
| let range = options.range.clone(); | ||
|
|
||
| let head = options.head; | ||
alamb marked this conversation as resolved.
Show resolved
Hide resolved
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A substantial amount of the changes in this PR are due to the upgrade to object_store 0.13 where several of the trait methods are consolidated (e.g. You can see the upgrade guide here: https://docs.rs/object_store/latest/object_store/trait.ObjectStore.html#upgrade-guide-for-0130 |
||
| let start = Instant::now(); | ||
| let ret = self.inner.get_opts(location, options).await?; | ||
| let elapsed = start.elapsed(); | ||
|
|
||
| let (op, size) = if head { | ||
| (Operation::Head, None) | ||
| } else { | ||
| ( | ||
| Operation::Get, | ||
| Some((ret.range.end - ret.range.start) as usize), | ||
| ) | ||
| }; | ||
|
|
||
| self.requests.lock().push(RequestDetails { | ||
| op: Operation::Get, | ||
| op, | ||
| path: location.clone(), | ||
| timestamp, | ||
| duration: Some(elapsed), | ||
| size: Some((ret.range.end - ret.range.start) as usize), | ||
| size, | ||
| range, | ||
| extra_display: None, | ||
| }); | ||
|
|
||
| Ok(ret) | ||
| } | ||
|
|
||
| async fn instrumented_delete(&self, location: &Path) -> Result<()> { | ||
| fn instrumented_delete_stream( | ||
| &self, | ||
| locations: BoxStream<'static, Result<Path>>, | ||
| ) -> BoxStream<'static, Result<Path>> { | ||
| let requests_captured = Arc::clone(&self.requests); | ||
|
|
||
| let timestamp = Utc::now(); | ||
| let start = Instant::now(); | ||
| self.inner.delete(location).await?; | ||
| let elapsed = start.elapsed(); | ||
|
|
||
| self.requests.lock().push(RequestDetails { | ||
| op: Operation::Delete, | ||
| path: location.clone(), | ||
| timestamp, | ||
| duration: Some(elapsed), | ||
| size: None, | ||
| range: None, | ||
| extra_display: None, | ||
| }); | ||
|
|
||
| Ok(()) | ||
| self.inner | ||
| .delete_stream(locations) | ||
| .and_then(move |location| { | ||
| let elapsed = start.elapsed(); | ||
| requests_captured.lock().push(RequestDetails { | ||
| op: Operation::Delete, | ||
| path: location.clone(), | ||
| timestamp, | ||
| duration: Some(elapsed), | ||
| size: None, | ||
| range: None, | ||
| extra_display: None, | ||
| }); | ||
| futures::future::ok(location) | ||
| }) | ||
| .boxed() | ||
| } | ||
|
|
||
| fn instrumented_list( | ||
|
|
@@ -361,25 +379,6 @@ impl InstrumentedObjectStore { | |
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| async fn instrumented_head(&self, location: &Path) -> Result<ObjectMeta> { | ||
| let timestamp = Utc::now(); | ||
| let start = Instant::now(); | ||
| let ret = self.inner.head(location).await?; | ||
| let elapsed = start.elapsed(); | ||
|
|
||
| self.requests.lock().push(RequestDetails { | ||
| op: Operation::Head, | ||
| path: location.clone(), | ||
| timestamp, | ||
| duration: Some(elapsed), | ||
| size: None, | ||
| range: None, | ||
| extra_display: None, | ||
| }); | ||
|
|
||
| Ok(ret) | ||
| } | ||
| } | ||
|
|
||
| impl fmt::Display for InstrumentedObjectStore { | ||
|
|
@@ -429,12 +428,15 @@ impl ObjectStore for InstrumentedObjectStore { | |
| self.inner.get_opts(location, options).await | ||
| } | ||
|
|
||
| async fn delete(&self, location: &Path) -> Result<()> { | ||
| fn delete_stream( | ||
| &self, | ||
| locations: BoxStream<'static, Result<Path>>, | ||
| ) -> BoxStream<'static, Result<Path>> { | ||
| if self.enabled() { | ||
| return self.instrumented_delete(location).await; | ||
| return self.instrumented_delete_stream(locations); | ||
| } | ||
|
|
||
| self.inner.delete(location).await | ||
| self.inner.delete_stream(locations) | ||
| } | ||
|
|
||
| fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> { | ||
|
|
@@ -453,28 +455,24 @@ impl ObjectStore for InstrumentedObjectStore { | |
| self.inner.list_with_delimiter(prefix).await | ||
| } | ||
|
|
||
| async fn copy(&self, from: &Path, to: &Path) -> Result<()> { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. copy and copy_if_not_exists were consolidated |
||
| if self.enabled() { | ||
| return self.instrumented_copy(from, to).await; | ||
| } | ||
|
|
||
| self.inner.copy(from, to).await | ||
| } | ||
|
|
||
| async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { | ||
| if self.enabled() { | ||
| return self.instrumented_copy_if_not_exists(from, to).await; | ||
| } | ||
|
|
||
| self.inner.copy_if_not_exists(from, to).await | ||
| } | ||
|
|
||
| async fn head(&self, location: &Path) -> Result<ObjectMeta> { | ||
| async fn copy_opts( | ||
| &self, | ||
| from: &Path, | ||
| to: &Path, | ||
| options: CopyOptions, | ||
| ) -> Result<()> { | ||
| if self.enabled() { | ||
| return self.instrumented_head(location).await; | ||
| return match options.mode { | ||
| object_store::CopyMode::Create => { | ||
| self.instrumented_copy_if_not_exists(from, to).await | ||
| } | ||
| object_store::CopyMode::Overwrite => { | ||
| self.instrumented_copy(from, to).await | ||
| } | ||
| }; | ||
| } | ||
|
|
||
| self.inner.head(location).await | ||
| self.inner.copy_opts(from, to, options).await | ||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -48,7 +48,7 @@ use datafusion_physical_expr_adapter::{ | |
| use futures::StreamExt; | ||
| use object_store::memory::InMemory; | ||
| use object_store::path::Path; | ||
| use object_store::{ObjectStore, PutPayload}; | ||
| use object_store::{ObjectStore, ObjectStoreExt, PutPayload}; | ||
|
|
||
| // Metadata key for storing default values in field metadata | ||
| const DEFAULT_VALUE_METADATA_KEY: &str = "example.default_value"; | ||
|
|
@@ -79,7 +79,7 @@ pub async fn default_column_values() -> Result<()> { | |
| let mut buf = vec![]; | ||
|
|
||
| let props = WriterProperties::builder() | ||
| .set_max_row_group_size(2) | ||
| .set_max_row_group_row_count(Some(2)) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This configuration was renamed in |
||
| .build(); | ||
|
|
||
| let mut writer = | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -43,7 +43,7 @@ use datafusion::parquet::arrow::arrow_reader::{ | |
| ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection, RowSelector, | ||
| }; | ||
| use datafusion::parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader}; | ||
| use datafusion::parquet::file::metadata::ParquetMetaData; | ||
| use datafusion::parquet::file::metadata::{PageIndexPolicy, ParquetMetaData}; | ||
| use datafusion::parquet::file::properties::{EnabledStatistics, WriterProperties}; | ||
| use datafusion::parquet::schema::types::ColumnPath; | ||
| use datafusion::physical_expr::PhysicalExpr; | ||
|
|
@@ -410,7 +410,7 @@ impl IndexedFile { | |
| let options = ArrowReaderOptions::new() | ||
| // Load the page index when reading metadata to cache | ||
| // so it is available to interpret row selections | ||
| .with_page_index(true); | ||
| .with_page_index_policy(PageIndexPolicy::Required); | ||
| let reader = | ||
| ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?; | ||
| let metadata = reader.metadata().clone(); | ||
|
|
@@ -567,7 +567,7 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory { | |
| .object_meta | ||
| .location | ||
| .parts() | ||
| .last() | ||
| .next_back() | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. clippy told me to do this -- I am not sure why it doesn't do so on main |
||
| .expect("No path in location") | ||
| .as_ref() | ||
| .to_string(); | ||
|
|
@@ -659,7 +659,7 @@ fn make_demo_file(path: impl AsRef<Path>, value_range: Range<i32>) -> Result<()> | |
| // enable page statistics for the tag column, | ||
| // for everything else. | ||
| let props = WriterProperties::builder() | ||
| .set_max_row_group_size(100) | ||
| .set_max_row_group_row_count(Some(100)) | ||
| // compute column chunk (per row group) statistics by default | ||
| .set_statistics_enabled(EnabledStatistics::Chunk) | ||
| // compute column page statistics for the tag column | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this reduction in metadata size is a direct consequence of @WaterWhisperer's PR to improve PageEncoding representation
PageEncodingStatsto bitmask arrow-rs#9051