-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Fix distribution enforcement for chained aggregates to avoid SanityCheckPlan failures #19476
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
kosiew
wants to merge
19
commits into
apache:main
Choose a base branch
from
kosiew:sanitycheck-partitioning-18989
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+289
−139
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Include a regression test that verifies the optimized plan maintains a safe distribution path through either repartitioning or sort-preserving merge. Ensure it executes correctly without panicking. Register the new test module in the physical optimizer integration test suite.
Flag unmet hash distributions when existing partitioning keys differ from the requirements, regardless of partition counts. Add a superset-hash unit test to ensure EnforceDistribution inserts the necessary repartition while preserving the original partitioning layer.
Add a shared DistributionSatisfactionResult helper to streamline partitioning satisfaction checks. Update the SanityCheckPlan to utilize this helper and re-export PartitioningSatisfaction for better cross-crate reuse. Include unit tests for exact, subset, and superset hash partitioning scenarios to ensure consistent enforcement and sanity checking interpretations.
Reinstate logic to skip unnecessary repartitioning for single-partition joins, window partitions, and grouped unions, matching previous behavior and maintaining regression expectations.
…tioning validation
…s to reference issue apache#18989
Add necessary imports to enforce_distribution.rs and remove unused BoundedWindowAggExec import. Move the test for superset hash keys to a more logical position, ensuring consistency in testing for hash join partitioning. Update test expectations to reflect current optimizer behavior by removing unnecessary superset repartitions and replacing them with exact match requirements. Revise comments and assertions to align with the improved optimization. Delete the original enforce_distribution_superset.rs file.
…fy repartitioning logic
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Labels
core
Core DataFusion crate
optimizer
Optimizer rules
physical-expr
Changes to the physical-expr crates
sql
SQL Planner
sqllogictest
SQL Logic Tests (.slt)
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Which issue does this PR close?
Rationale for this change
A panic could occur during
SanityCheckPlanwhen optimizing / executing queries that chain multiple aggregations over a multi-partitioned input (often empty tables), e.g.Sort -> Aggregate(ts, region) -> Sort -> Aggregate(ts).In these cases the first aggregation can produce a hash partitioning that is a superset of the second aggregation’s required distribution keys. The optimizer previously treated some of these situations as not needing a hash repartition (especially when the input was empty / stats suggested repartition wasn’t beneficial), which could leave the plan violating the downstream operator’s distribution requirement and trigger
SanityCheckPlanfailures.This PR aligns the optimizer’s distribution enforcement decisions with the same partitioning satisfaction logic used by the sanity checker, ensuring that required repartitions are inserted whenever the downstream requirement is not satisfied.
What changes are included in this PR?
Introduced
DistributionSatisfactionResultand a shareddistribution_satisfaction(...)helper inenforce_distributionto compute and carry:Distributionoutput_partitioningPartitioningSatisfaction(Exact / Subset / NotSatisfied)requires_repartition(...)decision function.Updated
add_hash_on_topandensure_distributionto use the shared satisfaction result and decision logic, rather than ad-hoc checks.Simplified
get_repartition_requirement_statusby removing alignment logic based onhash_necessaryflags and instead deferring to the satisfaction-based repartition decision.Updated
SanityCheckPlanto reuse the samedistribution_satisfaction(...)helper (so optimizer and sanity checker evaluate distribution satisfaction consistently).Re-exported
PartitioningSatisfactionfromdatafusion_physical_exprfor use in tests and downstream code.Added targeted regression tests:
PartitioningSatisfactionoutcomes (Exact / Subset / NotSatisfied) match sanity checker behavior for hash partitioning.SortPreservingMergeExec, and executes without panic.Updated sqllogictest expected plans where distribution enforcement now inserts
RepartitionExecand/or preserves partitioning throughSortExec.Are these changes tested?
Yes.
datafusion/core/tests/physical_optimizer/enforce_distribution.rsto validate distribution satisfaction behavior and its alignment withSanityCheckPlan.Performance sanity checks:
Ran the following benchmarks and verified no performance regressions:
cargo bench --bench topk_aggregate --profile=profiling -- --save-baseline beforecargo bench --bench distinct_query_sql --profile=profiling -- --save-baseline beforecargo bench --bench sort_preserving_merge --profile=profiling -- --save-baseline beforeAre there any user-facing changes?
Potentially.
RepartitionExecoperators (or preserve partitioning through sorts) to correctly satisfy downstream distribution requirements. This can change the displayed physical plan (EXPLAIN output) and may affect performance characteristics in edge cases, but fixes a correctness/stability issue (prevents optimizer-time / execution-time panics).LLM-generated code disclosure
This PR includes LLM-generated code and comments. All LLM-generated content has been manually reviewed and tested.