
Commit b2344de

Add history inspect table
1 parent a29491a commit b2344de

3 files changed: +118 −0 lines changed

mkdocs/docs/api.md

Lines changed: 21 additions & 0 deletions
@@ -656,6 +656,27 @@ partition_summaries: [[ -- is_valid: all not null
     ["test"]]]
 ```
 
+### History
+
+To show a table's history:
+
+```python
+table.inspect.history()
+```
+
+```
+pyarrow.Table
+made_current_at: timestamp[ms] not null
+snapshot_id: int64 not null
+parent_id: int64
+is_current_ancestor: bool not null
+----
+made_current_at: [[2024-06-18 16:17:48.768,2024-06-18 16:17:49.240,2024-06-18 16:17:49.343,2024-06-18 16:17:49.511]]
+snapshot_id: [[4358109269873137077,3380769165026943338,4358109269873137077,3089420140651211776]]
+parent_id: [[null,4358109269873137077,null,4358109269873137077]]
+is_current_ancestor: [[true,false,true,true]]
+```
+
 ## Add Files
 
 Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them.
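
Since `inspect.history()` returns a plain `pyarrow.Table`, the result can be post-processed with ordinary Arrow operations. A minimal usage sketch (assuming a loaded `table` handle, which is not shown in the diff):

```python
# Sketch only: restrict the history to snapshots that are ancestors of the
# current snapshot, using the boolean column as a filter mask.
history = table.inspect.history()
current_ancestors = history.filter(history["is_current_ancestor"])
print(current_ancestors.column("snapshot_id").to_pylist())
```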

pyiceberg/table/__init__.py

Lines changed: 28 additions & 0 deletions
@@ -113,6 +113,7 @@
     SnapshotLogEntry,
     SnapshotSummaryCollector,
     Summary,
+    ancestors_of,
     update_snapshot_summaries,
 )
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -3827,6 +3828,33 @@ def _partition_summaries_to_rows(
             schema=manifest_schema,
         )
 
+    def history(self) -> "pa.Table":
+        import pyarrow as pa
+
+        history_schema = pa.schema([
+            pa.field("made_current_at", pa.timestamp(unit="ms"), nullable=False),
+            pa.field("snapshot_id", pa.int64(), nullable=False),
+            pa.field("parent_id", pa.int64(), nullable=True),
+            pa.field("is_current_ancestor", pa.bool_(), nullable=False),
+        ])
+
+        ancestors_ids = {snapshot.snapshot_id for snapshot in ancestors_of(self.tbl.current_snapshot(), self.tbl.metadata)}
+
+        history = []
+        metadata = self.tbl.metadata
+
+        for snapshot_entry in metadata.snapshot_log:
+            snapshot = metadata.snapshot_by_id(snapshot_entry.snapshot_id)
+
+            history.append({
+                "made_current_at": datetime.utcfromtimestamp(snapshot_entry.timestamp_ms / 1000.0),
+                "snapshot_id": snapshot_entry.snapshot_id,
+                "parent_id": snapshot.parent_snapshot_id if snapshot else None,
+                "is_current_ancestor": snapshot_entry.snapshot_id in ancestors_ids,
+            })
+
+        return pa.Table.from_pylist(history, schema=history_schema)
+
 
 @dataclass(frozen=True)
 class TablePartition:
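
The `is_current_ancestor` flag above hinges on `ancestors_of`, imported in the first hunk. Conceptually it walks `parent_snapshot_id` pointers from the current snapshot back to the first one; a sketch of that idea (not the actual `pyiceberg` source):

```python
# Conceptual sketch: follow parent pointers until a snapshot has no parent.
# Snapshot-log entries absent from this chain (e.g. a branch abandoned by a
# rollback) end up with is_current_ancestor == False.
def ancestors_of_sketch(snapshot, metadata):
    while snapshot is not None:
        yield snapshot
        parent_id = snapshot.parent_snapshot_id
        snapshot = metadata.snapshot_by_id(parent_id) if parent_id is not None else None
```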

tests/integration/test_inspect_table.py

Lines changed: 69 additions & 0 deletions
@@ -528,3 +528,72 @@ def test_inspect_manifests(spark: SparkSession, session_catalog: Catalog, format
     for column in df.column_names:
         for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
             assert left == right, f"Difference in column {column}: {left} != {right}"
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_inspect_history(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
+    identifier = "default.table_history"
+
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    spark.sql(
+        f"""
+        CREATE TABLE {identifier} (
+            id int,
+            data string
+        )
+        PARTITIONED BY (data)
+        """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO {identifier} VALUES (1, "a")
+        """
+    )
+
+    table = session_catalog.load_table(identifier)
+    first_snapshot = table.current_snapshot()
+    snapshot_id = None if not first_snapshot else first_snapshot.snapshot_id
+
+    spark.sql(
+        f"""
+        INSERT INTO {identifier} VALUES (2, "b")
+        """
+    )
+
+    spark.sql(
+        f"""
+        CALL integration.system.rollback_to_snapshot('{identifier}', {snapshot_id})
+        """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO {identifier} VALUES (3, "c")
+        """
+    )
+
+    table.refresh()
+
+    df = table.inspect.history()
+
+    assert df.column_names == [
+        "made_current_at",
+        "snapshot_id",
+        "parent_id",
+        "is_current_ancestor",
+    ]
+
+    lhs = spark.table(f"{identifier}.history").toPandas()
+    rhs = df.to_pandas()
+    for column in df.column_names:
+        for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
+            if isinstance(left, float) and math.isnan(left) and isinstance(right, float) and math.isnan(right):
+                # NaN != NaN in Python
+                continue
+            assert left == right, f"Difference in column {column}: {left} != {right}"
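
For orientation: the insert, insert, rollback, insert sequence in this test yields four snapshot-log entries, and only the second insert's snapshot drops out of the current ancestry chain, matching the `[[true,false,true,true]]` column in the docs example above. A hypothetical extra check (not part of the commit) could read:

```python
# Hypothetical follow-up assertion, assuming snapshot-log order:
assert df["is_current_ancestor"].to_pylist() == [True, False, True, True]
```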
