diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 34e9d2c53b..5188152a6f 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -3135,16 +3135,22 @@
         def _write_delete_manifest() -> List[ManifestFile]:
             # Check if we need to mark the files as deleted
             deleted_entries = self._deleted_entries()
             if len(deleted_entries) > 0:
-                with write_manifest(
-                    format_version=self._transaction.table_metadata.format_version,
-                    spec=self._transaction.table_metadata.spec(),
-                    schema=self._transaction.table_metadata.schema(),
-                    output_file=self.new_manifest_output(),
-                    snapshot_id=self._snapshot_id,
-                ) as writer:
-                    for delete_entry in deleted_entries:
-                        writer.add_entry(delete_entry)
-                return [writer.to_manifest_file()]
+                deleted_manifests = []
+                partition_groups: Dict[int, List[ManifestEntry]] = defaultdict(list)
+                for deleted_entry in deleted_entries:
+                    partition_groups[deleted_entry.data_file.spec_id].append(deleted_entry)
+                for spec_id, entries in partition_groups.items():
+                    with write_manifest(
+                        format_version=self._transaction.table_metadata.format_version,
+                        spec=self._transaction.table_metadata.specs()[spec_id],
+                        schema=self._transaction.table_metadata.schema(),
+                        output_file=self.new_manifest_output(),
+                        snapshot_id=self._snapshot_id,
+                    ) as writer:
+                        for entry in entries:
+                            writer.add_entry(entry)
+                    deleted_manifests.append(writer.to_manifest_file())
+                return deleted_manifests
             else:
                 return []
diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py
index 49de265ac8..4bddf09bd9 100644
--- a/tests/integration/test_deletes.py
+++ b/tests/integration/test_deletes.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 # pylint:disable=redefined-outer-name
+from datetime import datetime
 from typing import List
 
 import pyarrow as pa
@@ -25,9 +26,11 @@
 from pyiceberg.exceptions import NoSuchTableError
 from pyiceberg.expressions import AlwaysTrue, EqualTo
 from pyiceberg.manifest import ManifestEntryStatus
+from pyiceberg.partitioning import PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table.snapshots import Operation, Summary
-from pyiceberg.types import FloatType, IntegerType, NestedField
+from pyiceberg.transforms import IdentityTransform
+from pyiceberg.types import FloatType, IntegerType, LongType, NestedField, TimestampType
 
 
 def run_spark_commands(spark: SparkSession, sqls: List[str]) -> None:
@@ -556,3 +559,89 @@ def test_delete_overwrite_table_with_nan(session_catalog: RestCatalog) -> None:
     assert 2.0 in result
     assert 3.0 in result
     assert 4.0 in result
+
+
+@pytest.mark.integration
+def test_delete_after_partition_evolution_from_unpartitioned(session_catalog: RestCatalog) -> None:
+    identifier = "default.test_delete_after_partition_evolution_from_unpartitioned"
+
+    arrow_table = pa.Table.from_arrays(
+        [
+            pa.array([2, 3, 4, 5, 6]),
+        ],
+        names=["idx"],
+    )
+
+    try:
+        session_catalog.drop_table(identifier)
+    except NoSuchTableError:
+        pass
+
+    tbl = session_catalog.create_table(
+        identifier,
+        schema=Schema(
+            NestedField(1, "idx", LongType()),
+        ),
+    )
+
+    tbl.append(arrow_table)
+
+    with tbl.transaction() as tx:
+        with tx.update_schema() as schema:
+            schema.rename_column("idx", "id")
+        with tx.update_spec() as spec:
+            spec.add_field("id", IdentityTransform())
+
+    # Append one more time to create data files with two partition specs
+    tbl.append(arrow_table.rename_columns(["id"]))
+
+    tbl.delete("id == 4")
+
+    # Expect 8 records: 10 records - 2
+    assert len(tbl.scan().to_arrow()) == 8
+
+
+@pytest.mark.integration
+def test_delete_after_partition_evolution_from_partitioned(session_catalog: RestCatalog) -> None:
+    identifier = "default.test_delete_after_partition_evolution_from_partitioned"
+
+    arrow_table = pa.Table.from_arrays(
+        [
+            pa.array([2, 3, 4, 5, 6]),
+            pa.array([
+                datetime(2021, 5, 19),
+                datetime(2022, 7, 25),
+                datetime(2023, 3, 22),
+                datetime(2024, 7, 17),
+                datetime(2025, 2, 22),
+            ]),
+        ],
+        names=["idx", "ts"],
+    )
+
+    try:
+        session_catalog.drop_table(identifier)
+    except NoSuchTableError:
+        pass
+
+    tbl = session_catalog.create_table(
+        identifier,
+        schema=Schema(NestedField(1, "idx", LongType()), NestedField(2, "ts", TimestampType())),
+        partition_spec=PartitionSpec(PartitionField(source_id=2, field_id=1000, transform=IdentityTransform(), name="ts")),
+    )
+
+    tbl.append(arrow_table)
+
+    with tbl.transaction() as tx:
+        with tx.update_schema() as schema:
+            schema.rename_column("idx", "id")
+        with tx.update_spec() as spec:
+            spec.add_field("id", IdentityTransform())
+
+    # Append one more time to create data files with two partition specs
+    tbl.append(arrow_table.rename_columns(["id", "ts"]))
+
+    tbl.delete("id == 4")
+
+    # Expect 8 records: 10 records - 2
+    assert len(tbl.scan().to_arrow()) == 8
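
Note on the fix: an Iceberg manifest file is written against a single partition spec, so delete entries that reference data files written under different specs (e.g. files appended before and after partition evolution) cannot share one manifest. The change above groups deleted entries by each data file's spec_id and writes one delete manifest per group, looking the spec up via table_metadata.specs()[spec_id] instead of unconditionally using the current spec(). A minimal sketch of just the grouping step is below; FakeDataFile and FakeManifestEntry are hypothetical stand-ins for illustration, not pyiceberg classes.

# Sketch of the per-spec grouping used in _write_delete_manifest.
# FakeDataFile / FakeManifestEntry are hypothetical stand-ins, not the pyiceberg API.
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class FakeDataFile:
    spec_id: int  # partition spec the file was written with
    path: str


@dataclass
class FakeManifestEntry:
    data_file: FakeDataFile


def group_by_spec(entries: List[FakeManifestEntry]) -> Dict[int, List[FakeManifestEntry]]:
    """Group deleted entries by the partition spec of their data file,
    so that one delete manifest can be written per spec_id."""
    groups: Dict[int, List[FakeManifestEntry]] = defaultdict(list)
    for entry in entries:
        groups[entry.data_file.spec_id].append(entry)
    return groups


entries = [
    FakeManifestEntry(FakeDataFile(spec_id=0, path="a.parquet")),  # pre-evolution spec
    FakeManifestEntry(FakeDataFile(spec_id=1, path="b.parquet")),  # evolved spec
]
assert set(group_by_spec(entries)) == {0, 1}  # one manifest would be written per group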