feat: add LazyPartition trait for LazyMemoryExec, migrate generate_series to partitions#20369
feat: add LazyPartition trait for LazyMemoryExec, migrate generate_series to partitions#20369ethan-tyler wants to merge 6 commits intoapache:mainfrom
Conversation
|
@2010YOUY01 @Jefffrey - this might be of interest if you. |
kosiew
left a comment
There was a problem hiding this comment.
Thanks for working on this.
| Self { schema, args } | ||
| } | ||
|
|
||
| pub fn as_partition(&self, batch_size: usize) -> Result<Arc<dyn LazyPartition>> { |
There was a problem hiding this comment.
GenerateSeriesTable::as_partition currently constructs GenerateSeriesPartition without validating timestamp timezone input.
Previously as_generator parsed timezone eagerly and could fail during planning. Consider validating in GenerateSeriesPartition::new (or a try_new) to keep error timing consistent and fail earlier.
| @@ -268,10 +408,10 @@ impl DisplayAs for LazyMemoryExec { | |||
| write!( | |||
| f, | |||
| "LazyMemoryExec: partitions={}, batch_generators=[{}]", | |||
There was a problem hiding this comment.
DisplayAs still prints batch_generators= even though the primary abstraction is now partitions?
| /// Get the batch generators | ||
| #[deprecated(note = "Use LazyMemoryExec::partitions instead")] | ||
| #[expect(deprecated)] | ||
| pub fn generators(&self) -> &Vec<Arc<RwLock<dyn LazyBatchGenerator>>> { |
There was a problem hiding this comment.
Not the fault of this PR but
generators() returns &Vec<_>. Since this is a compatibility accessor, returning a slice (&[_]) would avoid exposing container type and align with other accessor style.
| } | ||
|
|
||
| #[allow(deprecated)] | ||
| fn try_from_lazy_memory_exec(exec: &LazyMemoryExec) -> Result<Option<Self>> { |
There was a problem hiding this comment.
The native GenerateSeriesPartition path uses serialize_generate_series_args, but legacy generator downcasts still manually reconstruct protobuf args.
Could we extract a helper that converts legacy generator variants into GenSeriesArgs first, then reuse serialize_generate_series_args to reduce duplicated conversion logic?
| pub fn try_set_partitioning(&mut self, partitioning: Partitioning) -> Result<()> { | ||
| let partition_count = partitioning.partition_count(); | ||
| let generator_count = self.batch_generators.len(); | ||
| let generator_count = self.partitions.len(); |
There was a problem hiding this comment.
generator_count although this now counts partitions?
…tions Signed-off-by: Ethan Urbanski <ethanurbanski@gmail.com>
Signed-off-by: Ethan Urbanski <ethanurbanski@gmail.com>
Signed-off-by: Ethan Urbanski <ethanurbanski@gmail.com>
Signed-off-by: Ethan Urbanski <ethanurbanski@gmail.com>
5bbb386 to
3253f6c
Compare
Which issue does this PR close?
LazyMemoryExecwithSendableRecordBatchStream#13614Rationale for this change
LazyMemoryExeccurrently uses generator closures (LazyBatchGenerator) as its partition interface. This replaces with aLazyPartitiontrait that gives partitions a proper abstraction while keeping the old generator path working through a legacy adapter.generate_series/rangeare migrated to nativeLazyPartitionas the first consumer. Proto roundtrip is updated to handle both native and legacy forms.What changes are included in this PR?
LazyPartitiontrait as the native partition interface forLazyMemoryExecLazyBatchGeneratorPartitionadapter allowing existing generator to workLazyMemoryExec::try_new,LazyMemoryExec::generators,GenerateSeriesTable::as_generator.generate_series/rangemigrated to nativeGenerateSeriesPartitionGenerateSeriesPartitionwith legacy adapter fallbackLazyPartitionAre these changes tested?
yes
cargo test -p datafusion-physical-plan lazy_memory_tests --libcargo test -p datafusion-functions-table generate_series_tests --libcargo test -p datafusion-proto roundtrip_generate_series --test proto_integrationcargo test -p datafusion execution::coop --test core_integrationcargo check -p datafusion-physical-plan -p datafusion-functions-table -p datafusion-proto -p datafusionAre there any user-facing changes?
No.
LazyBatchGeneratorand existing public APIs remain available (deprecated)