diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 99223f1253..7cdc5b120b 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -2241,29 +2241,36 @@ def _partition_value(self, partition_field: PartitionField, schema: Schema) -> A
         if partition_field.source_id not in self.column_aggregates:
             return None
 
-        if not partition_field.transform.preserves_order:
+        source_field = schema.find_field(partition_field.source_id)
+        iceberg_transform = partition_field.transform
+
+        if not iceberg_transform.preserves_order:
             raise ValueError(
                 f"Cannot infer partition value from parquet metadata for a non-linear Partition Field: {partition_field.name} with transform {partition_field.transform}"
             )
 
-        lower_value = partition_record_value(
-            partition_field=partition_field,
-            value=self.column_aggregates[partition_field.source_id].current_min,
-            schema=schema,
+        transform_func = iceberg_transform.transform(source_field.field_type)
+
+        lower_value = transform_func(
+            partition_record_value(
+                partition_field=partition_field,
+                value=self.column_aggregates[partition_field.source_id].current_min,
+                schema=schema,
+            )
         )
-        upper_value = partition_record_value(
-            partition_field=partition_field,
-            value=self.column_aggregates[partition_field.source_id].current_max,
-            schema=schema,
+        upper_value = transform_func(
+            partition_record_value(
+                partition_field=partition_field,
+                value=self.column_aggregates[partition_field.source_id].current_max,
+                schema=schema,
+            )
         )
         if lower_value != upper_value:
             raise ValueError(
                 f"Cannot infer partition value from parquet metadata as there are more than one partition values for Partition Field: {partition_field.name}. {lower_value=}, {upper_value=}"
             )
 
-        source_field = schema.find_field(partition_field.source_id)
-        transform = partition_field.transform.transform(source_field.field_type)
-        return transform(lower_value)
+        return lower_value
 
     def partition(self, partition_spec: PartitionSpec, schema: Schema) -> Record:
         return Record(**{field.name: self._partition_value(field, schema) for field in partition_spec.fields})
diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py
index 2c6eb4b4ab..3d36ffcf31 100644
--- a/tests/integration/test_add_files.py
+++ b/tests/integration/test_add_files.py
@@ -33,13 +33,13 @@
 from pyiceberg.catalog import Catalog
 from pyiceberg.exceptions import NoSuchTableError
 from pyiceberg.io import FileIO
-from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException
+from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException, schema_to_pyarrow
 from pyiceberg.manifest import DataFile
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table import Table
 from pyiceberg.table.metadata import TableMetadata
-from pyiceberg.transforms import BucketTransform, IdentityTransform, MonthTransform
+from pyiceberg.transforms import BucketTransform, HourTransform, IdentityTransform, MonthTransform
 from pyiceberg.types import (
     BooleanType,
     DateType,
@@ -47,6 +47,7 @@
     LongType,
     NestedField,
     StringType,
+    TimestampType,
     TimestamptzType,
 )
 
@@ -898,3 +899,30 @@ def test_add_files_that_referenced_by_current_snapshot_with_check_duplicate_file
     with pytest.raises(ValueError) as exc_info:
         tbl.add_files(file_paths=[existing_files_in_table], check_duplicate_files=True)
     assert f"Cannot add files that are already referenced by table, files: {existing_files_in_table}" in str(exc_info.value)
+
+
+@pytest.mark.integration
+def test_add_files_hour_transform(session_catalog: Catalog) -> None:
+    identifier = "default.test_add_files_hour_transform"
+
+    schema = Schema(NestedField(1, "hourly", TimestampType()))
+    schema_arrow = schema_to_pyarrow(schema, include_field_ids=False)
+    spec = PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=HourTransform(), name="spec_hour"))
+
+    tbl = _create_table(session_catalog, identifier, format_version=1, schema=schema, partition_spec=spec)
+
+    file_path = "s3://warehouse/default/test_add_files_hour_transform/test.parquet"
+
+    from pyiceberg.utils.datetime import micros_to_timestamp
+
+    arrow_table = pa.Table.from_pylist(
+        [{"hourly": micros_to_timestamp(1743465600155254)}, {"hourly": micros_to_timestamp(1743469198047855)}],
+        schema=schema_arrow,
+    )
+
+    fo = tbl.io.new_output(file_path)
+    with fo.create(overwrite=True) as fos:
+        with pq.ParquetWriter(fos, schema=schema_arrow) as writer:
+            writer.write_table(arrow_table)
+
+    tbl.add_files(file_paths=[file_path])
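Note (reviewer sketch, not part of the patch): the fix works because the min/max
bounds are now compared after the partition transform is applied. The two test
timestamps differ in raw microseconds but fall in the same hour, so the transformed
bounds agree and _partition_value no longer raises. A minimal standalone check,
assuming a pyiceberg installation (484296 is the hour ordinal since the Unix epoch
for 2025-04-01T00:00 UTC):

    # HourTransform maps both test timestamps to the same hourly partition
    # value, so the transformed lower/upper bounds compare equal.
    from pyiceberg.transforms import HourTransform
    from pyiceberg.types import TimestampType

    to_hour = HourTransform().transform(TimestampType())

    lower = to_hour(1743465600155254)  # 2025-04-01T00:00:00.155254 (micros)
    upper = to_hour(1743469198047855)  # 2025-04-01T00:59:58.047855 (micros)

    assert lower == upper == 484296  # one hour ordinal -> one partition value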