Improve UNNEST streaming with chunked batch emission and disable chunking for recursive/staged paths#20866
Open
kosiew wants to merge 20 commits into apache:main from
Conversation
Drain each upstream RecordBatch in smaller contiguous row slices based on the session batch_size, preventing full materialization of large batches at once. For recursive unnesting (depth > 1), fall back to single-row slices to maintain bounded memory use while preserving recursive semantics.
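The slicing policy described above can be sketched in plain Rust. This is an illustrative model, not DataFusion's actual code: `slice_row_count` is a hypothetical helper that picks how many input rows to take per slice, assuming a per-row fanout estimate is available.

```rust
// Sketch (not the PR's exact code): choose how many input rows to slice
// off a pending batch so the expanded output stays near batch_size.
fn slice_row_count(batch_size: usize, est_fanout: usize, depth: usize) -> usize {
    if depth > 1 {
        // Recursive unnest: fall back to single-row slices for bounded memory.
        1
    } else {
        // Aim for roughly batch_size output rows after expansion,
        // always taking at least one input row so progress is guaranteed.
        (batch_size / est_fanout.max(1)).max(1)
    }
}

fn main() {
    // 8192 target output rows, each input row fans out to ~64 rows.
    assert_eq!(slice_row_count(8192, 64, 1), 128);
    // Depth > 1 always degrades to one row at a time.
    assert_eq!(slice_row_count(8192, 64, 2), 1);
    // Fanout larger than batch_size still takes at least one row per slice.
    assert_eq!(slice_row_count(8192, 100_000, 1), 1);
}
```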
Consolidate List / LargeList / FixedSizeList dispatching into a single helper function as_list_array_type. This simplifies the code by replacing multiple match blocks with a uniform call. Any future changes to list datatypes or null semantics will now only require adjustments in as_list_array_type and its trait implementations.
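The dispatch consolidation can be illustrated with a stdlib-only sketch: a single generic function replaces per-variant match arms, with a trait standing in for arrow's i32 (List) vs i64 (LargeList) offset types. Names here are illustrative, not the actual `as_list_array_type` signature.

```rust
// Sketch of uniform List/LargeList handling behind one generic helper,
// in the spirit of as_list_array_type. OffsetSize mirrors arrow's
// i32 (List) vs i64 (LargeList) offset widths.
trait OffsetSize: Copy {
    fn to_usize(self) -> usize;
}
impl OffsetSize for i32 { fn to_usize(self) -> usize { self as usize } }
impl OffsetSize for i64 { fn to_usize(self) -> usize { self as usize } }

// One generic function replaces multiple match blocks over list variants.
fn value_length<O: OffsetSize>(offsets: &[O], row: usize) -> usize {
    offsets[row + 1].to_usize() - offsets[row].to_usize()
}

fn main() {
    let small: [i32; 4] = [0, 2, 2, 5]; // List-style offsets
    let large: [i64; 4] = [0, 2, 2, 5]; // LargeList-style offsets
    assert_eq!(value_length(&small, 0), 2); // first list has 2 elements
    assert_eq!(value_length(&large, 2), 3); // third list has 3 elements
}
```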
Introduce new tests for unnest_chunks_high_fanout_batches:
- Test handling of list columns with varying lengths.
- Validate row output with nulls dropped when preserve_nulls=false.
- Verify recursive unnesting works correctly under memory pressure.
Introduce PendingBatch struct to streamline UnnestStream. Encapsulate row slicing and cursor advancement in PendingBatch methods, reducing complexity and improving code readability. Replace existing fields in UnnestStream and update relevant methods to utilize the new structure.
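The PendingBatch idea can be sketched as a batch plus a cursor, where taking the next slice advances the cursor and signals exhaustion. Field and method names below are illustrative, not the PR's exact API (the real struct holds a RecordBatch rather than a bare row count).

```rust
// Sketch of PendingBatch-style bookkeeping: encapsulate row slicing and
// cursor advancement so the stream loop stays simple.
struct PendingBatch {
    total_rows: usize,
    cursor: usize,
}

impl PendingBatch {
    fn new(total_rows: usize) -> Self {
        Self { total_rows, cursor: 0 }
    }

    // Returns (slice_start, slice_len), clamped to the remaining rows,
    // or None once the batch is exhausted.
    fn take_next_slice(&mut self, want_rows: usize) -> Option<(usize, usize)> {
        if self.cursor >= self.total_rows {
            return None;
        }
        let start = self.cursor;
        let len = want_rows.min(self.total_rows - start);
        self.cursor += len;
        Some((start, len))
    }
}

fn main() {
    let mut pending = PendingBatch::new(10);
    assert_eq!(pending.take_next_slice(4), Some((0, 4)));
    assert_eq!(pending.take_next_slice(4), Some((4, 4)));
    assert_eq!(pending.take_next_slice(4), Some((8, 2))); // final partial slice
    assert_eq!(pending.take_next_slice(4), None); // exhausted
}
```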
Remove redundant scaffolding in PendingBatch within unnest.rs by integrating exhaustion checks into the existing logic. Additionally, extract common functionality into helper functions in mod.rs to reduce repetition in new unnest chunking tests, improving readability and maintainability.
Disable row-slice chunking in UnnestExec only when unnesting an internal __unnest_placeholder(...) input at depth == 1 to handle the stacked-unnest case. Retain the existing fallback for depth > 1. Added a regression test in mod.rs to validate the output order of a two-stage unnest pipeline.
Add UnnestExec::disable_chunking_for_stacked_unnest to centralize detection logic for chunking. Introduce a helper function is_internal_unnest_placeholder to streamline checks. Replace inline "__unnest_placeholder(" verification in execute with calls to the new disable_chunking method.
Simplify `PendingBatch::take_next_slice` to only manage state and slice. Introduce `next_pending_slice_row_count` to streamline policy selection, encapsulating the `disable_chunking_for_stacked_unnest` logic. Update `build_next_pending_batch` to compute row count using the new helper and pass it to `PendingBatch::take_next_slice`.
Implement a reusable function to count nodes by operator name in the physical plan. Update test `unnest_chunks_stacked_unnest_preserves_order` to build the physical plan directly from the query DataFrame. Assert the presence of exactly 2 `UnnestExec` nodes using the new helper, and reuse the same DataFrame for result collection. This change enhances test stability by removing reliance on formatted EXPLAIN output, minimizing issues with formatting.
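The node-counting helper can be sketched over a toy plan tree. This is a stdlib-only model, assuming the real helper walks an ExecutionPlan's children; the struct and function names here are hypothetical.

```rust
// Sketch of counting plan nodes by operator name instead of grepping
// formatted EXPLAIN output, which is sensitive to formatting changes.
struct PlanNode {
    name: &'static str,
    children: Vec<PlanNode>,
}

fn count_nodes_by_name(node: &PlanNode, target: &str) -> usize {
    let here = (node.name == target) as usize;
    here + node
        .children
        .iter()
        .map(|c| count_nodes_by_name(c, target))
        .sum::<usize>()
}

fn main() {
    // A two-stage unnest pipeline: UnnestExec -> UnnestExec -> scan.
    let plan = PlanNode {
        name: "UnnestExec",
        children: vec![PlanNode {
            name: "UnnestExec",
            children: vec![PlanNode { name: "DataSourceExec", children: vec![] }],
        }],
    };
    assert_eq!(count_nodes_by_name(&plan, "UnnestExec"), 2);
}
```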
Add a dedicated module for unnest chunking tests in datafusion/core/tests/dataframe. Move relevant helper functions and tests to a new focused file, ensuring mod.rs remains cleaner. This organizes the test suite for chunking into a more cohesive structure.
Simplify control flow in unnest.rs by inlining redundant state transitions and removing unused helpers. Reduce code duplication in unnest_chunks.rs by extracting nested-list batch construction into a single reusable helper for both recursive and stacked-unnest tests.
Add a shared unnest-placeholder marker in unnest.rs with helper functions for creation and inspection, re-exported from lib.rs. Update the planner in utils.rs to tag internal placeholder aliases with metadata during inner projection. Modify UnnestExec to check input field metadata using the shared helper instead of relying on string matching. Also, implement a unit test in unnest.rs to confirm that chunking is disabled via metadata, removing the brittle string dependency.
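The metadata-based detection can be sketched as follows. The helper names match those listed in this PR; the metadata key's string value and the exact signatures are assumptions for illustration.

```rust
use std::collections::HashMap;

// Sketch of metadata-tagged placeholder detection: fields are tagged at
// planning time and recognized later without brittle string matching on
// field names. The key's value here is illustrative.
const UNNEST_PLACEHOLDER_METADATA_KEY: &str = "__datafusion.unnest_placeholder";

fn unnest_placeholder_field_metadata() -> HashMap<String, String> {
    HashMap::from([(
        UNNEST_PLACEHOLDER_METADATA_KEY.to_string(),
        "true".to_string(),
    )])
}

fn is_unnest_placeholder_field(metadata: &HashMap<String, String>) -> bool {
    metadata.contains_key(UNNEST_PLACEHOLDER_METADATA_KEY)
}

fn main() {
    let tagged = unnest_placeholder_field_metadata();
    assert!(is_unnest_placeholder_field(&tagged));
    assert!(!is_unnest_placeholder_field(&HashMap::new()));
}
```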
Include a test helper for expected placeholder aliases with metadata. Update `test_transform_bottom_unnest_recursive` to convert inner projection aliases to fields and assert `is_unnest_placeholder_field(...)` before physical planning. Also, revise existing planner test expectations to align with the new metadata-bearing placeholder aliases.
Simplify build_batch by removing the UnnestingResult wrapper and computing max_recursion more idiomatically. Flatten and sort the unnested outputs directly before regrouping by the original column index. Reduce duplicate scanning in list_unnest_at_level by creating repeat_mask in a single pass per column. Optimize flatten_struct_cols using HashSet::contains for direct checks.
UnnestExec output batches to bound memory usage and preserve stacked unnest semantics
UnnestExec now disables row-slice chunking for any recursive or staged unnest pipeline, ensuring fallback to whole-batch processing for depth > 1 or staged placeholder columns. Updated tests to reflect the new behavior and confirmed target partitions for the repeated reference regression test. Added TODO note for follow-up on issue apache#20788.
Remove duplicate input.schema() lookup in compute_properties. Tighten disable_chunking_for_recursive_or_staged_unnest() by reusing one schema handle. Move pending-batch bookkeeping into small PendingBatch helpers and collapse batch building/timing into UnnestStream::build_batch. Simplify build_batch() to early-return for no-list cases and reduce temporary variables, while keeping the recursive/staged unnest fallback and TODO(apache#20788) behavior intact.
Extract shared SQL into CREATE_RECURSIVE_UNNEST_TABLE and RECURSIVE_REPEATED_REFS_QUERY. Introduce helper functions for total_rows(...) and create_recursive_unnest_table(...). Tighten assert_total_and_max_batch_rows(...) to reuse the shared row-count helper, maintaining behavior while reducing setup and inline SQL noise.
Which issue does this PR close?
Rationale for this change
UNNEST could buffer and expand very large intermediate results before producing output, which makes unnest + group by workloads consume excessive memory. That behavior is especially problematic for wide, high-fanout list columns, where a relatively small input batch can explode into a much larger output batch.

This change moves `UnnestExec` toward a more streaming-friendly execution model by slicing input batches into smaller chunks before expansion, so output is emitted in bounded batches that better respect the configured session `batch_size`. That reduces peak memory pressure for the core case behind #20788 while preserving correctness.

The patch also addresses an important correctness concern discovered while working on chunked emission: recursive or staged/stacked UNNEST can be sensitive to batch boundaries, especially when placeholder columns are introduced during planning and downstream repartitioning is present. For those cases, this PR takes the conservative path and disables chunking until a parent-row-aware strategy is implemented.

What changes are included in this PR?
This PR includes the following changes:

- adds planner-visible metadata helpers in `datafusion_common::unnest` to mark internal UNNEST placeholder fields: `UNNEST_PLACEHOLDER_METADATA_KEY`, `unnest_placeholder_field_metadata`, `is_unnest_placeholder_field`
- annotates recursive/staged UNNEST placeholder projections in the SQL rewriter with this metadata using `alias_with_metadata`
- updates `UnnestExec` to process pending input batches incrementally rather than expanding each input batch all at once
- introduces chunk selection logic that estimates per-row UNNEST expansion and slices input batches so emitted output batches stay near the configured `batch_size`
- preserves existing null-handling semantics when estimating chunk sizes, including `preserve_nulls = false`
- conservatively disables chunking for recursive unnests (depth > 1) and staged placeholder columns
- refactors some internal unnest code for clarity and maintainability, including `as_list_array_type`
- re-exports the new unnest metadata helpers from `datafusion_common::lib`

Are these changes tested?
Yes.
This PR adds targeted tests covering both memory-bounded chunking behavior and the conservative fallback paths, including `preserve_nulls = false` during chunk-size estimation.

It also updates SQL-planner tests to assert that placeholder aliases carry the expected metadata.
Are there any user-facing changes?
There are no intended SQL or API behavior changes for end users.
The main user-visible impact is improved execution behavior for some UNNEST workloads: `UnnestExec` now emits smaller, more bounded batches in the non-recursive case, which should reduce peak memory usage for queries like the one described in #20788.

This PR also introduces new public helper exports in `datafusion-common` for internal placeholder-field metadata. They are intended primarily for planner/executor coordination rather than general end-user use.

LLM-generated code disclosure
This PR includes LLM-generated code and comments. All LLM-generated content has been manually reviewed and tested.