Skip to content

Conversation

@shangxinli
Copy link
Contributor

@shangxinli shangxinli commented Oct 28, 2025

Why this change?

This implementation provides significant performance improvements for Parquet
file merging operations by eliminating serialization/deserialization overhead.
Benchmark results show 10x faster file merging compared to traditional
read-rewrite approaches.

The change leverages existing Parquet library capabilities (ParquetFileWriter
appendFile API) to perform zero-copy row-group merging, making it ideal for
compaction and maintenance operations on large Iceberg tables.

TODO: 1) Encrypted tables are not supported yet. 2) Schema evolution is not handled yet

What changed?

  • Added ParquetFileMerger class for row-group level file merging
    • Performs zero-copy merging using ParquetFileWriter.appendFile()
    • Validates schema compatibility across all input files
    • Supports merging multiple Parquet files into a single output file
  • Reuses existing Apache Parquet library functionality instead of custom implementation
  • Strict schema validation ensures data integrity during merge operations
  • Added comprehensive error handling for schema mismatches

Testing

  • Validated in staging test environment
  • Verified schema compatibility checks work correctly
  • Confirmed 13x performance improvement over traditional approach
  • Tested with various file sizes and row group configurations

  ## Why this change?

  This implementation provides significant performance improvements for Parquet
  file merging operations by eliminating serialization/deserialization overhead.
  Benchmark results show **13x faster** file merging compared to traditional
  read-rewrite approaches.

  The change leverages existing Parquet library capabilities (ParquetFileWriter
  appendFile API) to perform zero-copy row-group merging, making it ideal for
  compaction and maintenance operations on large Iceberg tables.

  TODO: 1) Encrypted tables are not supported yet. 2) Schema evolution is not handled yet

  ## What changed?

  - Added ParquetFileMerger class for row-group level file merging
    - Performs zero-copy merging using ParquetFileWriter.appendFile()
    - Validates schema compatibility across all input files
    - Supports merging multiple Parquet files into a single output file
  - Reuses existing Apache Parquet library functionality instead of custom implementation
  - Strict schema validation ensures data integrity during merge operations
  - Added comprehensive error handling for schema mismatches

  ## Testing

  - Validated in staging test environment
  - Verified schema compatibility checks work correctly
  - Confirmed 13x performance improvement over traditional approach
  - Tested with various file sizes and row group configurations
@huaxingao
Copy link
Contributor

Thanks @shangxinli for the PR! At a high level, leveraging Parquet’s appendFile for row‑group merging is the right approach and a performance win. Making it opt‑in via an action option and a table property is appropriate.

A couple of areas I’d like to discuss:

  • IO integration: Would it make sense to route IO through table.io()/OutputFileFactory rather than Hadoop IO?
  • Executor/driver split: Should executors only write files and return locations/sizes, with DataFiles (and metrics) constructed on the driver?

I’d also like to get others’ opinions. @pvary @amogh-jahagirdar @nastra @singhpk234

@pvary
Copy link
Contributor

pvary commented Nov 3, 2025

I have a few concerns here:

  • I would prefer if the decision to do the row-group level merging is done on the action level, and not leaked to the table properties
  • I would prefer to check the requirements as soon as possible and fail, or fall back with logging to the normal rewrite if the requirements are not met
  • In the planning we can create groups with the expected sizes, and in this case the runner could rewrite the whole groups, and don't need to split the planned groups to the expected file sizes
  • Always using HadoopFileIO could be problematic. The catalog might define a different FileIO implementation. We should handle the case correctly, and use the Catalog/Table provided FileIo
  • We don't reuse the ParquetFileMerger object. In this case, I usually prefer to use static methods.

@shangxinli
Copy link
Contributor Author

Thanks @huaxingao for the review and feedback! I've addressed both of your points.

  1. IO integration:
    Good catch! I've updated the implementation to use table.io() instead of hardcoding HadoopFileIO. The new approach:
  • Executors still use Hadoop Configuration for the actual Parquet file merging (since ParquetFileMerger internally uses Parquet's appendFile which requires Hadoop APIs)
  • Driver now uses table.io().newInputFile() to read metrics, which properly respects the catalog's configured FileIO implementation
  • This ensures compatibility with different storage systems (S3, GCS, Azure, custom FileIO implementations)
  1. Executor/driver split:
    I've refactored to follow the recommended pattern:
  • Executors: Only perform the file merge operation and return lightweight metadata (file path, size) via a MergeResult object
  • Driver: Receives the metadata, reads metrics using table.io(), and constructs the full DataFile objects
  • This minimizes serialization overhead and keeps heavyweight objects on the driver side

Additionally, I've addressed the architectural feedback about planning vs. execution:

  • Removed the groupFilesBySize() logic from the runner - the planner already creates appropriately-sized groups
  • Runner now merges the entire file group into a single output file without further splitting
  • This creates a cleaner separation where planning happens in the planner and execution happens in the runner

Let me know if there are any other concerns or improvements you'd like to see!

@shangxinli
Copy link
Contributor Author

Thanks @pvary for the detailed feedback! I've addressed your points:

  1. Decision at action level, not table properties:
    Done - Removed the PARQUET_USE_FILE_MERGER from TableProperties entirely and stripped out the fallback logic. Now it only checks the action options.

  2. Early validation with proper fallback:
    Done - Flipped the logic around to validate upfront with canUseMerger() before attempting the merge. Also beefed up the validation to actually check schema compatibility, not just file format. If anything fails, it logs and falls back to the standard rewrite.

  3. Planning creates expected sizes, runner doesn't split:
    Done - Nuked the whole groupFilesBySize() method. The runner now just merges whatever the planner gave it into a single file - no more re-grouping.

  4. Use Catalog/Table FileIO instead of HadoopFileIO:
    Done - Removed HadoopFileIO completely. Executors now just return path + size, and the driver reads metrics using table().io() which respects whatever FileIO the catalog configured.

  5. Static methods instead of object creation:
    Done - Converted ParquetFileMerger to a utility class with private constructor and all static methods. No more new ParquetFileMerger() calls anywhere.

parquetOutputFile,
schema,
ParquetFileWriter.Mode.CREATE,
ParquetWriter.DEFAULT_BLOCK_SIZE,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not very sure about this part. Should it be directly fixed to ParquetWriter.DEFAULT_BLOCK_SIZE, or does it need to be linked with the table property write.parquet.row-group-size-bytes?

Copy link
Contributor Author

@shangxinli shangxinli Nov 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel using the property gives more flexibility for rewriter. But like to hear other's thoughts.

@Guosmilesmile
Copy link
Contributor

Guosmilesmile commented Nov 12, 2025

Thanks for the PR ! I have a question about the lineage, If the merging is only performed at the parquet layer, will the lineage information of the v3 table be disrupted?

@shangxinli
Copy link
Contributor Author

Thanks for the PR ! I have a question about the lineage, If the merging is only performed at the parquet layer, will the lineage information of the v3 table be disrupted?

Good question! The lineage information for v3 tables is preserved in two ways:

  1. Field IDs (Schema Lineage)

Field IDs are preserved because we strictly enforce identical schemas across all files being merged.

In ParquetFileMerger.java:130-136, we validate that all input files have exactly the same Parquet MessageType schema:

if (!schema.equals(currentSchema)) {
throw new IllegalArgumentException(
String.format("Schema mismatch detected: file '%s' has schema %s but file '%s' has schema %s. "
+ "All files must have identical Parquet schemas for row-group level merging.", ...));
}

Field IDs are stored directly in the Parquet schema structure itself (via Type.getId()), so when we copy row groups using ParquetFileWriter.appendFile() with the validated schema, all field IDs are preserved.

  1. Row IDs (Row Lineage for v3+)

Row IDs are automatically assigned by Iceberg's commit framework - we don't need special handling in the merger.

Here's how it works:

  1. Our code creates DataFile objects with metrics (including recordCount) but without firstRowId - see SparkParquetFileMergeRunner.java:236-243
  2. During commit, SnapshotProducer creates a ManifestListWriter initialized with base.nextRowId() (the table's current row ID counter) - see SnapshotProducer.java:273
  3. ManifestListWriter.prepare() automatically assigns firstRowId to each manifest and increments the counter by the number of rows - see ManifestListWriter.java:136-140:
    // assign first-row-id and update the next to assign
    wrapper.wrap(manifest, nextRowId);
    this.nextRowId += manifest.existingRowsCount() + manifest.addedRowsCount();
  4. The snapshot is committed with the updated nextRowId, ensuring all row IDs are correctly tracked

This is the same mechanism used by all Iceberg write operations, so row lineage is fully preserved for v3 tables.

@shangxinli
Copy link
Contributor Author

shangxinli commented Dec 21, 2025

@shangxinli: Here is a bit of refactor to fix warnings, and such: pvary@884ebea

Could you please take a look, and apply which is OK with you, and we can talk about the others?

Refactored: 5339589

- Add static constants for row lineage column types and descriptors
- Extract writer() helper method to reduce code duplication
- Rename methods for clarity:
  - mergeFilesWithSchema -> binaryMerge
  - mergeFilesWithRowLineageAndSchema -> generateRowLineageAndMerge
  - validateRowLineageColumnsHaveNoNulls -> allRowLineageColumnsNonNull
- Use Statistics builder instead of manual LongStatistics construction
- Add try-with-resources for ValuesWriter
- Fix encoding parameters: BIT_PACKED -> RLE for definition/repetition levels
- Change dataFile.path().toString() to dataFile.location()
- Use Java 9 try-with-resources syntax for readSchema()
- Remove redundant comments and simplify JavaDoc
- Remove InternalFileEncryptor import (no longer needed)
- Add BytesUtils import for Statistics builder

Minor formatting update

Fix spotless formatting
@shangxinli shangxinli force-pushed the rewrite_data_files2 branch 2 times, most recently from 5783738 to 5962e74 Compare December 21, 2025 00:39
@shangxinli
Copy link
Contributor Author

shangxinli commented Dec 21, 2025

I have seen this: #14853 - Maybe we can't use RLE? What is the normal Parquet writer using?

We should use RLE because it's the standard encoding for definition/repetition levels in Apache Parquet.

  • In the parquet document, BIT_PACKED is "deprecated and will be replaced by the RLE/bit-packing hybrid encoding"
  • In parquet code, we create RunLengthBitPackingHybridValuesWriter for def/rep levels, in which getEncoding() returns Encoding.RLE

Our code (ParquetFileMerger.java:537-539):

  • Line 537-538: Encoding.RLE for definition/repetition levels
  • Line 539: Encoding.DELTA_BINARY_PACKED for data values

PR #14853 is about reading RLE-encoded data pages, not def/rep levels. Since we use DELTA_BINARY_PACKED for data (line 539), that PR doesn't affect us.

…utor

Pass the actual RewriteFileGroup task object to the executor instead of
extracting parameters on the driver and passing them individually. This
makes the code more semantic and provides access to task metadata on the
executor if needed in the future.

- Change RDD type from JavaRDD<Integer> to JavaRDD<RewriteFileGroup>
- Pass group object instead of dummy integer to parallelize()
- Rename lambda parameter from 'ignored' to 'task' for clarity
- Simplify mergeFilesForTask signature (7 params -> 5 params)
- Extract dataFiles, spec, and partition from task on executor

Address review feedback on ParquetFileMerger

- Improve JavaDoc wording for canMergeAndGetSchema
- Add blank line before return statement in allRowLineageColumnsNonNull
}

private static MessageType readSchema(InputFile inputFile) throws IOException {
ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(inputFile));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we try to inline the reader within the try-with-resources block.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure

* <ul>
* <li>All files must have compatible schemas (identical {@link MessageType})
* <li>Files must not be encrypted
* <li>Files must not have associated delete files or delete vectors
Copy link
Contributor

@pvary pvary Jan 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have just realized that we don't check this here. This is checked in the Spark code, please remove it form here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

* <li>Files must not be encrypted
* <li>Files must not have associated delete files or delete vectors
* <li>All files have the same partition spec
* <li>Table must not have a sort order (including z-ordered tables)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is also just checked in the Spark code, so please remove it from here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

* Utility class for performing strict schema validation and merging of Parquet files at the
* row-group level.
*/
public class ParquetFileMerger {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please describe me why we order the methods in this file, as we do it?

In several comment I have mentioned that please make sure that we follow some logic here.
Examples:

  • Public methods first
  • package private methods second
  • private methods at the end

Or:

  • one public method first
  • private methods which are used only with this public method
  • another public method
  • private methods which are used only with this public method
  • private methods which are used by both public methods.

I would prefer the first, or migth be accept the one you have chosen, but I don't understand the logic as it stands.

for (int i = 0; i < 100; i++) {
records.add(createRecord(i, "data" + i));
}
createParquetFileWithData(output1, SCHEMA, records, 1024); // Small row group size
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: newline

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

MessageType mergedSchema;
try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(mergedInput))) {
mergedSchema = reader.getFooter().getFileMetaData().getSchema();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: newline

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

long rowGroupSize,
String compression)
throws IOException {
var writerBuilder =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't use var in the code for now. Please set the correct return type

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

writerBuilder.set(TableProperties.PARQUET_COMPRESSION, compression);
}

var writer = writerBuilder.overwrite().build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't use var

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

}

@TestTemplate
public void testBinPackUsesCorrectRunnerBasedOnOption() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to run these tests 2 times?
I think here we should use assume to run these tests only if the useParquetFileMerger is true

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion

}

@TestTemplate
public void testParquetFileMergerExplicitlyEnabledAndDisabled() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to run these tests 2 times?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, we should split this tests, or maybe remove it as other test cases are already testing this based on the useParquetFileMerger parameter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok to remove

verifyInitialVirtualRowIds(binpackTable);
long binpackCountBefore = currentData().size();

RewriteDataFiles.Result binpackResult =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we split these tests based on useParquetFileMerger? Why do we cram USE_PARQUET_ROW_GROUP_MERGE true and false in the same test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is to compare both approach have the same results. Added assume statement to let it run only once.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use useParquetFileMerger

public void testParquetFileMergerPreservesPhysicalRowIds() throws IOException {
// Test scenario 2: Tables with physical _row_id column
// After merging, the physical _row_id should be preserved (not changed)
assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want this test run 2 times based on useParquetFileMerger? Shall we just use an assume and run only with useParquetFileMerger set to true?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or if we decide it is worth to test for the useParquetFileMerger false case as well, we could just set USE_PARQUET_ROW_GROUP_MERGE based on useParquetFileMerger

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use useParquetFileMerger

// Merge using ParquetFileMerger - should handle all partitions correctly
RewriteDataFiles.Result result =
basicRewrite(table)
.option(RewriteDataFiles.USE_PARQUET_ROW_GROUP_MERGE, "true")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please think through all of the cases where we explicitly set USE_PARQUET_ROW_GROUP_MERGE. Should we set it based on useParquetFileMerger? If we don't want to set it based on useParquetFileMerger, do we need to run the tests 2 times with useParquetFileMerger true and false as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. let's use useParquetFileMerger

Comment on lines 63 to 97
@Test
public void testDescriptionReturnsParquetMerge() {
Table table = TABLES.create(SCHEMA, tableLocation);
SparkParquetFileMergeRunner runner = new SparkParquetFileMergeRunner(spark, table);

assertThat(runner.description()).isEqualTo("PARQUET-MERGE");
}

@Test
public void testInheritsFromSparkBinPackFileRewriteRunner() {
Table table = TABLES.create(SCHEMA, tableLocation);
SparkParquetFileMergeRunner runner = new SparkParquetFileMergeRunner(spark, table);

// Verify inheritance
assertThat(runner).isInstanceOf(SparkBinPackFileRewriteRunner.class);
}

@Test
public void testValidOptionsInheritedFromParent() {
Table table = TABLES.create(SCHEMA, tableLocation);
SparkParquetFileMergeRunner runner = new SparkParquetFileMergeRunner(spark, table);

// Should inherit validOptions from parent
Set<String> validOptions = runner.validOptions();
assertThat(validOptions).isNotNull();
}

@Test
public void testInitMethodInheritedFromParent() {
Table table = TABLES.create(SCHEMA, tableLocation);
SparkParquetFileMergeRunner runner = new SparkParquetFileMergeRunner(spark, table);

// Should not throw exception when init is called
runner.init(Collections.emptyMap());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need these tests?
Do we have similar tests for other Runners?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, not much value. It was firstly added for initial coding.

Comment on lines 124 to 133
// This validation would normally be done in SparkParquetFileMergeRunner.canMergeAndGetSchema
// but we're testing the sort order check that happens before calling ParquetFileMerger
// Since table has sort order, validation should fail early
if (table.sortOrder().isSorted()) {
// Should fail due to sort order
assertThat(true).isTrue();
} else {
// If we got here, the sort order check didn't work
assertThat(false).isTrue();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get this.
What do we test here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed.

Comment on lines 152 to 157
// This validation would normally be done in SparkParquetFileMergeRunner.canMergeAndGetSchema
// but we're testing the delete file check that happens before calling ParquetFileMerger
boolean hasDeletes = group.fileScanTasks().stream().anyMatch(task -> !task.deletes().isEmpty());

// Should be true because files have delete files
assertThat(hasDeletes).isTrue();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again. What do we test here?
Which part of our code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed.

}

@Test
public void testCanMergeAndGetSchemaReturnsFalseForSortedTable() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would expect that these code should call canMergeAndGetSchema somewhere, but they don't. What is the goal of these tests?

@pvary
Copy link
Contributor

pvary commented Jan 6, 2026

@shangxinli: Back from the holidays! I'm fairly happy with the production part of the code. Left a few small comments. I'm happy that @SourabhBadhya is checking the Sparks side.

There are still a few questions wrt the tests. Please check them if you have time.

Thanks, and happy new year 🎉 !

@shangxinli shangxinli force-pushed the rewrite_data_files2 branch from 00df946 to 45b0197 Compare January 7, 2026 03:06
1. Remove Spark-specific javadoc constraints from ParquetFileMerger
   - Removed "Files must not have associated delete files" constraint
   - Removed "Table must not have a sort order" constraint
   - These validations are only enforced in SparkParquetFileMergeRunner,
     not in the ParquetFileMerger class itself

2. Fix code style in TestParquetFileMerger
   - Replace 'var' with explicit types (Parquet.DataWriteBuilder, DataWriter<Record>)
   - Add newlines after for loop and try-catch blocks for better readability
   - Remove unused Parquet import

3. Optimize test execution in TestRewriteDataFilesAction
   - Add assumeThat for comparison tests to run once instead of twice
   - Use String.valueOf(useParquetFileMerger) for regular tests to test both approaches
   - Remove redundant testParquetFileMergerExplicitlyEnabledAndDisabled test

4. Fix TestSparkParquetFileMergeRunner to actually call canMergeAndGetSchema
   - Changed canMergeAndGetSchema from private to package-private in SparkParquetFileMergeRunner
   - Updated all tests to create runner instance and call canMergeAndGetSchema()
   - Removed 4 trivial tests (description, inheritance, validOptions, init)
   - All remaining tests now validate actual canMergeAndGetSchema behavior
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants