Merged
5 changes: 5 additions & 0 deletions mkdocs/docs/api.md
@@ -845,6 +845,11 @@ readable_metrics: [
[6.0989]]
```

!!! info
    The `content` field refers to the type of content stored by the data file: `0` - `Data`, `1` - `Position Deletes`, `2` - `Equality Deletes`

To show only data files or delete files in the current snapshot, use `table.inspect.data_files()` and `table.inspect.delete_files()` respectively.
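
A minimal usage sketch (the catalog and table names below are placeholders for illustration, not part of this change):

```python
from pyiceberg.catalog import load_catalog

# Hypothetical catalog/table names, for illustration only.
catalog = load_catalog("default")
table = catalog.load_table("docs_example.taxis")

# All files tracked by the current snapshot (data and delete files).
all_files = table.inspect.files()

# Only data files (content == 0).
data_files = table.inspect.data_files()

# Only position/equality delete files (content == 1 or 2).
delete_files = table.inspect.delete_files()

print(data_files["file_path"])
print(delete_files["content"])
```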

## Add Files

Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them.
27 changes: 19 additions & 8 deletions pyiceberg/table/inspect.py
Expand Up @@ -17,7 +17,7 @@
from __future__ import annotations

from datetime import datetime
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple

from pyiceberg.conversions import from_bytes
from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, PartitionFieldSummary
@@ -473,7 +473,7 @@ def history(self) -> "pa.Table":

return pa.Table.from_pylist(history, schema=history_schema)

def files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
def _files(self, snapshot_id: Optional[int] = None, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table":
import pyarrow as pa

from pyiceberg.io.pyarrow import schema_to_pyarrow
@@ -530,6 +530,8 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
for manifest_list in snapshot.manifests(io):
for manifest_entry in manifest_list.fetch_manifest_entry(io):
data_file = manifest_entry.data_file
if data_file_filter and data_file.content not in data_file_filter:
continue
column_sizes = data_file.column_sizes or {}
value_counts = data_file.value_counts or {}
null_value_counts = data_file.null_value_counts or {}
@@ -558,12 +558,12 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
"spec_id": data_file.spec_id,
"record_count": data_file.record_count,
"file_size_in_bytes": data_file.file_size_in_bytes,
"column_sizes": dict(data_file.column_sizes),
"value_counts": dict(data_file.value_counts),
"null_value_counts": dict(data_file.null_value_counts),
"nan_value_counts": dict(data_file.nan_value_counts),
"lower_bounds": dict(data_file.lower_bounds),
"upper_bounds": dict(data_file.upper_bounds),
"column_sizes": dict(data_file.column_sizes) if data_file.column_sizes is not None else None,
"value_counts": dict(data_file.value_counts) if data_file.value_counts is not None else None,
"null_value_counts": dict(data_file.null_value_counts) if data_file.null_value_counts is not None else None,
"nan_value_counts": dict(data_file.nan_value_counts) if data_file.nan_value_counts is not None else None,
"lower_bounds": dict(data_file.lower_bounds) if data_file.lower_bounds is not None else None,
"upper_bounds": dict(data_file.upper_bounds) if data_file.upper_bounds is not None else None,
"key_metadata": data_file.key_metadata,
"split_offsets": data_file.split_offsets,
"equality_ids": data_file.equality_ids,
@@ -575,3 +575,12 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
files,
schema=files_schema,
)

def files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
return self._files(snapshot_id)

def data_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
return self._files(snapshot_id, {DataFileContent.DATA})

def delete_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
return self._files(snapshot_id, {DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES})
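
For intuition, the two new views should be equivalent to filtering the full `files` metadata table on its `content` column. A hedged sketch of that equivalence (assuming `tbl` is an already-loaded `Table` and a pyarrow version whose `Table.filter` accepts compute expressions):

```python
import pyarrow.compute as pc

from pyiceberg.manifest import DataFileContent

files = tbl.inspect.files()

# Should match tbl.inspect.data_files(): keep only rows with content == DataFileContent.DATA (0).
data_only = files.filter(pc.field("content") == DataFileContent.DATA.value)

# Should match tbl.inspect.delete_files(): everything that is not plain data,
# i.e. position deletes (1) and equality deletes (2).
deletes_only = files.filter(pc.field("content") != DataFileContent.DATA.value)
```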
268 changes: 145 additions & 123 deletions tests/integration/test_inspect_table.py
@@ -672,126 +672,141 @@ def test_inspect_files(
# append more data
tbl.append(arrow_table_with_null)

df = tbl.refresh().inspect.files()
# configure table properties
if format_version == 2:
with tbl.transaction() as txn:
txn.set_properties({"write.delete.mode": "merge-on-read"})
spark.sql(f"DELETE FROM {identifier} WHERE int = 1")
Comment on lines +675 to +679
Contributor

is this to produce delete files?

Contributor Author

Yes

Contributor Author

About this: #1066 (comment)
I have checked in the Spark shell as well; the values are null.

Contributor

do you know if this produces positional deletes, equality deletes, or both?

Contributor Author

I think Spark only produces positional delete files. Flink might produce equality delete files.

Contributor

hm, good point. I can't find any reference to Spark and equality deletes.

Contributor Author

Spark streaming might produce equality deletes. I tried with Flink: I created an Iceberg table and upserted some data into it, and observed both positional and equality delete files, which looked weird to me.

Contributor

This was a nit comment, by the way, not blocking. We don't necessarily have to test for equality delete files here.

Contributor Author

Alright. Other than this, the other comments are resolved.

Collaborator

> Spark only produces positional delete files. Flink might produce equality delete files.

That's my understanding as well. I think just testing for positional deletes is okay for now, and maybe we can think of ways of adding equality deletes to our test suite in the future.
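
A hedged sketch of how the expectation from this thread could be checked against the new view, assuming `tbl` is the table refreshed after the Spark `DELETE` in the test below:

```python
from pyiceberg.manifest import DataFileContent

delete_files = tbl.refresh().inspect.delete_files()

# With Spark's merge-on-read DELETE we expect only position deletes (content == 1);
# equality deletes (content == 2) would typically come from engines such as Flink.
assert set(delete_files["content"].to_pylist()) <= {DataFileContent.POSITION_DELETES.value}
```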


assert df.column_names == [
"content",
"file_path",
"file_format",
"spec_id",
"record_count",
"file_size_in_bytes",
"column_sizes",
"value_counts",
"null_value_counts",
"nan_value_counts",
"lower_bounds",
"upper_bounds",
"key_metadata",
"split_offsets",
"equality_ids",
"sort_order_id",
"readable_metrics",
]

# make sure the non-nullable fields are filled
for int_column in ["content", "spec_id", "record_count", "file_size_in_bytes"]:
for value in df[int_column]:
assert isinstance(value.as_py(), int)

for split_offsets in df["split_offsets"]:
assert isinstance(split_offsets.as_py(), list)

for file_format in df["file_format"]:
assert file_format.as_py() == "PARQUET"
files_df = tbl.refresh().inspect.files()

for file_path in df["file_path"]:
assert file_path.as_py().startswith("s3://")
data_files_df = tbl.inspect.data_files()

lhs = df.to_pandas()
rhs = spark.table(f"{identifier}.files").toPandas()
delete_files_df = tbl.inspect.delete_files()

lhs_subset = lhs[
[
def inspect_files_asserts(df: pa.Table, spark_df: DataFrame) -> None:
assert df.column_names == [
"content",
"file_path",
"file_format",
"spec_id",
"record_count",
"file_size_in_bytes",
"column_sizes",
"value_counts",
"null_value_counts",
"nan_value_counts",
"lower_bounds",
"upper_bounds",
"key_metadata",
"split_offsets",
"equality_ids",
"sort_order_id",
"readable_metrics",
]
]
rhs_subset = rhs[
[
"content",
"file_path",
"file_format",
"spec_id",
"record_count",
"file_size_in_bytes",
"split_offsets",
"equality_ids",
"sort_order_id",

# make sure the non-nullable fields are filled
for int_column in ["content", "spec_id", "record_count", "file_size_in_bytes"]:
for value in df[int_column]:
assert isinstance(value.as_py(), int)

for split_offsets in df["split_offsets"]:
assert isinstance(split_offsets.as_py(), list)

for file_format in df["file_format"]:
assert file_format.as_py() == "PARQUET"

for file_path in df["file_path"]:
assert file_path.as_py().startswith("s3://")

lhs = df.to_pandas()
rhs = spark_df.toPandas()

lhs_subset = lhs[
[
"content",
"file_path",
"file_format",
"spec_id",
"record_count",
"file_size_in_bytes",
"split_offsets",
"equality_ids",
"sort_order_id",
]
]
rhs_subset = rhs[
[
"content",
"file_path",
"file_format",
"spec_id",
"record_count",
"file_size_in_bytes",
"split_offsets",
"equality_ids",
"sort_order_id",
]
]
]

assert_frame_equal(lhs_subset, rhs_subset, check_dtype=False, check_categorical=False)
assert_frame_equal(lhs_subset, rhs_subset, check_dtype=False, check_categorical=False)

for column in df.column_names:
for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
if isinstance(left, float) and math.isnan(left) and isinstance(right, float) and math.isnan(right):
# NaN != NaN in Python
continue
if column in [
"column_sizes",
"value_counts",
"null_value_counts",
"nan_value_counts",
"lower_bounds",
"upper_bounds",
]:
if isinstance(right, dict):
left = dict(left)
assert left == right, f"Difference in column {column}: {left} != {right}"
for column in df.column_names:
for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
if isinstance(left, float) and math.isnan(left) and isinstance(right, float) and math.isnan(right):
# NaN != NaN in Python
continue
if column in [
"column_sizes",
"value_counts",
"null_value_counts",
"nan_value_counts",
"lower_bounds",
"upper_bounds",
]:
if isinstance(right, dict):
left = dict(left)
assert left == right, f"Difference in column {column}: {left} != {right}"

elif column == "readable_metrics":
assert list(left.keys()) == [
"bool",
"string",
"string_long",
"int",
"long",
"float",
"double",
"timestamp",
"timestamptz",
"date",
"binary",
"fixed",
]
assert left.keys() == right.keys()

for rm_column in left.keys():
rm_lhs = left[rm_column]
rm_rhs = right[rm_column]

assert rm_lhs["column_size"] == rm_rhs["column_size"]
assert rm_lhs["value_count"] == rm_rhs["value_count"]
assert rm_lhs["null_value_count"] == rm_rhs["null_value_count"]
assert rm_lhs["nan_value_count"] == rm_rhs["nan_value_count"]

if rm_column == "timestamptz":
# PySpark does not correctly set the timestamptz
rm_rhs["lower_bound"] = rm_rhs["lower_bound"].replace(tzinfo=pytz.utc)
rm_rhs["upper_bound"] = rm_rhs["upper_bound"].replace(tzinfo=pytz.utc)

assert rm_lhs["lower_bound"] == rm_rhs["lower_bound"]
assert rm_lhs["upper_bound"] == rm_rhs["upper_bound"]
else:
assert left == right, f"Difference in column {column}: {left} != {right}"
elif column == "readable_metrics":
assert list(left.keys()) == [
"bool",
"string",
"string_long",
"int",
"long",
"float",
"double",
"timestamp",
"timestamptz",
"date",
"binary",
"fixed",
]
assert left.keys() == right.keys()

for rm_column in left.keys():
rm_lhs = left[rm_column]
rm_rhs = right[rm_column]

assert rm_lhs["column_size"] == rm_rhs["column_size"]
assert rm_lhs["value_count"] == rm_rhs["value_count"]
assert rm_lhs["null_value_count"] == rm_rhs["null_value_count"]
assert rm_lhs["nan_value_count"] == rm_rhs["nan_value_count"]

if rm_column == "timestamptz" and rm_rhs["lower_bound"] and rm_rhs["upper_bound"]:
# PySpark does not correctly set the timestamptz
rm_rhs["lower_bound"] = rm_rhs["lower_bound"].replace(tzinfo=pytz.utc)
rm_rhs["upper_bound"] = rm_rhs["upper_bound"].replace(tzinfo=pytz.utc)

assert rm_lhs["lower_bound"] == rm_rhs["lower_bound"]
assert rm_lhs["upper_bound"] == rm_rhs["upper_bound"]
else:
assert left == right, f"Difference in column {column}: {left} != {right}"

inspect_files_asserts(files_df, spark.table(f"{identifier}.files"))
inspect_files_asserts(data_files_df, spark.table(f"{identifier}.data_files"))
inspect_files_asserts(delete_files_df, spark.table(f"{identifier}.delete_files"))


@pytest.mark.integration
@@ -801,26 +816,33 @@ def test_inspect_files_no_snapshot(spark: SparkSession, session_catalog: Catalog

tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version})

df = tbl.refresh().inspect.files()
files_df = tbl.refresh().inspect.files()
data_files_df = tbl.inspect.data_files()
delete_files_df = tbl.inspect.delete_files()

assert df.column_names == [
"content",
"file_path",
"file_format",
"spec_id",
"record_count",
"file_size_in_bytes",
"column_sizes",
"value_counts",
"null_value_counts",
"nan_value_counts",
"lower_bounds",
"upper_bounds",
"key_metadata",
"split_offsets",
"equality_ids",
"sort_order_id",
"readable_metrics",
]
def inspect_files_asserts(df: pa.Table) -> None:
assert df.column_names == [
"content",
"file_path",
"file_format",
"spec_id",
"record_count",
"file_size_in_bytes",
"column_sizes",
"value_counts",
"null_value_counts",
"nan_value_counts",
"lower_bounds",
"upper_bounds",
"key_metadata",
"split_offsets",
"equality_ids",
"sort_order_id",
"readable_metrics",
]

assert df.to_pandas().empty is True

assert df.to_pandas().empty is True
inspect_files_asserts(files_df)
inspect_files_asserts(data_files_df)
inspect_files_asserts(delete_files_df)