Skip to content

Improve UNNEST streaming with chunked batch emission and disable chunking for recursive/staged paths#20866

Open
kosiew wants to merge 20 commits intoapache:mainfrom
kosiew:unnest-01-20788
Open

Improve UNNEST streaming with chunked batch emission and disable chunking for recursive/staged paths#20866
kosiew wants to merge 20 commits intoapache:mainfrom
kosiew:unnest-01-20788

Conversation

@kosiew
Copy link
Contributor

@kosiew kosiew commented Mar 11, 2026

Which issue does this PR close?

Rationale for this change

UNNEST could buffer and expand very large intermediate results before producing output, which makes unnest + group by workloads consume excessive memory. That behavior is especially problematic for wide, high-fanout list columns, where a relatively small input batch can explode into a much larger output batch.

This change moves UnnestExec toward a more streaming-friendly execution model by slicing input batches into smaller chunks before expansion, so output is emitted in bounded batches that better respect the configured session batch_size. That reduces peak memory pressure for the core case behind #20788 while preserving correctness.

The patch also addresses an important correctness concern discovered while working on chunked emission: recursive or staged/stacked UNNEST can be sensitive to batch boundaries, especially when placeholder columns are introduced during planning and downstream repartitioning is present. For those cases, this PR takes the conservative path and disables chunking until a parent-row-aware strategy is implemented.

What changes are included in this PR?

This PR includes the following changes:

  • adds planner-visible metadata helpers in datafusion_common::unnest to mark internal UNNEST placeholder fields:

    • UNNEST_PLACEHOLDER_METADATA_KEY
    • unnest_placeholder_field_metadata
    • is_unnest_placeholder_field
  • annotates recursive/staged UNNEST placeholder projections in the SQL rewriter with this metadata using alias_with_metadata

  • updates UnnestExec to process pending input batches incrementally rather than expanding each input batch all at once

  • introduces chunk selection logic that estimates per-row UNNEST expansion and slices input batches so emitted output batches stay near the configured batch_size

  • preserves existing null-handling semantics when estimating chunk sizes, including preserve_nulls = false

  • conservatively disables chunking for:

    • recursive unnest (depth > 1)
    • staged/stacked unnest inputs identified via placeholder-field metadata
  • refactors some internal unnest code for clarity and maintainability, including:

    • centralizing list-array dispatch in as_list_array_type
    • simplifying struct flattening checks
    • simplifying ordering/reconstruction of multiply-unnested outputs
  • re-exports the new unnest metadata helpers from datafusion_common::lib

Are these changes tested?

Yes.

This PR adds targeted tests covering both memory-bounded chunking behavior and the conservative fallback paths, including:

  • high-fanout unnest producing multiple bounded output batches
  • multi-column unnest where per-row output size is driven by the max list length across columns
  • preserve_nulls = false during chunk-size estimation
  • recursive unnest correctness under small batch sizes
  • stacked/staged unnest preserving order
  • repeated recursive unnest references in plans that include repartitioning
  • unit tests verifying chunking is disabled for recursive and placeholder-marked staged inputs

It also updates SQL-planner tests to assert that placeholder aliases carry the expected metadata.

Are there any user-facing changes?

There are no intended SQL or API behavior changes for end users.

The main user-visible impact is improved execution behavior for some UNNEST workloads: UnnestExec now emits smaller, more bounded batches in the non-recursive case, which should reduce peak memory usage for queries like the one described in #20788.

This PR also introduces new public helper exports in datafusion-common for internal placeholder-field metadata. They are intended primarily for planner/executor coordination rather than general end-user use.

LLM-generated code disclosure

This PR includes LLM-generated code and comments. All LLM-generated content has been manually reviewed and tested.

