Pyarrow IO property for configuring large v small types on read #986
Fokko merged 11 commits into apache:main
Conversation
Once approved/merged, I'd like to bring this up on the discussion thread to add this item to the 0.7.1 patch release as well. It's a small feature, and it would help alleviate the memory issues we are running into (I expect other users would benefit as well).
kevinjqliu left a comment
Thanks for working on this, I left a few comments!
```python
GCS_ENDPOINT = "gcs.endpoint"
GCS_DEFAULT_LOCATION = "gcs.default-bucket-location"
GCS_VERSION_AWARE = "gcs.version-aware"
PYARROW_USE_LARGE_TYPES_ON_READ = "pyarrow.use-large-types-on-read"
```
nit: if this is a pyarrow-specific setting, let's move it to the pyarrow file
I thought of that, but decided to leave it here because I liked having the FileIO properties together in one place. WDYT?
Pyarrow is one of the FileIO implementations and this setting is specifically for Pyarrow. In the future, when we add more FileIO implementations, such as the rust one, it'll be good to have a clear separation between the FileIO settings.
I think it also makes more sense to move this inside of the Arrow file.
Thanks @Fokko and @kevinjqliu - I'll keep this in mind the next time I touch these files 🙂
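As a usage sketch: like other FileIO properties, this flag would be supplied as a string-valued catalog property. The property key comes from this PR; the catalog name and URI below are made up for illustration, and the exact config layout should be checked against the pyiceberg configuration docs.

```yaml
# Hypothetical .pyiceberg.yaml fragment: opt out of large Arrow types on read
catalog:
  default:
    uri: thrift://localhost:9083  # made-up catalog endpoint
    pyarrow.use-large-types-on-read: false
```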
```python
class _ConvertToSmallTypes(PyArrowSchemaVisitor[Union[pa.DataType, pa.Schema]]):
    def schema(self, schema: pa.Schema, struct_result: pa.StructType) -> pa.Schema:
```
nit: looks like this is the same function definition as the one in _ConvertToLargeTypes, as with other functions here.
Perhaps abstract into a common class and extend/override specific functions.
I thought of that, but I didn't like naming one as _ConvertToLargeTypes, and then having an arg like reverse: bool
I was thinking something with inheritance like:
_ConvertToArrowTypes
_ConvertToLargeTypes(_ConvertToArrowTypes)
_ConvertToSmallTypes(_ConvertToArrowTypes)
tests/io/test_pyarrow_visitor.py (Outdated)
```python
def test_pyarrow_schema_ensure_small_types(pyarrow_schema_nested_without_ids: pa.Schema) -> None:
    schema_with_large_types = _pyarrow_schema_ensure_small_types(pyarrow_schema_nested_without_ids)
```
nit: the variable name says large types but the function called is the small-types one.
What is this function testing for?
This was for testing the roundtrip conversion - fixed it to use the correct function
sungwy left a comment
Thank you for the review feedback @kevinjqliu ! Adopted most of the feedback and left some comments for the others.
Thanks for the review @kevinjqliu ! Just updated it to make use of @ndrluis 's cleaned up function
@sungwy Thanks for working on this! It seems we also need to update iceberg-python/pyiceberg/io/pyarrow.py (lines 1471 to 1478 in 846713b). If we have type promotion from string to binary, schema_to_pyarrow will convert BinaryType() to pa.large_binary().
Example to reproduce:

```python
@pytest.mark.integration
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive")])
def test_table_scan_override_with_small_types(catalog: Catalog) -> None:
    identifier = "default.test_table_scan_override_with_small_types"
    arrow_table = pa.Table.from_arrays(
        [pa.array(["a", "b", "c"]), pa.array([b"a", b"b", b"c"]), pa.array([["a", "b"], ["c", "d"], ["e", "f"]])],
        names=["string", "binary", "list"],
    )
    try:
        catalog.drop_table(identifier)
    except NoSuchTableError:
        pass
    tbl = catalog.create_table(
        identifier,
        schema=arrow_table.schema,
    )
    tbl.append(arrow_table)

    with tbl.update_schema() as update_schema:
        update_schema.update_column("string", BinaryType())

    tbl.io.properties[PYARROW_USE_LARGE_TYPES_ON_READ] = "False"
    result_table = tbl.scan().to_arrow()
    expected_schema = pa.schema([
        pa.field("string", pa.large_binary()),  # should be pa.binary()
        pa.field("binary", pa.binary()),
        pa.field("list", pa.list_(pa.string())),
    ])
    assert result_table.schema.equals(expected_schema)
```

```
##### result_table.schema #####
string: large_binary
binary: binary
list: list<element: string>
  child 0, element: string
```
```python
        # When FsSpec is not installed
        raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}") from e

    use_large_types = property_as_bool(io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)
```
This is the only part I don't love: we now force the table to use either large or normal types. When we read record batches I agree that we need to force the schema, but for the table we have to read all the footers anyway.
Once #929 goes in, I think we still need to change that, but let's defer that question for now.
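For context on the quoted line: `property_as_bool` reads a string-valued property and falls back to a default when the key is absent. A rough stand-in for the real pyiceberg helper (which may differ in detail, e.g. in how it handles invalid values):

```python
from typing import Dict

def property_as_bool(properties: Dict[str, str], property_name: str, default: bool) -> bool:
    """Approximation of pyiceberg's helper: parse "true"/"false" strings, else default."""
    value = properties.get(property_name)
    if value is None:
        return default
    return value.strip().lower() == "true"

# With the flag unset, reads keep the current default of large types:
assert property_as_bool({}, "pyarrow.use-large-types-on-read", True) is True
# Setting it to "False" opts into small types:
assert property_as_bool({"pyarrow.use-large-types-on-read": "False"},
                        "pyarrow.use-large-types-on-read", True) is False
```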
Does anyone know when this will be released?
Hi @fusion2222 - This will be released with 0.8.0, which will be a few months away (roughly 1~3 months)
…he#986)

* upyarrow IO property for configuring large v small types on read
* tests
* adopt feedback
* use property_as_bool
* fix
* docs
* nits
* respect flag on promotion
* lint

---------

Co-authored-by: Sung Yun <107272191+syun64@users.noreply.github.com>
This addresses the issue raised in the formal proposal in the Google Doc.
The current behavior of always casting to large types results in an RSS memory usage explosion, as highlighted in the benchmark discussed in the documentation.
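To see where the memory blow-up comes from: Arrow's `pa.string()` stores 32-bit offsets while `pa.large_string()` stores 64-bit offsets, so the offset buffer alone doubles when a column is cast to the large variant. A back-of-the-envelope sketch (offset buffers only; the value buffer and validity bitmap are identical either way):

```python
# Rough arithmetic for the offset buffer of a string column with n values.
# string       -> int32 offsets: 4 bytes each, (n + 1) entries
# large_string -> int64 offsets: 8 bytes each, (n + 1) entries

def offset_buffer_bytes(n_values: int, large: bool) -> int:
    width = 8 if large else 4
    return (n_values + 1) * width

n = 100_000_000  # 100M rows
small = offset_buffer_bytes(n, large=False)
large = offset_buffer_bytes(n, large=True)
print(small, large)  # roughly 400 MB vs 800 MB of offsets alone
```

This is why keeping the small types on read can matter for workloads that never exceed the 2 GiB-per-array limit that the large types exist to lift.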