6 changes: 6 additions & 0 deletions pyiceberg/table/snapshots.py
@@ -28,6 +28,7 @@
 from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, _manifests
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
 from pyiceberg.schema import Schema
+from pyiceberg.utils.deprecated import deprecation_message
 
 if TYPE_CHECKING:
     from pyiceberg.table.metadata import TableMetadata
@@ -356,6 +357,11 @@ def update_snapshot_summaries(
         raise ValueError(f"Operation not implemented: {summary.operation}")
 
     if truncate_full_table and summary.operation == Operation.OVERWRITE and previous_summary is not None:
+        deprecation_message(
+            deprecated_in="0.10.0",
+            removed_in="0.11.0",
+            help_message="The truncate-full-table option is deprecated; it should no longer be used.",
+        )
         summary = _truncate_table_summary(summary, previous_summary)
 
     if not previous_summary:
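Note: `deprecation_message` is imported from `pyiceberg.utils.deprecated`. As a rough sketch of the behavior the new guard relies on (an assumption for illustration, not the exact pyiceberg implementation), it boils down to a `warnings.warn` call, so callers that still pass `truncate_full_table=True` keep the old truncation behavior but now see a `DeprecationWarning`:

import warnings

def deprecation_message(deprecated_in: str, removed_in: str, help_message: str) -> None:
    # Emit a DeprecationWarning naming the release window so callers can
    # migrate before the flag disappears in 0.11.0.
    warnings.warn(
        f"Deprecated in {deprecated_in}, will be removed in {removed_in}. {help_message}",
        DeprecationWarning,
        stacklevel=2,
    )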
1 change: 0 additions & 1 deletion pyiceberg/table/update/snapshot.py
@@ -236,7 +236,6 @@ def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary:
         return update_snapshot_summaries(
             summary=Summary(operation=self._operation, **ssc.build(), **snapshot_properties),
             previous_summary=previous_snapshot.summary if previous_snapshot is not None else None,
-            truncate_full_table=self._operation == Operation.OVERWRITE,
         )
 
     def _commit(self) -> UpdatesAndRequirements:
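Dropping `truncate_full_table` here changes how an OVERWRITE snapshot's summary is computed: totals are now merged incrementally with the previous summary instead of being reset as if the whole table had been replaced, which is what makes the partial-overwrite numbers in the new integration test come out right. A rough sketch of the call-site difference (property values invented for illustration; passing a plain dict as `previous_summary` mirrors the unit tests below):

from pyiceberg.table.snapshots import Operation, Summary, update_snapshot_summaries

previous = {"total-data-files": "3", "total-records": "5", "total-files-size": "2848"}
overwrite = Summary(operation=Operation.OVERWRITE, **{"added-data-files": "1", "added-records": "2"})

# Old call site (now deprecated): totals rebuilt as if the table were truncated
# first, which over-counts deletions on a partial overwrite.
update_snapshot_summaries(summary=overwrite, previous_summary=previous, truncate_full_table=True)

# New call site: previous totals are carried forward and adjusted.
update_snapshot_summaries(summary=overwrite, previous_summary=previous)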
94 changes: 94 additions & 0 deletions tests/integration/test_writes/test_writes.py
@@ -262,6 +262,100 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_wi
     }
 
 
+@pytest.mark.integration
+def test_summaries_partial_overwrite(spark: SparkSession, session_catalog: Catalog) -> None:
+    identifier = "default.test_summaries_partial_overwrite"
+    TEST_DATA = {
+        "id": [1, 2, 3, 1, 1],
+        "name": ["AB", "CD", "EF", "CD", "EF"],
+    }
+    pa_schema = pa.schema(
+        [
+            pa.field("id", pa.int32()),
+            pa.field("name", pa.string()),
+        ]
+    )
+    arrow_table = pa.Table.from_pydict(TEST_DATA, schema=pa_schema)
+    tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, schema=pa_schema)
+    with tbl.update_spec() as txn:
+        txn.add_identity("id")
+    tbl.append(arrow_table)
+
+    # TODO: We might want to check why this ends up in 3 files
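+    # (likely one data file per identity partition, since the appended ids are {1, 2, 3})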
+    assert len(tbl.inspect.data_files()) == 3
+
+    tbl.delete(delete_filter="id == 1 and name = 'AB'")  # partial overwrite: deletes rows from 1 of the 3 data files
+
+    rows = spark.sql(
+        f"""
+        SELECT operation, summary
+        FROM {identifier}.snapshots
+        ORDER BY committed_at ASC
+        """
+    ).collect()
+
+    operations = [row.operation for row in rows]
+    assert operations == ["append", "overwrite"]
+
+    summaries = [row.summary for row in rows]
+
+    file_size = int(summaries[0]["added-files-size"])
+    assert file_size > 0
+
+    # APPEND
+    assert summaries[0] == {
+        "added-data-files": "3",
+        "added-files-size": "2848",
+        "added-records": "5",
+        "changed-partition-count": "3",
+        "total-data-files": "3",
+        "total-delete-files": "0",
+        "total-equality-deletes": "0",
+        "total-files-size": "2848",
+        "total-position-deletes": "0",
+        "total-records": "5",
+    }
+    # Java produces:
+    # {
+    #     "added-data-files": "1",
+    #     "added-files-size": "707",
+    #     "added-records": "2",
+    #     "app-id": "local-1743678304626",
+    #     "changed-partition-count": "1",
+    #     "deleted-data-files": "1",
+    #     "deleted-records": "3",
+    #     "engine-name": "spark",
+    #     "engine-version": "3.5.5",
+    #     "iceberg-version": "Apache Iceberg 1.8.1 (commit 9ce0fcf0af7becf25ad9fc996c3bad2afdcfd33d)",
+    #     "removed-files-size": "693",
+    #     "spark.app.id": "local-1743678304626",
+    #     "total-data-files": "3",
+    #     "total-delete-files": "0",
+    #     "total-equality-deletes": "0",
+    #     "total-files-size": "1993",
+    #     "total-position-deletes": "0",
+    #     "total-records": "4"
+    # }
+    files = tbl.inspect.data_files()
+    assert len(files) == 3
+    assert summaries[1] == {
+        "added-data-files": "1",
+        "added-files-size": "859",
+        "added-records": "2",
+        "changed-partition-count": "1",
+        "deleted-data-files": "1",
+        "deleted-records": "3",
+        "removed-files-size": "950",
+        "total-data-files": "3",
+        "total-delete-files": "0",
+        "total-equality-deletes": "0",
+        "total-files-size": "2757",
+        "total-position-deletes": "0",
+        "total-records": "4",
+    }
+    assert len(tbl.scan().to_pandas()) == 4
+
+
 @pytest.mark.integration
 def test_data_files(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
     identifier = "default.arrow_data_files"
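The overwrite summary asserted above reconciles with the append summary by simple arithmetic (shown here as a reader aid; not part of the PR):

# total-records: 5 appended - 3 deleted + 2 rewritten survivors = 4
assert 5 - 3 + 2 == 4
# total-data-files: 3 - 1 file rewritten away + 1 file written back = 3
assert 3 - 1 + 1 == 3
# total-files-size: 2848 - 950 (removed-files-size) + 859 (added-files-size) = 2757 bytes
assert 2848 - 950 + 859 == 2757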
2 changes: 0 additions & 2 deletions tests/table/test_snapshots.py
@@ -289,7 +289,6 @@ def test_merge_snapshot_summaries_overwrite_summary() -> None:
             "total-position-deletes": "1",
             "total-records": "1",
         },
-        truncate_full_table=True,
     )
 
     expected = {
@@ -337,7 +336,6 @@ def test_invalid_type() -> None:
                 },
             ),
             previous_summary={"total-data-files": "abc"},  # should be a number
-            truncate_full_table=True,
         )
 
     assert "Could not parse summary property total-data-files to an int: abc" in str(e.value)