-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Establish the high level API for sort pushdown and the optimizer rule and support reverse files and row groups #19064
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…. Only re-arrange files and row groups and return Inexact.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR establishes the high-level API for sort pushdown optimization in DataFusion, specifically targeting Parquet files. The optimization improves TopK query performance by reordering files and row groups based on statistics, though it returns inexact ordering (keeping the Sort operator for correctness while enabling early termination).
Key Changes:
- Added
enable_sort_pushdownconfiguration option (default: true) to control the optimization - Implemented the optimizer rule
PushdownSortthat detects SortExec nodes and pushes sort requirements down to data sources - Extended ExecutionPlan, DataSource, and FileSource traits with
try_pushdown_sort()methods to support the optimization - Added
reverse_scan_inexactflag to ParquetSource to reverse row group read order when beneficial
Reviewed changes
Copilot reviewed 25 out of 30 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
docs/source/user-guide/configs.md |
Added documentation for the new enable_sort_pushdown configuration option |
datafusion/sqllogictest/test_files/topk.slt |
Updated test expectation to show reverse_scan_inexact=true in explain output |
datafusion/sqllogictest/test_files/spark/bitwise/bit_count.slt |
Fixed line number formatting in test file |
datafusion/sqllogictest/test_files/slt_features.slt |
Fixed line number formatting in test file |
datafusion/sqllogictest/test_files/information_schema.slt |
Added enable_sort_pushdown to configuration schema output |
datafusion/sqllogictest/test_files/explain.slt |
Updated test expectations to show PushdownSort optimizer phase |
datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt |
Added comprehensive integration tests for sort pushdown with various scenarios |
datafusion/sqllogictest/test_files/create_external_table.slt |
Updated test expectation to show reverse_scan_inexact=true |
datafusion/proto/src/logical_plan/file_formats.rs |
Added enable_sort_pushdown field to protobuf serialization (with hardcoded value bug) |
datafusion/proto/src/generated/datafusion_proto_common.rs |
Added enable_sort_pushdown field to generated protobuf structures |
datafusion/proto-common/src/to_proto/mod.rs |
Implemented protobuf serialization for enable_sort_pushdown (with hardcoded value bug) |
datafusion/proto-common/src/generated/prost.rs |
Added enable_sort_pushdown field to generated protobuf structures |
datafusion/proto-common/src/generated/pbjson.rs |
Added JSON serialization support for enable_sort_pushdown |
datafusion/proto-common/src/from_proto/mod.rs |
Implemented protobuf deserialization for enable_sort_pushdown (with hardcoded value bug) |
datafusion/proto-common/proto/datafusion_common.proto |
Added enable_sort_pushdown field to protobuf schema |
datafusion/physical-plan/src/execution_plan.rs |
Added try_pushdown_sort() trait method to ExecutionPlan |
datafusion/physical-optimizer/src/pushdown_sort.rs |
Implemented the core PushdownSort optimizer rule |
datafusion/physical-optimizer/src/optimizer.rs |
Registered PushdownSort optimizer in the optimization pipeline |
datafusion/physical-optimizer/src/lib.rs |
Exported the new pushdown_sort module |
datafusion/execution/src/config.rs |
Added session config method (with naming and documentation issues) |
datafusion/datasource/src/source.rs |
Added try_pushdown_sort() trait method to DataSource and DataSourceExec |
datafusion/datasource/src/file_scan_config.rs |
Implemented sort pushdown logic for FileScanConfig including file reversal |
datafusion/datasource/src/file.rs |
Added SortOrderPushdownResult enum and try_pushdown_sort() trait method to FileSource |
datafusion/datasource-parquet/src/source.rs |
Implemented sort pushdown for ParquetSource with reverse_scan_inexact flag |
datafusion/datasource-parquet/src/opener.rs |
Implemented row group reversal logic in ParquetOpener |
datafusion/core/tests/physical_optimizer/test_utils.rs |
Added OptimizationTest harness for testing physical optimizers |
datafusion/core/tests/physical_optimizer/pushdown_sort.rs |
Added comprehensive unit tests for the PushdownSort optimizer |
datafusion/core/tests/physical_optimizer/mod.rs |
Registered the pushdown_sort test module |
datafusion/common/src/file_options/parquet_writer.rs |
Added enable_sort_pushdown field to ParquetOptions destructuring |
datafusion/common/src/config.rs |
Added enable_sort_pushdown configuration field (with indentation issue) |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Co-authored-by: Copilot <[email protected]>
Co-authored-by: Copilot <[email protected]>
Co-authored-by: Copilot <[email protected]>
Co-authored-by: Copilot <[email protected]>
Co-authored-by: Copilot <[email protected]>
Co-authored-by: Copilot <[email protected]>
| order: &[PhysicalSortExpr], | ||
| ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> { | ||
| // RepartitionExec only maintains input order if preserve_order is set | ||
| // or if there's only one partition |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the single partition case handled?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes - maintains_input_order_helper returns true for single partitions, so the check passes and sort pushdown works correctly.
| // Default: clone self without modification | ||
| // ParquetSource will override this | ||
| not_impl_err!("with_file_ordering_info not implemented for this FileSource") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment doesn't match the implementation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch @adriangb , fixed the comments in latest PR, thanks!
Let me know if you want me to double check anything specific. Otherwise don't feel like you have to wait for me -- I am not likely to be able to get back here for the next few days BTW I also added this to the list of features to highlight for 52 n |
Thank you @alamb and @adriangb for the review and discussion! I've merged the PR now. If any issues come up, I'll address them promptly. Thanks again! |
|
Thank you for this amazing piece of work! Very excited to see where this goes next. |
|
100% agree -- this is so cool |
| match sort_input.try_pushdown_sort(required_ordering)? { | ||
| SortOrderPushdownResult::Exact { inner } => { | ||
| // Data source guarantees perfect ordering - remove the Sort operator | ||
| Ok(Transformed::yes(inner)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the sort has fetch, can we directly remove the sort?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes @xudong963 , we will implement the remove sort part in phase 2, here is the ticket:
in #19329
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This PR implemented the inexact mode, which has similar performance from sorted data clickbench.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the sort operator will be removed in the enforce_sorting if the required ordering is matched.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes agreed. I think what we ultimately need to do is return an ExecutionPlan that claims to satisfy the ordering and let the existing optimizer rule remove the sort for us.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, i was testing the exact reverse cases, the enforce_sorting is not enough for the exact reverse, but it's ok for current main branch.
Which issue does this PR close?
Establish the high level API for sort pushdown and the optimizer rule. Only re-arrange files and row groups and return Inexact, now support reverse order case, and we don't need to cache anything for this implementation, so it's no memory overhead.
It will have huge performance improvement with dynamic topk pushdown to skip row groups.
Details:
Performance results on ClickBench sorted data: 13ms vs 300ms baseline (23x faster), close to aggressive caching approach (9.8ms) but with much better memory stability details
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?