diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index 53a7846be3..eaffb84a54 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -845,6 +845,11 @@ readable_metrics: [
   [6.0989]]
 ```
 
+!!! info
+    Content refers to the type of content stored by the data file: `0` - `Data`, `1` - `Position Deletes`, `2` - `Equality Deletes`
+
+To show only data files or delete files in the current snapshot, use `table.inspect.data_files()` and `table.inspect.delete_files()` respectively.
+
 ## Add Files
 
 Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them.
diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py
index 3f64255e28..470c00f464 100644
--- a/pyiceberg/table/inspect.py
+++ b/pyiceberg/table/inspect.py
@@ -17,7 +17,7 @@
 from __future__ import annotations
 
 from datetime import datetime
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple
 
 from pyiceberg.conversions import from_bytes
 from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, PartitionFieldSummary
@@ -473,7 +473,7 @@ def history(self) -> "pa.Table":
 
         return pa.Table.from_pylist(history, schema=history_schema)
 
-    def files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
+    def _files(self, snapshot_id: Optional[int] = None, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table":
         import pyarrow as pa
 
         from pyiceberg.io.pyarrow import schema_to_pyarrow
@@ -530,6 +530,8 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
         for manifest_list in snapshot.manifests(io):
             for manifest_entry in manifest_list.fetch_manifest_entry(io):
                 data_file = manifest_entry.data_file
+                if data_file_filter and data_file.content not in data_file_filter:
+                    continue
                 column_sizes = data_file.column_sizes or {}
                 value_counts = data_file.value_counts or {}
                 null_value_counts = data_file.null_value_counts or {}
@@ -558,12 +560,12 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
                         "spec_id": data_file.spec_id,
                         "record_count": data_file.record_count,
                         "file_size_in_bytes": data_file.file_size_in_bytes,
-                        "column_sizes": dict(data_file.column_sizes),
-                        "value_counts": dict(data_file.value_counts),
-                        "null_value_counts": dict(data_file.null_value_counts),
-                        "nan_value_counts": dict(data_file.nan_value_counts),
-                        "lower_bounds": dict(data_file.lower_bounds),
-                        "upper_bounds": dict(data_file.upper_bounds),
+                        "column_sizes": dict(data_file.column_sizes) if data_file.column_sizes is not None else None,
+                        "value_counts": dict(data_file.value_counts) if data_file.value_counts is not None else None,
+                        "null_value_counts": dict(data_file.null_value_counts) if data_file.null_value_counts is not None else None,
+                        "nan_value_counts": dict(data_file.nan_value_counts) if data_file.nan_value_counts is not None else None,
+                        "lower_bounds": dict(data_file.lower_bounds) if data_file.lower_bounds is not None else None,
+                        "upper_bounds": dict(data_file.upper_bounds) if data_file.upper_bounds is not None else None,
                         "key_metadata": data_file.key_metadata,
                         "split_offsets": data_file.split_offsets,
                         "equality_ids": data_file.equality_ids,
@@ -575,3 +577,12 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
             files,
             schema=files_schema,
         )
+
+    def files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
+        return self._files(snapshot_id)
+
+    def data_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
+        return self._files(snapshot_id, {DataFileContent.DATA})
+
+    def delete_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
+        return self._files(snapshot_id, {DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES})
diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py
index 9f63225846..68b10f3262 100644
--- a/tests/integration/test_inspect_table.py
+++ b/tests/integration/test_inspect_table.py
@@ -672,126 +672,141 @@ def test_inspect_files(
     # append more data
     tbl.append(arrow_table_with_null)
 
-    df = tbl.refresh().inspect.files()
+    # configure table properties
+    if format_version == 2:
+        with tbl.transaction() as txn:
+            txn.set_properties({"write.delete.mode": "merge-on-read"})
+    spark.sql(f"DELETE FROM {identifier} WHERE int = 1")
 
-    assert df.column_names == [
-        "content",
-        "file_path",
-        "file_format",
-        "spec_id",
-        "record_count",
-        "file_size_in_bytes",
-        "column_sizes",
-        "value_counts",
-        "null_value_counts",
-        "nan_value_counts",
-        "lower_bounds",
-        "upper_bounds",
-        "key_metadata",
-        "split_offsets",
-        "equality_ids",
-        "sort_order_id",
-        "readable_metrics",
-    ]
-
-    # make sure the non-nullable fields are filled
-    for int_column in ["content", "spec_id", "record_count", "file_size_in_bytes"]:
-        for value in df[int_column]:
-            assert isinstance(value.as_py(), int)
-
-    for split_offsets in df["split_offsets"]:
-        assert isinstance(split_offsets.as_py(), list)
-
-    for file_format in df["file_format"]:
-        assert file_format.as_py() == "PARQUET"
+    files_df = tbl.refresh().inspect.files()
 
-    for file_path in df["file_path"]:
-        assert file_path.as_py().startswith("s3://")
+    data_files_df = tbl.inspect.data_files()
 
-    lhs = df.to_pandas()
-    rhs = spark.table(f"{identifier}.files").toPandas()
+    delete_files_df = tbl.inspect.delete_files()
 
-    lhs_subset = lhs[
-        [
+    def inspect_files_asserts(df: pa.Table, spark_df: DataFrame) -> None:
+        assert df.column_names == [
             "content",
             "file_path",
             "file_format",
             "spec_id",
             "record_count",
             "file_size_in_bytes",
+            "column_sizes",
+            "value_counts",
+            "null_value_counts",
+            "nan_value_counts",
+            "lower_bounds",
+            "upper_bounds",
+            "key_metadata",
             "split_offsets",
             "equality_ids",
             "sort_order_id",
+            "readable_metrics",
         ]
-    ]
-    rhs_subset = rhs[
-        [
-            "content",
-            "file_path",
-            "file_format",
-            "spec_id",
-            "record_count",
-            "file_size_in_bytes",
-            "split_offsets",
-            "equality_ids",
-            "sort_order_id",
+
+        # make sure the non-nullable fields are filled
+        for int_column in ["content", "spec_id", "record_count", "file_size_in_bytes"]:
+            for value in df[int_column]:
+                assert isinstance(value.as_py(), int)
+
+        for split_offsets in df["split_offsets"]:
+            assert isinstance(split_offsets.as_py(), list)
+
+        for file_format in df["file_format"]:
+            assert file_format.as_py() == "PARQUET"
+
+        for file_path in df["file_path"]:
+            assert file_path.as_py().startswith("s3://")
+
+        lhs = df.to_pandas()
+        rhs = spark_df.toPandas()
+
+        lhs_subset = lhs[
+            [
+                "content",
+                "file_path",
+                "file_format",
+                "spec_id",
+                "record_count",
+                "file_size_in_bytes",
+                "split_offsets",
+                "equality_ids",
+                "sort_order_id",
+            ]
+        ]
+        rhs_subset = rhs[
+            [
+                "content",
+                "file_path",
+                "file_format",
+                "spec_id",
+                "record_count",
+                "file_size_in_bytes",
+                "split_offsets",
+                "equality_ids",
+                "sort_order_id",
+            ]
         ]
-    ]
 
-    assert_frame_equal(lhs_subset, rhs_subset, check_dtype=False, check_categorical=False)
+        assert_frame_equal(lhs_subset, rhs_subset, check_dtype=False, check_categorical=False)
 
-    for column in df.column_names:
-        for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
-            if isinstance(left, float) and math.isnan(left) and isinstance(right, float) and math.isnan(right):
-                # NaN != NaN in Python
-                continue
-            if column in [
-                "column_sizes",
-                "value_counts",
-                "null_value_counts",
-                "nan_value_counts",
-                "lower_bounds",
-                "upper_bounds",
-            ]:
-                if isinstance(right, dict):
-                    left = dict(left)
-                assert left == right, f"Difference in column {column}: {left} != {right}"
+        for column in df.column_names:
+            for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
+                if isinstance(left, float) and math.isnan(left) and isinstance(right, float) and math.isnan(right):
+                    # NaN != NaN in Python
+                    continue
+                if column in [
+                    "column_sizes",
+                    "value_counts",
+                    "null_value_counts",
+                    "nan_value_counts",
+                    "lower_bounds",
+                    "upper_bounds",
+                ]:
+                    if isinstance(right, dict):
+                        left = dict(left)
+                    assert left == right, f"Difference in column {column}: {left} != {right}"
 
-            elif column == "readable_metrics":
-                assert list(left.keys()) == [
-                    "bool",
-                    "string",
-                    "string_long",
-                    "int",
-                    "long",
-                    "float",
-                    "double",
-                    "timestamp",
-                    "timestamptz",
-                    "date",
-                    "binary",
-                    "fixed",
-                ]
-                assert left.keys() == right.keys()
-
-                for rm_column in left.keys():
-                    rm_lhs = left[rm_column]
-                    rm_rhs = right[rm_column]
-
-                    assert rm_lhs["column_size"] == rm_rhs["column_size"]
-                    assert rm_lhs["value_count"] == rm_rhs["value_count"]
-                    assert rm_lhs["null_value_count"] == rm_rhs["null_value_count"]
-                    assert rm_lhs["nan_value_count"] == rm_rhs["nan_value_count"]
-
-                    if rm_column == "timestamptz":
-                        # PySpark does not correctly set the timstamptz
-                        rm_rhs["lower_bound"] = rm_rhs["lower_bound"].replace(tzinfo=pytz.utc)
-                        rm_rhs["upper_bound"] = rm_rhs["upper_bound"].replace(tzinfo=pytz.utc)
-
-                    assert rm_lhs["lower_bound"] == rm_rhs["lower_bound"]
-                    assert rm_lhs["upper_bound"] == rm_rhs["upper_bound"]
-            else:
-                assert left == right, f"Difference in column {column}: {left} != {right}"
+                elif column == "readable_metrics":
+                    assert list(left.keys()) == [
+                        "bool",
+                        "string",
+                        "string_long",
+                        "int",
+                        "long",
+                        "float",
+                        "double",
+                        "timestamp",
+                        "timestamptz",
+                        "date",
+                        "binary",
+                        "fixed",
+                    ]
+                    assert left.keys() == right.keys()
+
+                    for rm_column in left.keys():
+                        rm_lhs = left[rm_column]
+                        rm_rhs = right[rm_column]
+
+                        assert rm_lhs["column_size"] == rm_rhs["column_size"]
+                        assert rm_lhs["value_count"] == rm_rhs["value_count"]
+                        assert rm_lhs["null_value_count"] == rm_rhs["null_value_count"]
+                        assert rm_lhs["nan_value_count"] == rm_rhs["nan_value_count"]
+
+                        if rm_column == "timestamptz" and rm_rhs["lower_bound"] and rm_rhs["upper_bound"]:
+                            # PySpark does not correctly set the timestamptz
+                            rm_rhs["lower_bound"] = rm_rhs["lower_bound"].replace(tzinfo=pytz.utc)
+                            rm_rhs["upper_bound"] = rm_rhs["upper_bound"].replace(tzinfo=pytz.utc)
+
+                        assert rm_lhs["lower_bound"] == rm_rhs["lower_bound"]
+                        assert rm_lhs["upper_bound"] == rm_rhs["upper_bound"]
+                else:
+                    assert left == right, f"Difference in column {column}: {left} != {right}"
+
+    inspect_files_asserts(files_df, spark.table(f"{identifier}.files"))
+    inspect_files_asserts(data_files_df, spark.table(f"{identifier}.data_files"))
+    inspect_files_asserts(delete_files_df, spark.table(f"{identifier}.delete_files"))
 
 
 @pytest.mark.integration
@@ -801,26 +816,33 @@ def test_inspect_files_no_snapshot(spark: SparkSession, session_catalog: Catalog
 
     tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version})
 
-    df = tbl.refresh().inspect.files()
+    files_df = tbl.refresh().inspect.files()
+    data_files_df = tbl.inspect.data_files()
+    delete_files_df = tbl.inspect.delete_files()
 
-    assert df.column_names == [
-        "content",
-        "file_path",
-        "file_format",
-        "spec_id",
-        "record_count",
-        "file_size_in_bytes",
-        "column_sizes",
-        "value_counts",
-        "null_value_counts",
-        "nan_value_counts",
-        "lower_bounds",
-        "upper_bounds",
-        "key_metadata",
-        "split_offsets",
-        "equality_ids",
-        "sort_order_id",
-        "readable_metrics",
-    ]
+    def inspect_files_asserts(df: pa.Table) -> None:
+        assert df.column_names == [
+            "content",
+            "file_path",
+            "file_format",
+            "spec_id",
+            "record_count",
+            "file_size_in_bytes",
+            "column_sizes",
+            "value_counts",
+            "null_value_counts",
+            "nan_value_counts",
+            "lower_bounds",
+            "upper_bounds",
+            "key_metadata",
+            "split_offsets",
+            "equality_ids",
+            "sort_order_id",
+            "readable_metrics",
+        ]
+
+        assert df.to_pandas().empty is True
 
-    assert df.to_pandas().empty is True
+    inspect_files_asserts(files_df)
+    inspect_files_asserts(data_files_df)
+    inspect_files_asserts(delete_files_df)
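For context, a minimal usage sketch of the inspect API surface this change adds. The catalog and table names below are placeholders, and the snippet assumes a catalog is already configured as described elsewhere in `mkdocs/docs/api.md`.

```python
from pyiceberg.catalog import load_catalog

# Placeholder names: assumes a configured catalog called "default" with a table "default.taxis".
catalog = load_catalog("default")
tbl = catalog.load_table("default.taxis")

# All files reachable from the current snapshot (data files and delete files).
all_files = tbl.inspect.files()

# Only data files (content 0), or only position/equality delete files (content 1 and 2).
data_files = tbl.inspect.data_files()
delete_files = tbl.inspect.delete_files()

# Each call returns a pyarrow.Table with the same schema as inspect.files().
print(data_files.column_names)
print(delete_files.num_rows)  # 0 for a table without row-level deletes
```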