Conversation
@chamikaramj
lol I missed this one
I'm wondering if we can strip out the UDF-based dynamic destinations and think about how to introduce dynamic destinations to this I/O in a portable way, based on https://s.apache.org/portable-dynamic-destinations
I left them in for a bit of abstraction, but they can be an implementation detail, and IcebergIO.writeToDestinations(...) can just take the string pattern. I haven't done that part yet; I was mostly focused on getting the main body of the transform to operate only on Rows.
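As a rough sketch of what a string-pattern destination could look like, here is a hypothetical `DestinationPattern` helper with an assumed `{field}` placeholder syntax. This is not the actual IcebergIO API; the record is modeled as a plain `Map` rather than a Beam `Row` to keep the example self-contained.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DestinationPattern {
  private static final Pattern FIELD_REF = Pattern.compile("\\{(\\w+)\\}");

  // Resolve a destination pattern like "db.{country}_events" by
  // substituting field values from the record.
  public static String resolve(String pattern, Map<String, String> fields) {
    Matcher m = FIELD_REF.matcher(pattern);
    StringBuffer sb = new StringBuffer();
    while (m.find()) {
      String value = fields.get(m.group(1));
      if (value == null) {
        throw new IllegalArgumentException("No field named " + m.group(1));
      }
      m.appendReplacement(sb, Matcher.quoteReplacement(value));
    }
    m.appendTail(sb);
    return sb.toString();
  }
}
```

Because the pattern is just a string, it can cross the portability boundary without serializing any user code.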
Is it possible to easily convert "IcebergCatalog" into a portable representation for SchemaTransforms?
TBD. Leaving all "catalog" questions unresolved for this revision.
I would just limit this to PTransform&lt;PCollection&lt;Row&gt;, IcebergWriteResult&lt;Row&gt;&gt; to make this portability-first and friendly for SchemaTransforms.
Done (and even simpler)
Any idea how we got to these defaults? (If so, we should document.)
I have no idea; the number 20 must just be a guess. Some of the others appear to be BigQuery quota limitations that we can ignore. One thing we should do: I've read a lot online about the ideal Iceberg file size being 512 MB (that's what some internal Iceberg code uses, I gather), so perhaps we follow that. I'm still learning the Iceberg Java APIs and the best way to follow their best practices.
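If a 512 MB target were adopted, the rolling behavior can be illustrated with a toy model: start a new file whenever the current one would exceed the target. The `FileRoller` class and `planFiles` method below are hypothetical, not the transform's actual code.

```java
import java.util.ArrayList;
import java.util.List;

public class FileRoller {
  // Illustrative target; the discussion above suggests 512 MB.
  public static final long TARGET_FILE_BYTES = 512L * 1024 * 1024;

  // Split a stream of record sizes into file sizes, rolling to a new
  // file whenever the current one would exceed the target.
  public static List<Long> planFiles(long[] recordBytes, long targetFileBytes) {
    List<Long> files = new ArrayList<>();
    long current = 0;
    for (long b : recordBytes) {
      if (current > 0 && current + b > targetFileBytes) {
        files.add(current);
        current = 0;
      }
      current += b;
    }
    if (current > 0) {
      files.add(current);
    }
    return files;
  }
}
```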
Can we use the new DLQ framework instead? (This seems to follow the old DLQ implementation in BQ.) The new framework also considers portability aspects, for example, so it's more advantageous.
https://docs.google.com/document/d/1NGeCk6tOqF-TiGEAV7ixd_vhIiWz9sHPlCa1P_77Ajs/edit?tab=t.0#heading=h.fppublcudjbt
(can be a separate PR but we should remove the DLQ feature from this PR in that case)
I just left it out for now.
Not sure what we are doing here. Are we trying to write failed records again and flatten them with the originally written records (in the subsequent step below)?
Possibly we should be writing failed records to a DLQ?
Re-reading the code, it seems like failedWrites here are actually due to the previous WriteBundlesToFiles exceeding one of the limits provided to the transform (DEFAULT_MAX_WRITERS_PER_BUNDLE, DEFAULT_MAX_BYTES_PER_FILE). We group the known set of spilled-over records and write them in the subsequent transform, which makes sense. We should probably rename 'failedWrites' to 'spilledOverWrites'.
I have now totally refactored this and renamed everything. Thanks for your description; it helped a lot to understand how to organize it.
Probably rename to MetadataUpdateDoFn for clarity.
Done, but I still need to refactor this out anyhow.
Probably this should be followed up by another GBK and a cleanup step that deletes temp files (of this step and any failed work items).
Oh, and by the way, the files are not temporary; they become part of the table. So it is simpler than the BQ equivalent.
Seems like this has a lot of copied over logic from BQ dynamic destinations which probably we can simplify/change if we went with the new DLQ framework.
Gotcha. I actually removed all the logic and just do something extremely basic for now. I guess DLQ could be an update-incompatible change, so I'd better get that done quickly too.
Seems like org.apache.hadoop.conf.Configuration is a set of string key/value pairs.
https://hadoop.apache.org/docs/current/api/org/apache/hadoop/conf/Configuration.html
Maybe we should just accept a set of string key/value pairs and build the Hadoop Configuration from them, to make this more portability friendly.
That makes sense. Leaving this unresolved as I did not get to this yet.
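The portable shape suggested above can be sketched as follows. The `PortableConf` helper is hypothetical, and `conf.set(key, value)` is modeled as a `BiConsumer` so the example carries no Hadoop dependency; in real code the consumer would be the Configuration's own setter.

```java
import java.util.Map;
import java.util.function.BiConsumer;

public class PortableConf {
  // A portable catalog config is just ordered string key/value pairs.
  // Applying it to a Hadoop Configuration reduces to calling
  // conf.set(key, value) for each entry, e.g. apply(props, conf::set).
  public static void apply(Map<String, String> props, BiConsumer<String, String> confSet) {
    props.forEach(confSet);
  }
}
```

Since the transform's configuration is then plain strings, it can be expressed in a SchemaTransform config Row without any Java-only types.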
kennknowles left a comment:
OK I did a major revision to clarify things and streamline the main logic around writing rows. Still need another major revision to address the remaining non-portable pieces and DLQ.
OK I have done a whole massive revision and tested it a little bit more. The only piece that I have not revised is the
It looks like this might work: https://github.com/tabular-io/iceberg-kafka-connect/blob/5ab5c538efab9ccf3cde166f36ba34189eed7187/kafka-connect/src/main/java/io/tabular/iceberg/connect/IcebergSinkConfig.java#L256
chamikaramj left a comment:
Thanks. Looks great and almost there!
sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIO.java
If this is not configurable, let's document.
It should be configurable. In testing, I have discovered that the ORC codepath doesn't work so I've changed it to throw.
Shouldn't this update be atomic for all files?
If so, we might have to push this to a separate step behind a shuffle.
The key question is what will happen if the step fails after writing some of the elements and gets retried.
All the files per destination are grouped into a single atomic commit. There are two things that could go wrong:
- Failure after the commit but before downstream processing, so a new transaction will try to append the same files. I verified that this is idempotent (and I included it as a unit test just to clarify).
- Some tables successfully commit but then there are enough failures that the pipeline itself fails. We probably can do a multi-table transaction. We would write the various files all to a manifest and then merge to a single thread and commit all the manifests at once. We don't do this for other sinks, do we?
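The idempotency claimed in point (1) can be illustrated with a toy model. This `ToyTable` is not the actual Iceberg commit API (which goes through Table.newAppend() and a catalog transaction); it only models the observable property the unit test verifies: re-committing the same data files leaves the table unchanged.

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class ToyTable {
  // Toy model of a table's data-file set: appending an already-present
  // file path is a no-op, so a retried commit of the same files is
  // idempotent.
  private final Set<String> dataFiles = new LinkedHashSet<>();

  public void commitAppend(Iterable<String> files) {
    for (String f : files) {
      dataFiles.add(f); // duplicate paths are ignored
    }
  }

  public int fileCount() {
    return dataFiles.size();
  }
}
```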
Yeah, (2) is fine. It's more about making sure that we don't double write if a work item fails. But if writing is idempotent it's simpler.
Sorry to be late on this; I'm just wondering whether we would need a kind of "commit coordinator" to make sure we have one commit at a time: concurrent commits could be problematic in Iceberg.
I am not that familiar with the iceberg libraries. I was under the impression that the optimistic concurrency protocol was handled by them (https://iceberg.apache.org/docs/1.5.2/reliability/#concurrent-write-operations and on filesystem tables described by https://iceberg.apache.org/spec/#file-system-tables).
Let's make sure that this is covered by unit testing.
Done, somewhat. Could use some data generators to thoroughly test.
Are these types not supported?
If so, we should fail instead of dropping them?
omg, yes, haha, I didn't notice this. Fixed: added more support and testing for some types, and we now throw for the other ones that are not yet supported. We will want to fast-follow with support, but some of the date semantics are unclear to me (e.g., an Iceberg DATE is stored as a Long, but I'm not sure exactly what it represents).
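The fail-instead-of-drop approach can be sketched as below. The `TypeMapper` class, the type-name strings, and the chosen subset of mappings are all illustrative, not the actual conversion code in the PR.

```java
public class TypeMapper {
  // Map a handful of Iceberg type names to Beam Schema type names and
  // throw, rather than silently drop, anything unrecognized.
  public static String toBeamTypeName(String icebergType) {
    switch (icebergType) {
      case "boolean": return "BOOLEAN";
      case "int":     return "INT32";
      case "long":    return "INT64";
      case "float":   return "FLOAT";
      case "double":  return "DOUBLE";
      case "string":  return "STRING";
      case "binary":  return "BYTES";
      default:
        throw new UnsupportedOperationException(
            "Unsupported Iceberg type: " + icebergType);
    }
  }
}
```

Throwing eagerly surfaces unsupported schemas at construction time instead of producing rows with missing fields.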
UUID is BYTES, not STRING?
Yeah, it is a Java UUID, which contains a byte[].
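For illustration, a `java.util.UUID` is 128 bits and serializes naturally to 16 bytes, which is why BYTES is the natural mapping. The `UuidBytes` helper below is hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.UUID;

public class UuidBytes {
  // Serialize a 128-bit UUID as 16 big-endian bytes.
  public static byte[] toBytes(UUID uuid) {
    ByteBuffer buf = ByteBuffer.allocate(16);
    buf.putLong(uuid.getMostSignificantBits());
    buf.putLong(uuid.getLeastSignificantBits());
    return buf.array();
  }

  public static UUID fromBytes(byte[] bytes) {
    ByteBuffer buf = ByteBuffer.wrap(bytes);
    return new UUID(buf.getLong(), buf.getLong());
  }
}
```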
settings.gradle.kts
It doesn't look like we add anything under "sdks:java:io:catalog".
- remove Read path (will propose separately)
- re-enable checking, fix type errors
- some style adjustments
kennknowles left a comment:
Thanks for all the review!
Hello, could you please kindly update the doc below with the merged implementation?
This is a basic Iceberg sink. Somewhat in the style of BigQuery file loads:
And how it works, roughly:
I'm a bit of an Iceberg newb. Byron did the first draft and I just refactored and added some stuff to it. This has some small tests but needs integration tests and larger tests. It is a starting point for integrating with @ahmedabu98's work on managed transforms.