Add Support for Dynamic Overwrite #931
Conversation
sungwy left a comment
Hi @jqin61, this PR is looking great. I left a few nit suggestions and a few pointers to incorporate new features like `merge_append`.
pyiceberg/table/__init__.py
Outdated
```python
        for data_file in data_files:
            append_files.append_data_file(data_file)

    def _build_partition_predicate(self, spec_id: int, delete_partitions: List[Record]) -> BooleanExpression:
```
nit: I found the `delete_partitions` argument a bit confusing here, because this function just translates a set of partition record values to its corresponding predicate. Could we rename it to something more generic to indicate that? We should also remove `spec_id`, which isn't used in this function.
```diff
-    def _build_partition_predicate(self, spec_id: int, delete_partitions: List[Record]) -> BooleanExpression:
+    def _build_partition_predicate(self, partition_records: List[Record]) -> BooleanExpression:
```
good catch, thanks!
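For readers following along, here is a minimal sketch of what the renamed helper could do. This is an illustration only, assuming identity-transformed partition fields; it is not the PR's actual implementation, though the expression classes it uses are PyIceberg's public expression API:

```python
# Illustrative sketch only, not the PR's implementation. Assumes identity
# partitions, so each partition record value maps 1:1 to a source column.
from typing import List

from pyiceberg.expressions import (
    AlwaysFalse,
    AlwaysTrue,
    And,
    BooleanExpression,
    EqualTo,
    IsNull,
    Or,
)
from pyiceberg.typedef import Record


def build_partition_predicate(
    partition_field_names: List[str], partition_records: List[Record]
) -> BooleanExpression:
    # OR together one AND-of-equalities predicate per partition record.
    expr: BooleanExpression = AlwaysFalse()
    for record in partition_records:
        match_record: BooleanExpression = AlwaysTrue()
        for pos, field_name in enumerate(partition_field_names):
            value = record[pos]
            # Null partition values need IsNull rather than EqualTo.
            predicate = EqualTo(field_name, value) if value is not None else IsNull(field_name)
            match_record = And(match_record, predicate)
        expr = Or(expr, match_record)
    return expr
```

The Or-of-Ands shape means the resulting predicate matches exactly the union of the given partitions and nothing else.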
Fokko left a comment
Hey @jqin61, good seeing you here again! 🙌 I'll do a more in-depth review tomorrow morning. Could you also document this in the docs under `mkdocs/`? Otherwise folks won't be able to find this awesome feature 👍
pyiceberg/table/__init__.py
Outdated
```python
        for data_file in data_files:
            append_files.append_data_file(data_file)

    def _build_partition_predicate(self, partition_records: List[Record]) -> BooleanExpression:
```
Can you add two tests where:
- We start with an unpartitioned table, write some data, and then evolve it to a partitioned table.
- We start with a monthly partitioned table, insert data for a few days, convert the partition to a daily partition, and dynamically overwrite a single day.
Thanks for the thorough review! So if the table is initially unpartitioned and then evolved into a partitioned table, I think the expected behavior is that dynamic overwrite will also delete (potentially through overwrite) data in the unpartitioned files?
Left some more comments @jqin61, thanks for working on this 👍

@jqin61 Sorry for the slow review, I was doing some other stuff as well. Can you fix the merge conflicts? I think this looks good to go 👍

Thank you Fokko! Sorry for the delay, I was extremely busy recently. I will get some time next weekend to address the comments, add tests, and fix the documentation. I will also move the transform support out of the scope of this PR due to its complexity; I will send you details about it soon.
sungwy left a comment
Hi @jqin61 - this looks good to me. I've added some nit suggestions to the documentation.
Thank you again for working on this amazing feature!
mkdocs/docs/api.md
Outdated
```markdown
### Partial overwrites

You can use overwrite with an overwrite filter `tbl.overwrite(df,overwrite_filter)` to delete partial table data which matches the filter before appending new data.
```
```diff
- You can use overwrite with an overwrite filter `tbl.overwrite(df,overwrite_filter)` to delete partial table data which matches the filter before appending new data.
+ When using the `overwrite` API, you can use an `overwrite_filter` to delete data that matches the filter before appending new data into the table.
```
mkdocs/docs/api.md
Outdated
````markdown
tbl.overwrite(df, overwrite_filter=EqualTo('city', "Paris"))
```

This results in such data if data is printed by `tbl.scan().to_arrow()`:
````
```diff
- This results in such data if data is printed by `tbl.scan().to_arrow()`:
+ This produces the following result with `tbl.scan().to_arrow()`:
```
mkdocs/docs/api.md
Outdated
````markdown
long: [[74.006],[4.896029,6.0989,2.349014]]
```

If the PyIceberg table is partitioned, you can use `tbl.dynamic_partition_overwrite(df)` to replace the partitions with new ones provided in the dataframe. The partitions to be replaced are detected automatically.
````
```diff
- If the PyIceberg table is partitioned, you can use `tbl.dynamic_partition_overwrite(df)` to replace the partitions with new ones provided in the dataframe. The partitions to be replaced are detected automatically.
+ If the PyIceberg table is partitioned, you can use `tbl.dynamic_partition_overwrite(df)` to replace the existing partitions with new ones provided in the dataframe. The partitions to be replaced are detected automatically from the provided arrow table.
```
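As a quick illustration of the suggested wording, a sketch assuming a table handle `tbl` partitioned by `city` (matching the docs example that follows):

```python
import pyarrow as pa

# Only the partitions present in this dataframe ("Paris" here) get replaced;
# every other partition in the table is left untouched.
df_corrected = pa.Table.from_pylist([
    {"city": "Paris", "lat": 48.864716, "long": 2.349014},
])
tbl.dynamic_partition_overwrite(df_corrected)
```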
mkdocs/docs/api.md
Outdated
````markdown
```

If the PyIceberg table is partitioned, you can use `tbl.dynamic_partition_overwrite(df)` to replace the partitions with new ones provided in the dataframe. The partitions to be replaced are detected automatically.
To try out it, you could firstly create a same PyIceberg table with partition specified on `"city"` field:
````
```diff
- To try out it, you could firstly create a same PyIceberg table with partition specified on `"city"` field:
+ For example, with an iceberg table with a partition specified on `"city"` field:
```
mkdocs/docs/api.md
Outdated
````markdown
)
```

And then suppose the data for the partition of `"paris"` is wrong:
````
```diff
- And then suppose the data for the partition of `"paris"` is wrong:
+ And we want to overwrite the data for the partition of `"Paris"`:
```
mkdocs/docs/api.md
Outdated
````markdown
tbl.append(df)
```

Then you could use dynamic overwrite on this partition:
````
```diff
- Then you could use dynamic overwrite on this partition:
+ Then we can call `dynamic_partition_overwrite` with this arrow table:
```
mkdocs/docs/api.md
Outdated
````markdown
tbl.dynamic_partition_overwrite(df_corrected)
```

This results in such data if data is printed by `tbl.scan().to_arrow()`:
````
```diff
- This results in such data if data is printed by `tbl.scan().to_arrow()`:
+ This produces the following result with `tbl.scan().to_arrow()`:
```
pyiceberg/table/__init__.py
Outdated
```text
        The function detects partition values in the provided arrow table that using the current table
        partition spec, and deletes existing partitions matching these values. Finally, the
        data in the table is appended to the table.
```
```diff
-        The function detects partition values in the provided arrow table that using the current table
-        partition spec, and deletes existing partitions matching these values. Finally, the
-        data in the table is appended to the table.
+        The function detects partition values in the provided arrow table using the current
+        partition spec, and deletes existing partitions matching these values. Finally, the
+        data in the table is appended to the table.
```
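The corrected docstring describes a three-step flow. Here is a conceptual sketch, where `detect_partition_records` and `build_partition_predicate` are hypothetical helper names rather than the PR's internals:

```python
import pyarrow as pa

from pyiceberg.table import Table


def dynamic_partition_overwrite_flow(tbl: Table, df: pa.Table) -> None:
    """Conceptual flow only; the real logic lives inside PyIceberg's Transaction."""
    # 1. Detect the partition record values present in the incoming dataframe
    #    by applying the current partition spec to its columns (hypothetical helper).
    partition_records = detect_partition_records(df, tbl.spec())
    # 2. Build a predicate matching exactly those partitions (hypothetical helper),
    #    then delete the matching data and append the new data in one transaction.
    predicate = build_partition_predicate(partition_records)
    with tbl.transaction() as txn:
        txn.delete(delete_filter=predicate)
        txn.append(df)
```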
jqin61 left a comment
@sungwy Thank you for the detailed wording enhancement guidance. I updated the docs. Please re-review when you get a chance.
Thank you for making this contribution @jqin61! I'll leave this PR open for another review, especially given that it introduces a new table commit API.
mkdocs/docs/api.md
Outdated
```text
lat: double
long: double
----
city: [["New York"],["Amsterdam","Drachten","Paris"]]
```
I don't think this example is correct. Paris should have been overwritten, right? It looks like we lost San Fran'.
pyiceberg/table/__init__.py
Outdated
```python
        for data_file in data_files:
            append_files.append_data_file(data_file)

    def _build_partition_predicate(self, partition_records: Set[Record]) -> BooleanExpression:
```
Can we add a little doc here describing what the function does?
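For example, something along these lines (a sketch of possible wording, not the final docstring; imports shown only for context):

```python
from typing import Set

from pyiceberg.expressions import BooleanExpression
from pyiceberg.typedef import Record


def _build_partition_predicate(self, partition_records: Set[Record]) -> BooleanExpression:
    """Build a predicate matching any of the given partition records.

    Args:
        partition_records: Partition record values to match, one per partition
            that should be replaced.

    Returns:
        A boolean expression that matches a row if its partition values equal
        one of the given records.
    """
    ...
```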
```python
    tbl = session_catalog.create_table(
        identifier=identifier,
        schema=TABLE_SCHEMA,
        properties={"format-version": "2"},
```
Looks like we're testing the same thing twice :)
```diff
-        properties={"format-version": "2"},
+        properties={"format-version": str(format_version)},
```
```python
    # expecting 3 files:
    rows = spark.sql(f"select partition from {identifier}.files").collect()
    assert len(rows) == 3
```
I think this is also a good test to have:
```python
# Assumes the integration-test module's usual imports: pytest, pyarrow as pa,
# SparkSession, Catalog, and NoSuchTableError from pyiceberg.exceptions.
@pytest.mark.integration
@pytest.mark.parametrize(
    "format_version",
    [1, 2],
)
def test_dynamic_partition_overwrite_rename_column(
    spark: SparkSession, session_catalog: Catalog, format_version: int
) -> None:
    arrow_table = pa.Table.from_pydict(
        {
            "place": ["Amsterdam", "Drachten"],
            "inhabitants": [921402, 44940],
        },
    )
    identifier = f"default.partitioned_{format_version}_dynamic_partition_overwrite_rename_column"
    try:
        session_catalog.drop_table(identifier)
    except NoSuchTableError:
        pass
    tbl = session_catalog.create_table(
        identifier=identifier,
        schema=arrow_table.schema,
        properties={"format-version": str(format_version)},
    )
    with tbl.transaction() as tx:
        with tx.update_spec() as spec:
            spec.add_identity("place")
    tbl.append(arrow_table)
    with tbl.transaction() as tx:
        with tx.update_schema() as schema:
            schema.rename_column("place", "city")
    arrow_table = pa.Table.from_pydict(
        {
            "city": ["Drachten"],
            "inhabitants": [44941],  # A new baby was born!
        },
    )
    tbl.dynamic_partition_overwrite(arrow_table)
    result = tbl.scan().to_arrow()
    assert result['city'].to_pylist() == ['Drachten', 'Amsterdam']
    assert result['inhabitants'].to_pylist() == [44941, 921402]
```
A partition-evolution variant along the same lines:

```python
# Additionally assumes StringType from pyiceberg.types is imported.
@pytest.mark.integration
@pytest.mark.parametrize(
    "format_version",
    [1, 2],
)
@pytest.mark.filterwarnings("ignore")
def test_dynamic_partition_overwrite_evolve_partition(
    spark: SparkSession, session_catalog: Catalog, format_version: int
) -> None:
    arrow_table = pa.Table.from_pydict(
        {
            "place": ["Amsterdam", "Drachten"],
            "inhabitants": [921402, 44940],
        },
    )
    identifier = f"default.partitioned_{format_version}_test_dynamic_partition_overwrite_evolve_partition"
    try:
        session_catalog.drop_table(identifier)
    except NoSuchTableError:
        pass
    tbl = session_catalog.create_table(
        identifier=identifier,
        schema=arrow_table.schema,
        properties={"format-version": str(format_version)},
    )
    with tbl.transaction() as tx:
        with tx.update_spec() as spec:
            spec.add_identity("place")
    tbl.append(arrow_table)
    with tbl.transaction() as tx:
        with tx.update_schema() as schema:
            schema.add_column("country", StringType())
        with tx.update_spec() as spec:
            spec.add_identity("country")
    arrow_table = pa.Table.from_pydict(
        {
            "place": ["Groningen"],
            "country": ["Netherlands"],
            "inhabitants": [238147],
        },
    )
    tbl.dynamic_partition_overwrite(arrow_table)
    result = tbl.scan().to_arrow()
    assert result['place'].to_pylist() == ['Groningen', 'Amsterdam', 'Drachten']
    assert result['inhabitants'].to_pylist() == [238147, 921402, 44940]
```
pyiceberg/table/__init__.py
Outdated
```python
        manifest_merge_enabled = property_as_bool(
            self.table_metadata.properties,
            TableProperties.MANIFEST_MERGE_ENABLED,
            TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
        )
        update_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties)
        append_method = update_snapshot.merge_append if manifest_merge_enabled else update_snapshot.fast_append
```
This logic is duplicated below as well, maybe move it into a function?
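An extracted helper could look roughly like this (a sketch; the method name is illustrative, and `property_as_bool` / `TableProperties` are the utilities already used in the diff above):

```python
    def _append_snapshot_producer(self, snapshot_properties: Dict[str, str]):
        """Pick merge_append or fast_append based on the manifest-merge table property."""
        manifest_merge_enabled = property_as_bool(
            self.table_metadata.properties,
            TableProperties.MANIFEST_MERGE_ENABLED,
            TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
        )
        update_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties)
        append_method = update_snapshot.merge_append if manifest_merge_enabled else update_snapshot.fast_append
        return append_method()
```

Both call sites could then shrink to `with self._append_snapshot_producer(snapshot_properties) as append_files:`.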
Fokko left a comment
Some small stuff, but apart from that it looks good to me 👍 Thanks for working on this, and sorry for the long wait
@jqin61 Do you have time to follow up on the last few comments? Would be great to get this in 👍

Thanks for fixing the CI, shall we rerun and merge? @Fokko Thank you!
sungwy left a comment
Thanks @jqin61 again for contributing this feature!
Added support for dynamic overwrite, leveraging delete and fast-append (counterpart in Iceberg Spark).
Several follow-ups:
Closes #1287