diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index 7978fdc9b4..3408d5be62 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -261,6 +261,8 @@ schema = Schema(
 tbl = catalog.create_table("default.cities", schema=schema)
 ```
 
+### Append and Overwrite
+
 Now write the data to the table:
 
@@ -333,6 +335,49 @@ df = pa.Table.from_pylist(
 table.append(df)
 ```
 
+### Write Parquet Files
+
+PyIceberg provides a low-level API to write Parquet files in Iceberg-compatible format without committing them to the table metadata. This is useful when you need more control over the commit process:
+
+```python
+file_paths = tbl.write_parquet(df)
+```
+
+The `write_parquet()` method takes a PyArrow table, writes it to Parquet files that follow the table's schema and partition spec, and returns the paths of the written files:
+
+```python
+import pyarrow as pa
+
+df = pa.Table.from_pylist(
+    [
+        {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},
+        {"city": "San Francisco", "lat": 37.773972, "long": -122.431297},
+        {"city": "Drachten", "lat": 53.11254, "long": 6.0989},
+        {"city": "Paris", "lat": 48.864716, "long": 2.349014},
+    ],
+)
+
+# Write the files, but do not commit them
+file_paths = tbl.write_parquet(df)
+print(file_paths)
+# ['s3a://warehouse/default/cities/data/00000-0-8e056d57-7ffa-4c22-9f99-52a0e5ea4b19.parquet']
+
+# The files exist in storage but are not committed, so they do not show up in queries yet
+```
+
+To make these files visible when querying the table, commit them using the [`add_files`](#add-files) API:
+
+```python
+# Commit the files to the table metadata
+tbl.add_files(file_paths=file_paths)
+
+# The data is now visible when querying the table
+```
+
+### Delete
+
 You can delete some of the data from the table by calling `tbl.delete()` with a desired `delete_filter`.
 
 ```python
@@ -1055,6 +1100,19 @@ tbl.add_files(file_paths=file_paths)
 
 # A new snapshot is committed to the table with manifests pointing to the existing parquet files
 ```
 
+The `write_parquet()` method provides an easy way to write files in Iceberg-compatible format that can then be committed with `add_files`:
+
+```python
+# Write data to Parquet files without committing them
+file_paths = tbl.write_parquet(df)
+
+# Commit the files to make them visible in queries
+tbl.add_files(file_paths=file_paths)
+```
+
+This is especially useful for decoupling the write from the commit when ingesting data into an Iceberg table with high concurrency, for example from serverless functions. By separating the write and commit phases, a queue or orchestration system only needs to serialize the commit step, which is typically much faster than writing the data.
+
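+As a sketch, such a flow could look like this. Only `tbl.write_parquet` and `tbl.add_files` are PyIceberg APIs; the `queue` object is a stand-in for whatever messaging or orchestration system coordinates your commits:
+
+```python
+# Many concurrent writers (for example, serverless functions) only write data files
+def ingest(df: pa.Table) -> None:
+    file_paths = tbl.write_parquet(df)
+    queue.send(file_paths)  # hand the paths off to a single committer
+
+
+# A single committer drains the queue and commits all pending files in one snapshot
+def commit_pending() -> None:
+    pending = [path for batch in queue.receive_all() for path in batch]
+    if pending:
+        tbl.add_files(file_paths=pending)
+```
+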
note "Name Mapping" diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index f7e3c7c082..1fa4848484 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -2205,24 +2205,25 @@ def _partition_value(self, partition_field: PartitionField, schema: Schema) -> A f"Cannot infer partition value from parquet metadata for a non-linear Partition Field: {partition_field.name} with transform {partition_field.transform}" ) - lower_value = partition_record_value( + source_field = schema.find_field(partition_field.source_id) + transform = partition_field.transform.transform(source_field.field_type) + + lower_value = transform(partition_record_value( partition_field=partition_field, value=self.column_aggregates[partition_field.source_id].current_min, schema=schema, - ) - upper_value = partition_record_value( + )) + upper_value = transform(partition_record_value( partition_field=partition_field, value=self.column_aggregates[partition_field.source_id].current_max, schema=schema, - ) + )) if lower_value != upper_value: raise ValueError( f"Cannot infer partition value from parquet metadata as there are more than one partition values for Partition Field: {partition_field.name}. {lower_value=}, {upper_value=}" ) - source_field = schema.find_field(partition_field.source_id) - transform = partition_field.transform.transform(source_field.field_type) - return transform(lower_value) + return lower_value def partition(self, partition_spec: PartitionSpec, schema: Schema) -> Record: return Record(**{field.name: self._partition_value(field, schema) for field in partition_spec.fields}) @@ -2353,7 +2354,8 @@ def data_file_statistics_from_parquet_metadata( ) -def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]: +def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask], include_field_ids: bool = True + ) -> Iterator[DataFile]: from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties) @@ -2380,7 +2382,7 @@ def write_parquet(task: WriteTask) -> DataFile: file_schema=task.schema, batch=batch, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, - include_field_ids=True, + include_field_ids=include_field_ids, ) for batch in task.record_batches ] @@ -2549,6 +2551,7 @@ def _dataframe_to_data_files( io: FileIO, write_uuid: Optional[uuid.UUID] = None, counter: Optional[itertools.count[int]] = None, + include_field_ids: bool = True ) -> Iterable[DataFile]: """Convert a PyArrow table into a DataFile. 
@@ -2578,6 +2581,7 @@ def _dataframe_to_data_files(
                     for batches in bin_pack_arrow_table(df, target_file_size)
                 ]
             ),
+            include_field_ids=include_field_ids,
         )
     else:
         partitions = _determine_partitions(spec=table_metadata.spec(), schema=table_metadata.schema(), arrow_table=df)
@@ -2597,6 +2601,7 @@
                     for batches in bin_pack_arrow_table(partition.arrow_table_partition, target_file_size)
                 ]
             ),
+            include_field_ids=include_field_ids,
         )
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 679f74d107..e6068b02bf 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -1226,6 +1226,49 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
         with self.transaction() as tx:
             tx.append(df=df, snapshot_properties=snapshot_properties)
 
+    def write_parquet(self, df: pa.Table) -> List[str]:
+        """Shorthand API for writing a PyArrow table as Parquet data files for the table, without committing them.
+
+        The files follow the table's schema and partition spec, but no snapshot is created;
+        commit them afterwards, for example with `add_files`.
+
+        Args:
+            df: The Arrow table that will be written as Parquet files
+
+        Returns:
+            List of file paths of the written Parquet files
+        """
+        try:
+            import pyarrow as pa
+        except ModuleNotFoundError as e:
+            raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e
+
+        from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _dataframe_to_data_files
+
+        if not isinstance(df, pa.Table):
+            raise ValueError(f"Expected PyArrow table, got: {df}")
+
+        if unsupported_partitions := [
+            field for field in self.metadata.spec().fields if not field.transform.supports_pyarrow_transform
+        ]:
+            raise ValueError(
+                f"Not all partition types are supported for writes. Following partitions cannot be written using pyarrow: {unsupported_partitions}."
+            )
+
+        downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
+        _check_pyarrow_schema_compatible(
+            self.metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+        )
+
+        if df.shape[0] == 0:
+            return []
+
+        # Omit Parquet field IDs so the files match what `add_files` expects (columns are resolved via name mapping)
+        data_files = _dataframe_to_data_files(
+            table_metadata=self.metadata, write_uuid=uuid.uuid4(), df=df, io=self.io, include_field_ids=False
+        )
+
+        return [data_file.file_path for data_file in data_files]
+
     def dynamic_partition_overwrite(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
         """Shorthand for dynamic overwriting the table with a PyArrow table.