kosiew added 5 commits March 11, 2026 19:56
Drain each upstream RecordBatch in smaller contiguous row slices
based on the session batch_size, preventing the full materialization
of large batches at once. For recursive unnesting (depth > 1),
fallback to single-row slices to maintain bounded memory use
while preserving recursive semantics.
Consolidate List / LargeList / FixedSizeList dispatching
into a single helper function as_list_array_type. This
simplifies the code by replacing multiple match blocks
with a uniform call. Any future changes to list datatypes
or null semantics will now only require adjustments in
as_list_array_type and its trait implementations.
Introduce new tests for unnest_chunks_high_fanout_batches:
- Test handling of list columns with varying lengths.
- Validate row output with nulls dropped when preserve_nulls=false.
- Verify recursive unnesting works correctly under memory pressure.
Introduce PendingBatch struct to streamline UnnestStream.
Encapsulate row slicing and cursor advancement in
PendingBatch methods, reducing complexity and improving
code readability. Replace existing fields in UnnestStream
and update relevant methods to utilize the new structure.
Remove redundant scaffolding in PendingBatch within unnest.rs by
integrating exhaustion checks into the existing logic. Additionally,
extract common functionality into helper functions in mod.rs to
reduce repetition in new unnest chunking tests, improving
readability and maintainability.
@github-actions github-actions bot added core Core DataFusion crate physical-plan Changes to the physical-plan crate labels Mar 11, 2026
kosiew added 9 commits March 11, 2026 22:34
Disable row-slice chunking in UnnestExec only when
unnesting an internal __unnest_placeholder(...) input
at depth == 1 to handle the stacked-unnest case.
Retain the existing fallback for depth > 1. Added
a regression test in mod.rs to validate the output
order of a two-stage unnest pipeline.
Add UnnestExec::disable_chunking_for_stacked_unnest to
centralize detection logic for chunking. Introduce a helper
function is_internal_unnest_placeholder to streamline checks.
Replace inline "__unnest_placeholder(" verification in
execute with calls to the new disable_chunking method.
Simplify `PendingBatch::take_next_slice` to only manage state and slice.
Introduce `next_pending_slice_row_count` to streamline policy selection,
encapsulating the `disable_chunking_for_stacked_unnest` logic.
Update `build_next_pending_batch` to compute row count using the
new helper and pass it to `PendingBatch::take_next_slice`.
Implement a reusable function to count nodes by operator name
in the physical plan. Update test `unnest_chunks_stacked_unnest_preserves_order`
to build the physical plan directly from the query DataFrame.
Assert the presence of exactly 2 `UnnestExec` nodes using the new
helper, and reuse the same DataFrame for result collection.
This change enhances test stability by removing reliance on
formatted EXPLAIN output, minimizing issues with formatting.
Add a dedicated module for unnest chunking tests in
datafusion/core/tests/dataframe. Move relevant helper
functions and tests to a new focused file, ensuring
mod.rs remains cleaner. This organizes the test suite
for chunking into a more cohesive structure.
Simplify control flow in unnest.rs by inlining
redundant state transitions and removing unused helpers.
Reduce code duplication in unnest_chunks.rs by
extracting nested-list batch construction into a
single reusable helper for both recursive and
stacked-unnest tests.
Add a shared unnest-placeholder marker in unnest.rs with
helper functions for creation and inspection, re-exported from
lib.rs. Update the planner in utils.rs to tag internal
placeholder aliases with metadata during inner projection.
Modify UnnestExec to check input field metadata using the
shared helper instead of relying on string matching.
Also, implement a unit test in unnest.rs to confirm that
chunking is disabled via metadata, removing the brittle
string dependency.
Include a test helper for expected placeholder aliases with
metadata. Update `test_transform_bottom_unnest_recursive`
to convert inner projection aliases to fields and assert
`is_unnest_placeholder_field(...)` before physical planning.
Also, revise existing planner test expectations to align with
the new metadata-bearing placeholder aliases.
Simplify build_batch by removing the UnnestingResult wrapper and
computing max_recursion more idiomatically. Flatten and sort the
unnested outputs directly before regrouping by the original column
index.

Reduce duplicate scanning in list_unnest_at_level by creating
repeat_mask in a single pass per column. Optimize
flatten_struct_cols using HashSet::contains for direct checks.
@github-actions github-actions bot added sql SQL Planner common Related to common crate labels Mar 12, 2026
@kosiew kosiew changed the title Chunk unnest output by batch size to reduce memory growth Chunk UnnestExec output batches to bound memory usage and preserve stacked unnest semantics Mar 12, 2026
kosiew added 4 commits March 12, 2026 13:49
UnnestExec now disables row-slice chunking for any recursive or
staged unnest pipeline, ensuring fallback to whole-batch processing
for depth > 1 or staged placeholder columns. Updated tests to reflect
the new behavior and confirmed target partitions for the repeated
reference regression test. Added TODO note for follow-up on issue
apache#20788.
Remove duplicate input.schema() lookup in compute_properties.
Tighten disable_chunking_for_recursive_or_staged_unnest() by
reusing one schema handle. Move pending-batch bookkeeping into
small PendingBatch helpers and collapse batch building/timing
into UnnestStream::build_batch. Simplify build_batch() to
early-return for no-list cases and reduce temporary variables,
while keeping the recursive/staged unnest fallback and
TODO(apache#20788) behavior intact.
Extract shared SQL into CREATE_RECURSIVE_UNNEST_TABLE and
RECURSIVE_REPEATED_REFS_QUERY. Introduce helper functions for
total_rows(...) and create_recursive_unnest_table(...).

Tighten assert_total_and_max_batch_rows(...) to reuse the
shared row-count helper, maintaining behavior while reducing
setup and inline SQL noise.
@kosiew kosiew changed the title Chunk UnnestExec output batches to bound memory usage and preserve stacked unnest semantics Improve UNNEST streaming with chunked batch emission and disable chunking for recursive/staged paths Mar 12, 2026
@kosiew kosiew marked this pull request as ready for review March 12, 2026 07:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

common Related to common crate core Core DataFusion crate physical-plan Changes to the physical-plan crate sql SQL Planner

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant