Add ParquetFileMerger for efficient row-group level file merging #14435
Conversation
Force-pushed c42af16 to 6bfe165
## Why this change?
This implementation provides significant performance improvements for Parquet
file merging operations by eliminating serialization/deserialization overhead.
Benchmark results show **13x faster** file merging compared to traditional
read-rewrite approaches.
The change leverages existing Parquet library capabilities (ParquetFileWriter
appendFile API) to perform zero-copy row-group merging, making it ideal for
compaction and maintenance operations on large Iceberg tables.
TODO: 1) Encrypted tables are not supported yet. 2) Schema evolution is not handled yet
## What changed?
- Added ParquetFileMerger class for row-group level file merging
- Performs zero-copy merging using ParquetFileWriter.appendFile() (see the sketch after this list)
- Validates schema compatibility across all input files
- Supports merging multiple Parquet files into a single output file
- Reuses existing Apache Parquet library functionality instead of custom implementation
- Strict schema validation ensures data integrity during merge operations
- Added comprehensive error handling for schema mismatches
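As a rough illustration of the mechanism described above, here is a minimal sketch of row-group level merging via ParquetFileWriter.appendFile(), assuming Hadoop-backed files. The class and method names are illustrative only; this is not the PR's actual ParquetFileMerger implementation.

```java
import java.io.IOException;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.schema.MessageType;

public class AppendFileMergeSketch {

  public static void merge(List<Path> inputs, Path output, Configuration conf) throws IOException {
    MessageType schema = null;

    // Strict schema validation: row groups can only be copied as-is when every
    // input file has an identical Parquet MessageType.
    for (Path input : inputs) {
      try (ParquetFileReader reader =
          ParquetFileReader.open(HadoopInputFile.fromPath(input, conf))) {
        MessageType current = reader.getFooter().getFileMetaData().getSchema();
        if (schema == null) {
          schema = current;
        } else if (!schema.equals(current)) {
          throw new IllegalArgumentException("Schema mismatch in " + input);
        }
      }
    }

    ParquetFileWriter writer =
        new ParquetFileWriter(
            HadoopOutputFile.fromPath(output, conf),
            schema,
            ParquetFileWriter.Mode.CREATE,
            ParquetWriter.DEFAULT_BLOCK_SIZE,
            ParquetWriter.MAX_PADDING_SIZE_DEFAULT);
    writer.start();
    for (Path input : inputs) {
      // appendFile copies each row group's bytes directly into the output file,
      // skipping decode and re-encode entirely (the "zero-copy" part).
      writer.appendFile(HadoopInputFile.fromPath(input, conf));
    }
    writer.end(Collections.emptyMap());
  }
}
```

With this shape, merging N small files produces a single output file whose row groups are the concatenation of the inputs' row groups, which is where the speedup over decode-and-rewrite approaches comes from.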
## Testing
- Validated in staging test environment
- Verified schema compatibility checks work correctly
- Confirmed 13x performance improvement over traditional approach
- Tested with various file sizes and row group configurations
Force-pushed 7be3ef0 to 7f2d5b0
Thanks @shangxinli for the PR! At a high level, leveraging Parquet's appendFile for row-group merging is the right approach and a performance win. Making it opt-in via an action option and a table property is appropriate. A couple of areas I'd like to discuss:
I'd also like to get others' opinions. @pvary @amogh-jahagirdar @nastra @singhpk234
I have a few concerns here:
Force-pushed cae2d00 to fa1d073
Thanks @huaxingao for the review and feedback! I've addressed both of your points.
Additionally, I've addressed the architectural feedback about planning vs. execution:
Let me know if there are any other concerns or improvements you'd like to see!
Thanks @pvary for the detailed feedback! I've addressed your points:
Force-pushed 8d6abd0 to 7a34353
parquetOutputFile,
schema,
ParquetFileWriter.Mode.CREATE,
ParquetWriter.DEFAULT_BLOCK_SIZE,
I'm not sure about this part. Should this be hardcoded to ParquetWriter.DEFAULT_BLOCK_SIZE, or should it be tied to the table property write.parquet.row-group-size-bytes?
I feel using the property gives the rewriter more flexibility, but I'd like to hear others' thoughts.
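For discussion, a hedged sketch of the property-driven alternative; the helper shape and the 128 MB fallback are assumptions, not the PR's code:

```java
import java.util.Map;

import org.apache.iceberg.TableProperties;
import org.apache.iceberg.util.PropertyUtil;

// Hypothetical helper: take the row-group size from write.parquet.row-group-size-bytes,
// falling back to an assumed 128 MB default when the property is unset.
class RowGroupSizeSketch {
  static long rowGroupSize(Map<String, String> tableProperties) {
    return PropertyUtil.propertyAsLong(
        tableProperties,
        TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
        134217728L); // assumed 128 MB default
  }
}
```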
Thanks for the PR! I have a question about lineage: if the merging is performed only at the Parquet layer, will the row lineage information of v3 tables be disrupted?
Good question! The lineage information for v3 tables is preserved in two ways:
1. Field IDs are preserved because we strictly enforce identical schemas across all files being merged. In ParquetFileMerger.java:130-136, we validate that all input files have exactly the same Parquet MessageType schema (the if (!schema.equals(currentSchema)) check). Field IDs are stored directly in the Parquet schema structure itself (via Type.getId()), so when we copy row groups using ParquetFileWriter.appendFile() with the validated schema, all field IDs are preserved.
2. Row IDs are automatically assigned by Iceberg's commit framework, so we don't need special handling in the merger.
This is the same mechanism used by all Iceberg write operations, so row lineage is fully preserved for v3 tables.
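As an aside, a small illustrative snippet (not from the PR) showing that field IDs are attached to the Parquet schema nodes and therefore travel with the schema during a row-group copy; the class name and method are hypothetical:

```java
import java.io.IOException;

import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;

// Field IDs live on the Parquet schema nodes themselves, so copying row groups
// under an identical MessageType carries them along unchanged.
class FieldIdSketch {
  static void printFieldIds(InputFile file) throws IOException {
    try (ParquetFileReader reader = ParquetFileReader.open(file)) {
      MessageType schema = reader.getFooter().getFileMetaData().getSchema();
      for (Type field : schema.getFields()) {
        Type.ID id = field.getId(); // may be null if the file carries no field IDs
        System.out.println(field.getName() + " -> " + id);
      }
    }
  }
}
```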
Force-pushed a32947b to e6595f3
Refactored: 5339589
Force-pushed 5339589 to e550da4
- Add static constants for row lineage column types and descriptors
- Extract writer() helper method to reduce code duplication
- Rename methods for clarity:
  - mergeFilesWithSchema -> binaryMerge
  - mergeFilesWithRowLineageAndSchema -> generateRowLineageAndMerge
  - validateRowLineageColumnsHaveNoNulls -> allRowLineageColumnsNonNull
- Use Statistics builder instead of manual LongStatistics construction
- Add try-with-resources for ValuesWriter
- Fix encoding parameters: BIT_PACKED -> RLE for definition/repetition levels
- Change dataFile.path().toString() to dataFile.location()
- Use Java 9 try-with-resources syntax for readSchema()
- Remove redundant comments and simplify JavaDoc
- Remove InternalFileEncryptor import (no longer needed)
- Add BytesUtils import for Statistics builder
- Minor formatting update
- Fix spotless formatting
Force-pushed 5783738 to 5962e74
We should use RLE because it's the standard encoding for definition/repetition levels in Apache Parquet.
Our code (ParquetFileMerger.java:537-539):
PR #14853 is about reading RLE-encoded data pages, not def/rep levels. Since we use DELTA_BINARY_PACKED for data (line 539), that PR doesn't affect us.
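For readers unfamiliar with the low-level writer API, a hypothetical sketch of the call shape under discussion: repetition and definition levels encoded as RLE, values as DELTA_BINARY_PACKED. This is not the PR's actual code at ParquetFileMerger.java:537-539; the helper class and parameter names are invented, and the call assumes the surrounding block/column lifecycle is handled elsewhere.

```java
import java.io.IOException;

import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.hadoop.ParquetFileWriter;

// Hypothetical fragment showing the encoding combination being discussed.
class EncodingSketch {
  static void writePage(
      ParquetFileWriter writer, int valueCount, BytesInput pageBytes, Statistics<?> stats)
      throws IOException {
    writer.writeDataPage(
        valueCount,                     // number of values in the page
        (int) pageBytes.size(),         // uncompressed page size in bytes
        pageBytes,                      // rep levels + def levels + encoded values
        stats,                          // page-level column statistics
        Encoding.RLE,                   // repetition-level encoding
        Encoding.RLE,                   // definition-level encoding
        Encoding.DELTA_BINARY_PACKED);  // value encoding
  }
}
```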
Force-pushed ae4f8ea to 9a9eb39
…utor: Pass the actual RewriteFileGroup task object to the executor instead of extracting parameters on the driver and passing them individually. This makes the code more semantic and provides access to task metadata on the executor if needed in the future.
- Change RDD type from JavaRDD<Integer> to JavaRDD<RewriteFileGroup>
- Pass group object instead of dummy integer to parallelize()
- Rename lambda parameter from 'ignored' to 'task' for clarity
- Simplify mergeFilesForTask signature (7 params -> 5 params)
- Extract dataFiles, spec, and partition from task on executor
Address review feedback on ParquetFileMerger:
- Improve JavaDoc wording for canMergeAndGetSchema
- Add blank line before return statement in allRowLineageColumnsNonNull
Force-pushed 9a9eb39 to 2eca995
}

private static MessageType readSchema(InputFile inputFile) throws IOException {
  ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(inputFile));
Can we try to inline the reader within the try-with-resources block.
Sure
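For reference, the inlined form being requested would look roughly like this (identifiers reused from the quoted diff; surrounding class context assumed):

```java
// Reader opened directly in the try-with-resources header, so it is always closed.
private static MessageType readSchema(InputFile inputFile) throws IOException {
  try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(inputFile))) {
    return reader.getFooter().getFileMetaData().getSchema();
  }
}
```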
 * <ul>
 * <li>All files must have compatible schemas (identical {@link MessageType})
 * <li>Files must not be encrypted
 * <li>Files must not have associated delete files or delete vectors
I have just realized that we don't check this here. This is checked in the Spark code, please remove it from here.
sure
 * <li>Files must not be encrypted
 * <li>Files must not have associated delete files or delete vectors
 * <li>All files have the same partition spec
 * <li>Table must not have a sort order (including z-ordered tables)
This is also just checked in the Spark code, so please remove it from here.
sure
 * Utility class for performing strict schema validation and merging of Parquet files at the
 * row-group level.
 */
public class ParquetFileMerger {
Could you please explain why the methods in this file are ordered the way they are?
In several comments I have asked that we follow some consistent logic here.
Examples:
- Public methods first
- package private methods second
- private methods at the end
Or:
- one public method first
- private methods which are used only with this public method
- another public method
- private methods which are used only with this public method
- private methods which are used by both public methods.
I would prefer the first, or might accept the one you have chosen, but I don't understand the logic as it stands.
for (int i = 0; i < 100; i++) {
  records.add(createRecord(i, "data" + i));
}
createParquetFileWithData(output1, SCHEMA, records, 1024); // Small row group size
nit: newline
sure
MessageType mergedSchema;
try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(mergedInput))) {
  mergedSchema = reader.getFooter().getFileMetaData().getSchema();
}
nit: newline
sure
long rowGroupSize,
String compression)
throws IOException {
var writerBuilder =
We don't use var in the code for now. Please set the correct return type
sure
writerBuilder.set(TableProperties.PARQUET_COMPRESSION, compression);
}

var writer = writerBuilder.overwrite().build();
We don't use var
sure
}

@TestTemplate
public void testBinPackUsesCorrectRunnerBasedOnOption() {
Do we want to run these tests 2 times?
I think we should use an assumption here so these tests run only when useParquetFileMerger is true.
Good suggestion
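Roughly, the guard being suggested, assuming the AssertJ-style assumeThat already used elsewhere in these tests and the useParquetFileMerger test parameter:

```java
// Run this test only for the ParquetFileMerger path instead of executing it twice.
assumeThat(useParquetFileMerger).isTrue();
```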
}

@TestTemplate
public void testParquetFileMergerExplicitlyEnabledAndDisabled() {
Do we want to run these tests 2 times?
Again, we should split these tests, or maybe remove them, as other test cases already cover this based on the useParquetFileMerger parameter?
ok to remove
verifyInitialVirtualRowIds(binpackTable);
long binpackCountBefore = currentData().size();

RewriteDataFiles.Result binpackResult =
Shouldn't we split these tests based on useParquetFileMerger? Why do we cram USE_PARQUET_ROW_GROUP_MERGE true and false in the same test?
It is to verify that both approaches produce the same results. Added an assume statement so it runs only once.
Let's use useParquetFileMerger
public void testParquetFileMergerPreservesPhysicalRowIds() throws IOException {
  // Test scenario 2: Tables with physical _row_id column
  // After merging, the physical _row_id should be preserved (not changed)
  assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
Do we want this test run 2 times based on useParquetFileMerger? Shall we just use an assume and run only with useParquetFileMerger set to true?
Or if we decide it is worth testing the useParquetFileMerger=false case as well, we could just set USE_PARQUET_ROW_GROUP_MERGE based on useParquetFileMerger.
Let's use useParquetFileMerger
// Merge using ParquetFileMerger - should handle all partitions correctly
RewriteDataFiles.Result result =
    basicRewrite(table)
        .option(RewriteDataFiles.USE_PARQUET_ROW_GROUP_MERGE, "true")
Please think through all of the cases where we explicitly set USE_PARQUET_ROW_GROUP_MERGE. Should we set it based on useParquetFileMerger? If we don't want to set it based on useParquetFileMerger, do we need to run the tests 2 times with useParquetFileMerger true and false as well?
Agree. let's use useParquetFileMerger
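Concretely, something along these lines (based on the basicRewrite call quoted above; the trailing configuration and execute step are assumed):

```java
// Drive the option from the test parameter instead of hardcoding "true",
// so the parameterized runs cover both code paths.
RewriteDataFiles.Result result =
    basicRewrite(table)
        .option(RewriteDataFiles.USE_PARQUET_ROW_GROUP_MERGE, String.valueOf(useParquetFileMerger))
        .execute();
```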
@Test
public void testDescriptionReturnsParquetMerge() {
  Table table = TABLES.create(SCHEMA, tableLocation);
  SparkParquetFileMergeRunner runner = new SparkParquetFileMergeRunner(spark, table);

  assertThat(runner.description()).isEqualTo("PARQUET-MERGE");
}

@Test
public void testInheritsFromSparkBinPackFileRewriteRunner() {
  Table table = TABLES.create(SCHEMA, tableLocation);
  SparkParquetFileMergeRunner runner = new SparkParquetFileMergeRunner(spark, table);

  // Verify inheritance
  assertThat(runner).isInstanceOf(SparkBinPackFileRewriteRunner.class);
}

@Test
public void testValidOptionsInheritedFromParent() {
  Table table = TABLES.create(SCHEMA, tableLocation);
  SparkParquetFileMergeRunner runner = new SparkParquetFileMergeRunner(spark, table);

  // Should inherit validOptions from parent
  Set<String> validOptions = runner.validOptions();
  assertThat(validOptions).isNotNull();
}

@Test
public void testInitMethodInheritedFromParent() {
  Table table = TABLES.create(SCHEMA, tableLocation);
  SparkParquetFileMergeRunner runner = new SparkParquetFileMergeRunner(spark, table);

  // Should not throw exception when init is called
  runner.init(Collections.emptyMap());
}
Do we need these tests?
Do we have similar tests for other Runners?
Agree, not much value. These were initially added during early development.
// This validation would normally be done in SparkParquetFileMergeRunner.canMergeAndGetSchema
// but we're testing the sort order check that happens before calling ParquetFileMerger
// Since table has sort order, validation should fail early
if (table.sortOrder().isSorted()) {
  // Should fail due to sort order
  assertThat(true).isTrue();
} else {
  // If we got here, the sort order check didn't work
  assertThat(false).isTrue();
}
I don't get this.
What do we test here?
removed.
// This validation would normally be done in SparkParquetFileMergeRunner.canMergeAndGetSchema
// but we're testing the delete file check that happens before calling ParquetFileMerger
boolean hasDeletes = group.fileScanTasks().stream().anyMatch(task -> !task.deletes().isEmpty());

// Should be true because files have delete files
assertThat(hasDeletes).isTrue();
Again. What do we test here?
Which part of our code?
removed.
}

@Test
public void testCanMergeAndGetSchemaReturnsFalseForSortedTable() {
I would expect these tests to call canMergeAndGetSchema somewhere, but they don't. What is the goal of these tests?
@shangxinli: Back from the holidays! I'm fairly happy with the production part of the code. Left a few small comments. I'm happy that @SourabhBadhya is checking the Spark side. There are still a few questions regarding the tests. Please check them if you have time. Thanks, and happy new year 🎉!
Force-pushed 00df946 to 45b0197
1. Remove Spark-specific javadoc constraints from ParquetFileMerger
- Removed "Files must not have associated delete files" constraint
- Removed "Table must not have a sort order" constraint
- These validations are only enforced in SparkParquetFileMergeRunner,
not in the ParquetFileMerger class itself
2. Fix code style in TestParquetFileMerger
- Replace 'var' with explicit types (Parquet.DataWriteBuilder, DataWriter<Record>)
- Add newlines after for loop and try-catch blocks for better readability
- Remove unused Parquet import
3. Optimize test execution in TestRewriteDataFilesAction
- Add assumeThat for comparison tests to run once instead of twice
- Use String.valueOf(useParquetFileMerger) for regular tests to test both approaches
- Remove redundant testParquetFileMergerExplicitlyEnabledAndDisabled test
4. Fix TestSparkParquetFileMergeRunner to actually call canMergeAndGetSchema
- Changed canMergeAndGetSchema from private to package-private in SparkParquetFileMergeRunner
- Updated all tests to create runner instance and call canMergeAndGetSchema()
- Removed 4 trivial tests (description, inheritance, validOptions, init)
- All remaining tests now validate actual canMergeAndGetSchema behavior