diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index 8952e456398d0..b99ab010058f3 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -124,6 +124,7 @@ imdb: Join Order Benchmark (JOB) using the IMDB dataset conver # Micro-Benchmarks (specific operators and features) cancellation: How long cancelling a query takes +nlj: Benchmark for simple nested loop joins, testing various join scenarios ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ Supported Configuration (Environment Variables) @@ -196,6 +197,7 @@ main() { data_clickbench_1 data_clickbench_partitioned data_imdb + # nlj uses range() function, no data generation needed ;; tpch) data_tpch "1" @@ -298,6 +300,10 @@ main() { # same data as for tpch data_tpch "1" ;; + nlj) + # nlj uses range() function, no data generation needed + echo "NLJ benchmark does not require data generation" + ;; *) echo "Error: unknown benchmark '$BENCHMARK' for data generation" usage @@ -354,6 +360,7 @@ main() { run_h2o_join "BIG" "PARQUET" "join" run_imdb run_external_aggr + run_nlj ;; tpch) run_tpch "1" "parquet" @@ -458,6 +465,9 @@ main() { topk_tpch) run_topk_tpch ;; + nlj) + run_nlj + ;; *) echo "Error: unknown benchmark '$BENCHMARK' for run" usage @@ -1085,6 +1095,14 @@ run_topk_tpch() { $CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" --limit 100 ${QUERY_ARG} } +# Runs the nlj benchmark +run_nlj() { + RESULTS_FILE="${RESULTS_DIR}/nlj.json" + echo "RESULTS_FILE: ${RESULTS_FILE}" + echo "Running nlj benchmark..." + debug_run $CARGO_COMMAND --bin dfbench -- nlj --iterations 5 -o "${RESULTS_FILE}" ${QUERY_ARG} +} + compare_benchmarks() { BASE_RESULTS_DIR="${SCRIPT_DIR}/results" diff --git a/benchmarks/src/bin/dfbench.rs b/benchmarks/src/bin/dfbench.rs index e92fd115c7d87..88378492b7267 100644 --- a/benchmarks/src/bin/dfbench.rs +++ b/benchmarks/src/bin/dfbench.rs @@ -33,7 +33,7 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; #[global_allocator] static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc; -use datafusion_benchmarks::{cancellation, clickbench, h2o, imdb, sort_tpch, tpch}; +use datafusion_benchmarks::{cancellation, clickbench, h2o, imdb, nlj, sort_tpch, tpch}; #[derive(Debug, StructOpt)] #[structopt(about = "benchmark command")] @@ -42,6 +42,7 @@ enum Options { Clickbench(clickbench::RunOpt), H2o(h2o::RunOpt), Imdb(imdb::RunOpt), + Nlj(nlj::RunOpt), SortTpch(sort_tpch::RunOpt), Tpch(tpch::RunOpt), TpchConvert(tpch::ConvertOpt), @@ -57,6 +58,7 @@ pub async fn main() -> Result<()> { Options::Clickbench(opt) => opt.run().await, Options::H2o(opt) => opt.run().await, Options::Imdb(opt) => Box::pin(opt.run()).await, + Options::Nlj(opt) => opt.run().await, Options::SortTpch(opt) => opt.run().await, Options::Tpch(opt) => Box::pin(opt.run()).await, Options::TpchConvert(opt) => opt.run().await, diff --git a/benchmarks/src/lib.rs b/benchmarks/src/lib.rs index e7657c4078d12..5d982fad6f77f 100644 --- a/benchmarks/src/lib.rs +++ b/benchmarks/src/lib.rs @@ -20,6 +20,7 @@ pub mod cancellation; pub mod clickbench; pub mod h2o; pub mod imdb; +pub mod nlj; pub mod sort_tpch; pub mod tpch; pub mod util; diff --git a/benchmarks/src/nlj.rs b/benchmarks/src/nlj.rs new file mode 100644 index 0000000000000..4466641170ca4 --- /dev/null +++ b/benchmarks/src/nlj.rs @@ -0,0 +1,223 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::util::{BenchmarkRun, CommonOpt, QueryResult}; +use datafusion::{error::Result, prelude::SessionContext}; +use datafusion_common::instant::Instant; +use datafusion_common::{exec_datafusion_err, exec_err, DataFusionError}; +use structopt::StructOpt; + +/// Run the Nested Loop Join (NLJ) benchmark +/// +/// This micro-benchmark focuses on the performance characteristics of NLJs. +/// +/// It always tries to use fast scanners (without decoding overhead) and +/// efficient predicate expressions to ensure it can reflect the performance +/// of the NLJ operator itself. +/// +/// In this micro-benchmark, the following workload characteristics will be +/// varied: +/// - Join type: Inner/Left/Right/Full (all for the NestedLoopJoin physical +/// operator) +/// TODO: Include special join types (Semi/Anti/Mark joins) +/// - Input size: Different combinations of left (build) side and right (probe) +/// side sizes +/// - Selectivity of join filters +#[derive(Debug, StructOpt, Clone)] +#[structopt(verbatim_doc_comment)] +pub struct RunOpt { + /// Query number (between 1 and 10). If not specified, runs all queries + #[structopt(short, long)] + query: Option, + + /// Common options + #[structopt(flatten)] + common: CommonOpt, + + /// If present, write results json here + #[structopt(parse(from_os_str), short = "o", long = "output")] + output_path: Option, +} + +/// Inline SQL queries for NLJ benchmarks +/// +/// Each query's comment includes: +/// - Left (build) side row count × Right (probe) side row count +/// - Join predicate selectivity (1% means the output size is 1% * input size) +const NLJ_QUERIES: &[&str] = &[ + // Q1: INNER 10K x 10K | LOW 0.1% + r#" + SELECT * + FROM range(10000) AS t1 + JOIN range(10000) AS t2 + ON (t1.value + t2.value) % 1000 = 0; + "#, + // Q2: INNER 10K x 10K | Medium 20% + r#" + SELECT * + FROM range(10000) AS t1 + JOIN range(10000) AS t2 + ON (t1.value + t2.value) % 5 = 0; + "#, + // Q3: INNER 10K x 10K | High 90% + r#" + SELECT * + FROM range(10000) AS t1 + JOIN range(10000) AS t2 + ON (t1.value + t2.value) % 10 <> 0; + "#, + // Q4: INNER 30K x 30K | Medium 20% + r#" + SELECT * + FROM range(30000) AS t1 + JOIN range(30000) AS t2 + ON (t1.value + t2.value) % 5 = 0; + "#, + // Q5: INNER 10K x 200K | LOW 0.1% (small to large) + r#" + SELECT * + FROM range(10000) AS t1 + JOIN range(200000) AS t2 + ON (t1.value + t2.value) % 1000 = 0; + "#, + // Q6: INNER 200K x 10K | LOW 0.1% (large to small) + r#" + SELECT * + FROM range(200000) AS t1 + JOIN range(10000) AS t2 + ON (t1.value + t2.value) % 1000 = 0; + "#, + // Q7: RIGHT OUTER 10K x 200K | LOW 0.1% + r#" + SELECT * + FROM range(10000) AS t1 + RIGHT JOIN range(200000) AS t2 + ON (t1.value + t2.value) % 1000 = 0; + "#, + // Q8: LEFT OUTER 200K x 10K | LOW 0.1% + r#" + SELECT * + FROM range(200000) AS t1 + LEFT JOIN range(10000) AS t2 + ON (t1.value + t2.value) % 1000 = 0; + "#, + // Q9: FULL OUTER 30K x 30K | LOW 0.1% + r#" + SELECT * + FROM range(30000) AS t1 + FULL JOIN range(30000) AS t2 + ON (t1.value + t2.value) % 1000 = 0; + "#, + // Q10: FULL OUTER 30K x 30K | High 90% + r#" + SELECT * + FROM range(30000) AS t1 + FULL JOIN range(30000) AS t2 + ON (t1.value + t2.value) % 10 <> 0; + "#, +]; + +impl RunOpt { + pub async fn run(self) -> Result<()> { + println!("Running NLJ benchmarks with the following options: {self:#?}\n"); + + // Define query range + let query_range = match self.query { + Some(query_id) => { + if query_id >= 1 && query_id <= NLJ_QUERIES.len() { + query_id..=query_id + } else { + return exec_err!( + "Query {query_id} not found. Available queries: 1 to {}", + NLJ_QUERIES.len() + ); + } + } + None => 1..=NLJ_QUERIES.len(), + }; + + let config = self.common.config()?; + let rt_builder = self.common.runtime_env_builder()?; + let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?); + + let mut benchmark_run = BenchmarkRun::new(); + for query_id in query_range { + let query_index = query_id - 1; // Convert 1-based to 0-based index + + let sql = NLJ_QUERIES[query_index]; + benchmark_run.start_new_case(&format!("Query {query_id}")); + let query_run = self.benchmark_query(sql, &query_id.to_string(), &ctx).await; + match query_run { + Ok(query_results) => { + for iter in query_results { + benchmark_run.write_iter(iter.elapsed, iter.row_count); + } + } + Err(e) => { + return Err(DataFusionError::Context( + "NLJ benchmark Q{query_id} failed with error:".to_string(), + Box::new(e), + )); + } + } + } + + benchmark_run.maybe_write_json(self.output_path.as_ref())?; + Ok(()) + } + + /// Validates that the query's physical plan uses a NestedLoopJoin (NLJ), + /// then executes the query and collects execution times. + /// + /// TODO: ensure the optimizer won't change the join order (it's not at + /// v48.0.0). + async fn benchmark_query( + &self, + sql: &str, + query_name: &str, + ctx: &SessionContext, + ) -> Result> { + let mut query_results = vec![]; + + // Validate that the query plan includes a Nested Loop Join + let df = ctx.sql(sql).await?; + let physical_plan = df.create_physical_plan().await?; + let plan_string = format!("{physical_plan:#?}"); + + if !plan_string.contains("NestedLoopJoinExec") { + return Err(exec_datafusion_err!( + "Query {query_name} does not use Nested Loop Join. Physical plan: {plan_string}" + )); + } + + for i in 0..self.common.iterations { + let start = Instant::now(); + let df = ctx.sql(sql).await?; + let batches = df.collect().await?; + let elapsed = start.elapsed(); + + let row_count = batches.iter().map(|b| b.num_rows()).sum(); + println!( + "Query {query_name} iteration {i} returned {row_count} rows in {elapsed:?}" + ); + + query_results.push(QueryResult { elapsed, row_count }); + } + + Ok(query_results) + } +} diff --git a/datafusion/physical-plan/src/joins/mod.rs b/datafusion/physical-plan/src/joins/mod.rs index 1d36db996434e..035d736cb1430 100644 --- a/datafusion/physical-plan/src/joins/mod.rs +++ b/datafusion/physical-plan/src/joins/mod.rs @@ -29,6 +29,7 @@ pub use symmetric_hash_join::SymmetricHashJoinExec; mod cross_join; mod hash_join; mod nested_loop_join; +mod nlj; mod sort_merge_join; mod stream_join_utils; mod symmetric_hash_join; diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 5bb1673d4af26..338102e1da499 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -31,6 +31,7 @@ use super::utils::{ }; use crate::common::can_project; use crate::execution_plan::{boundedness_from_children, EmissionType}; +use crate::joins::nlj::NLJStream; use crate::joins::utils::{ adjust_indices_by_join_type, apply_join_filter_to_indices, build_batch_from_indices, build_join_schema, check_join_is_valid, estimate_join_statistics, @@ -66,8 +67,10 @@ use datafusion_physical_expr::equivalence::{ use futures::{ready, Stream, StreamExt, TryStreamExt}; use parking_lot::Mutex; +static USE_NLJ_V2: bool = true; + /// Left (build-side) data -struct JoinLeftData { +pub(crate) struct JoinLeftData { /// Build-side data collected to single batch batch: RecordBatch, /// Shared bitmap builder for visited left indices @@ -82,7 +85,7 @@ struct JoinLeftData { } impl JoinLeftData { - fn new( + pub(crate) fn new( batch: RecordBatch, bitmap: SharedBitmapBuilder, probe_threads_counter: AtomicUsize, @@ -96,17 +99,17 @@ impl JoinLeftData { } } - fn batch(&self) -> &RecordBatch { + pub(crate) fn batch(&self) -> &RecordBatch { &self.batch } - fn bitmap(&self) -> &SharedBitmapBuilder { + pub(crate) fn bitmap(&self) -> &SharedBitmapBuilder { &self.bitmap } /// Decrements counter of running threads, and returns `true` /// if caller is the last running thread - fn report_probe_completed(&self) -> bool { + pub(crate) fn report_probe_completed(&self) -> bool { self.probe_threads_counter.fetch_sub(1, Ordering::Relaxed) == 1 } } @@ -307,29 +310,9 @@ impl NestedLoopJoinExec { )) } - /// Returns a vector indicating whether the left and right inputs maintain their order. - /// The first element corresponds to the left input, and the second to the right. - /// - /// The left (build-side) input's order may change, but the right (probe-side) input's - /// order is maintained for INNER, RIGHT, RIGHT ANTI, and RIGHT SEMI joins. - /// - /// Maintaining the right input's order helps optimize the nodes down the pipeline - /// (See [`ExecutionPlan::maintains_input_order`]). - /// - /// This is a separate method because it is also called when computing properties, before - /// a [`NestedLoopJoinExec`] is created. It also takes [`JoinType`] as an argument, as - /// opposed to `Self`, for the same reason. - fn maintains_input_order(join_type: JoinType) -> Vec { - vec![ - false, - matches!( - join_type, - JoinType::Inner - | JoinType::Right - | JoinType::RightAnti - | JoinType::RightSemi - ), - ] + /// This join implementation does not preserve the input order of either side. + fn maintains_input_order(_join_type: JoinType) -> Vec { + vec![false, false] } pub fn contains_projection(&self) -> bool { @@ -530,21 +513,35 @@ impl ExecutionPlan for NestedLoopJoinExec { None => self.column_indices.clone(), }; - Ok(Box::pin(NestedLoopJoinStream { - schema: self.schema(), - filter: self.filter.clone(), - join_type: self.join_type, - outer_table, - inner_table, - column_indices: column_indices_after_projection, - join_metrics, - indices_cache, - right_side_ordered, - state: NestedLoopJoinStreamState::WaitBuildSide, - left_data: None, - join_result_status: None, - intermediate_batch_size: batch_size, - })) + if USE_NLJ_V2 { + // println!("Using NLJ v2"); + Ok(Box::pin(NLJStream::new( + self.schema(), + self.filter.clone(), + self.join_type, + outer_table, + inner_table, + column_indices_after_projection, + join_metrics, + batch_size, + ))) + } else { + Ok(Box::pin(NestedLoopJoinStream { + schema: self.schema(), + filter: self.filter.clone(), + join_type: self.join_type, + outer_table, + inner_table, + column_indices: column_indices_after_projection, + join_metrics, + indices_cache, + right_side_ordered, + state: NestedLoopJoinStreamState::WaitBuildSide, + left_data: None, + join_result_status: None, + intermediate_batch_size: batch_size, + })) + } } fn metrics(&self) -> Option { @@ -1194,7 +1191,6 @@ pub(crate) mod tests { common, expressions::Column, repartition::RepartitionExec, test::build_table_i32, }; - use arrow::array::Int32Array; use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field}; use datafusion_common::test_util::batches_to_sort_string; @@ -1748,167 +1744,6 @@ pub(crate) mod tests { Ok(()) } - fn prepare_mod_join_filter() -> JoinFilter { - let column_indices = vec![ - ColumnIndex { - index: 1, - side: JoinSide::Left, - }, - ColumnIndex { - index: 1, - side: JoinSide::Right, - }, - ]; - let intermediate_schema = Schema::new(vec![ - Field::new("x", DataType::Int32, true), - Field::new("x", DataType::Int32, true), - ]); - - // left.b1 % 3 - let left_mod = Arc::new(BinaryExpr::new( - Arc::new(Column::new("x", 0)), - Operator::Modulo, - Arc::new(Literal::new(ScalarValue::Int32(Some(3)))), - )) as Arc; - // left.b1 % 3 != 0 - let left_filter = Arc::new(BinaryExpr::new( - left_mod, - Operator::NotEq, - Arc::new(Literal::new(ScalarValue::Int32(Some(0)))), - )) as Arc; - - // right.b2 % 5 - let right_mod = Arc::new(BinaryExpr::new( - Arc::new(Column::new("x", 1)), - Operator::Modulo, - Arc::new(Literal::new(ScalarValue::Int32(Some(5)))), - )) as Arc; - // right.b2 % 5 != 0 - let right_filter = Arc::new(BinaryExpr::new( - right_mod, - Operator::NotEq, - Arc::new(Literal::new(ScalarValue::Int32(Some(0)))), - )) as Arc; - // filter = left.b1 % 3 != 0 and right.b2 % 5 != 0 - let filter_expression = - Arc::new(BinaryExpr::new(left_filter, Operator::And, right_filter)) - as Arc; - - JoinFilter::new( - filter_expression, - column_indices, - Arc::new(intermediate_schema), - ) - } - - fn generate_columns(num_columns: usize, num_rows: usize) -> Vec> { - let column = (1..=num_rows).map(|x| x as i32).collect(); - vec![column; num_columns] - } - - #[rstest] - #[tokio::test] - async fn join_maintains_right_order( - #[values( - JoinType::Inner, - JoinType::Right, - JoinType::RightAnti, - JoinType::RightSemi - )] - join_type: JoinType, - #[values(1, 100, 1000)] left_batch_size: usize, - #[values(1, 100, 1000)] right_batch_size: usize, - #[values(1001, 10000)] batch_size: usize, - ) -> Result<()> { - let left_columns = generate_columns(3, 1000); - let left = build_table( - ("a1", &left_columns[0]), - ("b1", &left_columns[1]), - ("c1", &left_columns[2]), - Some(left_batch_size), - Vec::new(), - ); - - let right_columns = generate_columns(3, 1000); - let right = build_table( - ("a2", &right_columns[0]), - ("b2", &right_columns[1]), - ("c2", &right_columns[2]), - Some(right_batch_size), - vec!["a2", "b2", "c2"], - ); - - let filter = prepare_mod_join_filter(); - - let nested_loop_join = Arc::new(NestedLoopJoinExec::try_new( - left, - Arc::clone(&right), - Some(filter), - &join_type, - None, - )?) as Arc; - assert_eq!(nested_loop_join.maintains_input_order(), vec![false, true]); - - let right_column_indices = match join_type { - JoinType::Inner | JoinType::Right => vec![3, 4, 5], - JoinType::RightAnti | JoinType::RightSemi => vec![0, 1, 2], - _ => unreachable!(), - }; - - let right_ordering = right.output_ordering().unwrap(); - let join_ordering = nested_loop_join.output_ordering().unwrap(); - for (right, join) in right_ordering.iter().zip(join_ordering.iter()) { - let right_column = right.expr.as_any().downcast_ref::().unwrap(); - let join_column = join.expr.as_any().downcast_ref::().unwrap(); - assert_eq!(join_column.name(), join_column.name()); - assert_eq!( - right_column_indices[right_column.index()], - join_column.index() - ); - assert_eq!(right.options, join.options); - } - - let task_ctx = new_task_ctx(batch_size); - let batches = nested_loop_join - .execute(0, task_ctx)? - .try_collect::>() - .await?; - - // Make sure that the order of the right side is maintained - let mut prev_values = [i32::MIN, i32::MIN, i32::MIN]; - - for (batch_index, batch) in batches.iter().enumerate() { - let columns: Vec<_> = right_column_indices - .iter() - .map(|&i| { - batch - .column(i) - .as_any() - .downcast_ref::() - .unwrap() - }) - .collect(); - - for row in 0..batch.num_rows() { - let current_values = [ - columns[0].value(row), - columns[1].value(row), - columns[2].value(row), - ]; - assert!( - current_values - .into_iter() - .zip(prev_values) - .all(|(current, prev)| current >= prev), - "batch_index: {batch_index} row: {row} current: {current_values:?}, prev: {prev_values:?}" - ); - prev_values = current_values; - } - } - - Ok(()) - } - /// Returns the column names on the schema fn columns(schema: &Schema) -> Vec { schema.fields().iter().map(|f| f.name().clone()).collect() diff --git a/datafusion/physical-plan/src/joins/nlj.rs b/datafusion/physical-plan/src/joins/nlj.rs new file mode 100644 index 0000000000000..033cfac4c1a6c --- /dev/null +++ b/datafusion/physical-plan/src/joins/nlj.rs @@ -0,0 +1,897 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Implementation of the Nested Loop Join operator. +//! +//! For detailed information regarding the operator's state machine and execution flow, +//! please refer to the documentation provided in the `poll_next()` method. + +use arrow::buffer::BooleanBuffer; +use arrow::compute::{filter_record_batch, BatchCoalescer}; +use futures::{ready, StreamExt}; +use log::debug; +use std::ops::BitOr; +use std::sync::Arc; +use std::task::Poll; + +use crate::joins::nested_loop_join::JoinLeftData; +use crate::joins::utils::{ + build_batch_from_indices, need_produce_result_in_final, BuildProbeJoinMetrics, + ColumnIndex, JoinFilter, OnceFut, +}; +use crate::metrics::Count; +use crate::{RecordBatchStream, SendableRecordBatchStream}; + +use arrow::array::{ + Array, BooleanArray, RecordBatchOptions, UInt32Builder, UInt64Array, UInt64Builder, +}; +use arrow::datatypes::{Schema, SchemaRef}; +use arrow::record_batch::RecordBatch; +use datafusion_common::{ + cast::as_boolean_array, internal_datafusion_err, unwrap_or_internal_err, + DataFusionError, JoinSide, Result, ScalarValue, +}; +use datafusion_expr::JoinType; + +use futures::Stream; + +/// States for join processing. See `poll_next()` comment for more details about +/// state transitions. +#[derive(Debug, Clone, Copy)] +enum NLJState { + BufferingLeft, + FetchingRight, + ProbeRight, + EmitRightUnmatched, + EmitLeftUnmatched, + Done, +} + +pub(crate) struct NLJStream { + // ======================================================================== + // PROPERTIES: + // Operator's properties that remain constant + // ======================================================================== + /// Output schema + pub(crate) output_schema: Arc, + /// join filter + pub(crate) join_filter: Option, + /// type of the join + pub(crate) join_type: JoinType, + /// the outer table data of the nested loop join + pub(crate) outer_table: SendableRecordBatchStream, + /// the inner table data of the nested loop join + pub(crate) inner_table: OnceFut, + /// Information of index and left / right placement of columns + pub(crate) column_indices: Vec, + /// Join execution metrics + pub(crate) join_metrics: BuildProbeJoinMetrics, + + /// `batch_size` from configuration + cfg_batch_size: usize, + + /// Should we use a bitmap to track each incoming right batch's each row's + /// 'joined' status. + /// For example in right joins, we have to use a bit map to track matched + /// right side rows, and later enter a `EmitRightUnmatched` stage to emit + /// unmatched right rows. + should_track_unmatched_right: bool, + + // ======================================================================== + // STATE FLAGS/BUFFERS: + // Fields that hold intermediate data/flags during execution + // ======================================================================== + /// State Tracking + state: NLJState, + /// Output buffer holds the join result to output. It will emit eagerly when + /// the threshold is reached. + output_buffer: Box, + /// See comments in [`NLJState::Done`] for its purpose + handled_empty_output: bool, + + // Buffer(left) side + // ----------------- + /// The current buffered left data to join + buffered_left_data: Option>, + /// Index into the left buffered batch. Used in `ProbeRight` state + l_probe_idx: usize, + /// Index into the left buffered batch. Used in `EmitLeftUnmatched` state + l_emit_idx: u64, + /// Should we go back to `BufferingLeft` state again after `EmitLeftUnmatched` + /// state is over. + left_exhausted: bool, + /// If we can buffer all left data in one pass + /// TODO(now): this is for the (unimplemented) memory-limited execution + #[allow(dead_code)] + left_buffered_in_one_pass: bool, + + // Probe(right) side + // ----------------- + /// The current probe batch to process + current_right_batch: Option, + // For right join, keep track of matched rows in `current_right_batch` + // Constructed when fetching each new incoming right batch in `FetchingRight` state. + current_right_batch_matched: Option, +} + +impl Stream for NLJStream { + type Item = Result; + + /// # Design + /// + /// The high-level control flow for this operator is: + /// 1. Buffer all batches from the left side (unless memory limit is reached, + /// in which case see notes at 'Memory-limited Execution'). + /// - Rationale: The right side scanning can be expensive (it might + /// include decoding Parquet files), so it tries to buffer more left + /// batches at once to minimize the scan passes. + /// 2. Read right side batch one at a time. For each iteration, it only + /// evaluates the join filter on (1-left-row x right-batch), and puts the + /// result into the output buffer. Once the output buffer has reached + /// the threshold, output immediately. + /// - Rationale: Making the intermediate data smaller can 1) be more cache + /// friendly for processing to execute faster, and 2) use less memory. + /// + /// Note: Currently, both the filter-evaluation granularity and output + /// buffer size are `batch_size` from the configuration (default 8192). + /// We might try to tune it slightly for performance in the future. + /// + /// + /// + /// # Memory-limited Execution + /// + /// TODO. + /// The idea is each time buffer as much batches from the left side as + /// possible, then scan the right side once for all buffered left data. + /// Then buffer another left batches, scan right side again until finish. + /// + /// + /// + /// # Implementation + /// + /// This function is the entry point of NLJ operator's state machine + /// transitions. The rough state transition graph is as follow, for more + /// details see the comment in each state's matching arm. + /// + /// Draft state transition graph: + /// + /// (start) --> BufferingLeft + /// ---------------------------- + /// BufferingLeft → FetchingRight + /// + /// FetchingRight → ProbeRight (if right batch available) + /// FetchingRight → EmitLeftUnmatched (if right exhausted) + /// + /// ProbeRight → ProbeRight (next left row or after yielding output) + /// ProbeRight → EmitRightUnmatched (for special join types like right join) + /// ProbeRight → FetchingRight (done with the current right batch) + /// + /// EmitRightUnmatched → FetchingRight + /// + /// EmitLeftUnmatched → EmitLeftUnmatched (only process 1 chunk for each + /// iteration) + /// EmitLeftUnmatched → Done (if finished) + /// ---------------------------- + /// Done → (end) + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + loop { + match self.state { + // # NLJState transitions + // --> FetchingRight + // This state will prepare the left side batches, next state + // `FetchingRight` is responsible for preparing a single probe + // side batch, before start joining. + NLJState::BufferingLeft => { + debug!("[NLJState] Entering: {:?}", self.state); + match ready!(self.inner_table.get_shared(cx)) { + Ok(left_data) => { + self.buffered_left_data = Some(left_data); + // TOOD: implement memory-limited case + self.left_exhausted = true; + self.state = NLJState::FetchingRight; + continue; + } + Err(e) => return Poll::Ready(Some(Err(e))), + } + } + + // # NLJState transitions: + // 1. --> ProbeRight + // Start processing the join for the newly fetched right + // batch. + // 2. --> EmitLeftUnmatched: When the right side input is exhausted, (maybe) emit + // unmatched left side rows. + // + // After fetching a new batch from the right side, it will + // process all rows from the buffered left data: + // ```text + // for batch in right_side: + // for row in left_buffer: + // join(batch, row) + // ``` + // Note: the implementation does this step incrementally, + // instead of materializing all intermediate Cartesian products + // at once in memory. + // + // So after the right side input is exhausted, the join phase + // for the current buffered left data is finished. We can go to + // the next `EmitLeftUnmatched` phase to check if there is any + // special handling (e.g., in cases like left join). + NLJState::FetchingRight => { + debug!("[NLJState] Entering: {:?}", self.state); + match ready!(self.outer_table.poll_next_unpin(cx)) { + Some(Ok(right_batch)) => { + let right_batch_size = right_batch.num_rows(); + + // Skip the empty batch + if right_batch_size == 0 { + continue; + } + + self.current_right_batch = Some(right_batch); + + // Prepare right bitmap + if self.should_track_unmatched_right { + let zeroed_buf = + BooleanBuffer::new_unset(right_batch_size); + self.current_right_batch_matched = + Some(BooleanArray::new(zeroed_buf, None)); + } + + self.l_probe_idx = 0; + self.state = NLJState::ProbeRight; + continue; + } + Some(Err(e)) => return Poll::Ready(Some(Err(e))), + None => { + // Right stream exhausted/as + self.state = NLJState::EmitLeftUnmatched; + continue; + } + } + } + + // NLJState transitions: + // 1. --> ProbeRight(1) + // If we have already buffered enough output to yield, it + // will first give back control to the parent state machine, + // then resume at the same place. + // 2. --> ProbeRight(2) + // After probing one right batch, and evaluating the + // join filter on (left-row x right-batch), it will advance + // to the next left row, then re-enter the current state and + // continue joining. + // 3. --> FetchRight + // After it has done with the current right batch (to join + // with all rows in the left buffer), it will go to + // FetchRight state to check what to do next. + NLJState::ProbeRight => { + debug!("[NLJState] Entering: {:?}", self.state); + // Return any completed batches first + if self.output_buffer.has_completed_batch() { + if let Some(batch) = self.output_buffer.next_completed_batch() { + let poll = Poll::Ready(Some(Ok(batch))); + return self.join_metrics.baseline.record_poll(poll); + } + } + + // Process current probe state + match self.process_probe_batch() { + // State unchanged (ProbeRight) + // Continue probing until we have done joining the + // current right batch with all buffered left rows. + Ok(true) => continue, + // To next FetchRightState + // We have finished joining + // (cur_right_batch x buffered_left_batches) + Ok(false) => { + // Left exhausted, transition to FetchingRight + self.l_probe_idx = 0; + if self.should_track_unmatched_right { + debug_assert!(self.current_right_batch_matched.is_some()); + self.state = NLJState::EmitRightUnmatched; + } else { + self.current_right_batch = None; + self.state = NLJState::FetchingRight; + } + continue; + } + Err(e) => return Poll::Ready(Some(Err(e))), + } + } + + // In the `current_right_batch_matched` bitmap, all trues mean + // it has been outputed by the join. In this state we have to + // output unmatched rows for current right batch (with null + // padding for left relation) + // Precondition: we have checked the join type so that it's + // possible to output right unmatched (e.g. it's right join) + NLJState::EmitRightUnmatched => { + debug!("[NLJState] Entering: {:?}", self.state); + debug_assert!(self.current_right_batch.is_some()); + debug_assert!(self.current_right_batch_matched.is_some()); + + // Construct the result batch for unmatched right rows using a utility function + if let Some(batch) = self.process_right_unmatched()? { + self.output_buffer.push_batch(batch)?; + } + + // Processed all in one pass + // cleared inside `process_right_unmatched` + debug_assert!(self.current_right_batch.is_none()); + self.state = NLJState::FetchingRight; + } + + // NLJState transitions: + // 1. --> EmitLeftUnmatched(1) + // If we have already buffered enough output to yield, it + // will first give back control to the parent state machine, + // then resume at the same place. + // 2. --> EmitLeftUnmatched(2) + // After processing some unmatched rows, it will re-enter + // the same state, to check if there are any more final + // results to output. + // 3. --> Done + // It has processed all data, go to the final state and ready + // to exit. + // + // TODO: For memory-limited case, go back to `BufferingLeft` + // state again. + NLJState::EmitLeftUnmatched => { + debug!("[NLJState] Entering: {:?}", self.state); + // Return any completed batches first + if self.output_buffer.has_completed_batch() { + if let Some(batch) = self.output_buffer.next_completed_batch() { + let poll = Poll::Ready(Some(Ok(batch))); + return self.join_metrics.baseline.record_poll(poll); + } + } + + // Process current unmatched state + match self.process_left_unmatched() { + // State unchanged (EmitLeftUnmatched) + // Continue processing until we have processed all unmatched rows + Ok(true) => continue, + // To Done state + // We have finished processing all unmatched rows + Ok(false) => { + self.output_buffer.finish_buffered_batch()?; + self.state = NLJState::Done; + continue; + } + Err(e) => return Poll::Ready(Some(Err(e))), + } + } + + // The final state and the exit point + NLJState::Done => { + debug!("[NLJState] Entering: {:?}", self.state); + // Return any remaining completed batches before final termination + if self.output_buffer.has_completed_batch() { + if let Some(batch) = self.output_buffer.next_completed_batch() { + let poll = Poll::Ready(Some(Ok(batch))); + return self.join_metrics.baseline.record_poll(poll); + } + } + + // HACK for the doc test in https://github.com/apache/datafusion/blob/main/datafusion/core/src/dataframe/mod.rs#L1265 + // If this operator directly return `Poll::Ready(None)` + // for empty result, the final result will become an empty + // batch with empty schema, however the expected result + // should be with the expected schema for this operator + if !self.handled_empty_output { + let zero_count = Count::new(); + if *self.join_metrics.baseline.output_rows() == zero_count { + let empty_batch = + RecordBatch::new_empty(Arc::clone(&self.output_schema)); + self.handled_empty_output = true; + return Poll::Ready(Some(Ok(empty_batch))); + } + } + + return Poll::Ready(None); + } + } + } + } +} + +impl RecordBatchStream for NLJStream { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.output_schema) + } +} + +impl NLJStream { + #[allow(clippy::too_many_arguments)] + pub(crate) fn new( + schema: Arc, + filter: Option, + join_type: JoinType, + outer_table: SendableRecordBatchStream, + inner_table: OnceFut, + column_indices: Vec, + join_metrics: BuildProbeJoinMetrics, + cfg_batch_size: usize, + ) -> Self { + let should_track_unmatched_right = matches!( + join_type, + JoinType::Full + | JoinType::Right + | JoinType::RightAnti + | JoinType::RightMark + | JoinType::RightSemi + ); + + Self { + output_schema: Arc::clone(&schema), + join_filter: filter, + join_type, + outer_table, + column_indices, + inner_table, + join_metrics, + buffered_left_data: None, + output_buffer: Box::new(BatchCoalescer::new(schema, cfg_batch_size)), + cfg_batch_size, + current_right_batch: None, + current_right_batch_matched: None, + state: NLJState::BufferingLeft, + l_probe_idx: 0, + l_emit_idx: 0, + left_exhausted: false, + left_buffered_in_one_pass: true, + handled_empty_output: false, + should_track_unmatched_right, + } + } + + // ==== Core logic handling for each state ==== + + /// Returns bool to indicate should it continue probing + /// true -> continue in the same ProbeRight state + /// false -> It has done with the (buffered_left x cur_right_batch), go to + /// next state (ProbeRight) + fn process_probe_batch(&mut self) -> Result { + let left_data = + Arc::clone(self.buffered_left_data.as_ref().ok_or_else(|| { + internal_datafusion_err!("LeftData should be available") + })?); + let right_batch = self + .current_right_batch + .as_ref() + .ok_or_else(|| internal_datafusion_err!("Right batch should be available"))? + .clone(); + + // stop probing, the caller will go to the next state + if self.l_probe_idx >= left_data.batch().num_rows() { + return Ok(false); + } + + // ======== + // Join (l_row x right_batch) + // and push the result into output_buffer + // ======== + + let l_idx = self.l_probe_idx; + let join_batch = + self.process_single_left_row_join(&left_data, &right_batch, l_idx)?; + + if let Some(batch) = join_batch { + self.output_buffer.push_batch(batch)?; + } + + // ==== Prepare for the next iteration ==== + + // Advance left cursor + self.l_probe_idx += 1; + + // Return true to continue probing + Ok(true) + } + + /// Process a single left row join with the current right batch. + /// Returns a RecordBatch containing the join results (None if empty) + fn process_single_left_row_join( + &mut self, + left_data: &JoinLeftData, + right_batch: &RecordBatch, + l_index: usize, + ) -> Result> { + let right_row_count = right_batch.num_rows(); + if right_row_count == 0 { + return Ok(None); + } + + let cur_right_bitmap = if let Some(filter) = &self.join_filter { + apply_join_filter_to_single_left_row( + left_data.batch(), + l_index, + right_batch, + filter, + JoinSide::Left, + )? + } else { + BooleanArray::from(vec![true; right_row_count]) + }; + + let joined_len = cur_right_bitmap.true_count(); + + // Update left row match bitmap for outer join support + if need_produce_result_in_final(self.join_type) && (joined_len > 0) { + let mut bitmap = left_data.bitmap().lock(); + bitmap.set_bit(l_index, true); + } + + // For special joins like RightJoin, update the bitmap for matched rows + // in the current right batch + if self.should_track_unmatched_right { + debug_assert!(self.current_right_batch_matched.is_some()); + // after bit-wise or, it will be put back + let right_bitmap = std::mem::take(&mut self.current_right_batch_matched) + .ok_or_else(|| { + internal_datafusion_err!("right batch's bitmap should be present") + })?; + let (buf, nulls) = right_bitmap.into_parts(); + debug_assert!(nulls.is_none()); + let updated_right_bitmap = buf.bitor(cur_right_bitmap.values()); + + self.current_right_batch_matched = + Some(BooleanArray::new(updated_right_bitmap, None)); + } + + // For the following join types: here we only have to set the left/right + // bitmap, and no need to output result + if matches!( + self.join_type, + JoinType::LeftAnti + | JoinType::LeftSemi + | JoinType::LeftMark + | JoinType::RightAnti + | JoinType::RightMark + | JoinType::RightSemi + ) { + return Ok(None); + } + + if joined_len == 0 { + Ok(None) + } else { + // Use the optimized approach similar to build_intermediate_batch_for_single_left_row + let join_batch = build_batch_for_single_left_row( + &self.output_schema, + left_data.batch(), + l_index, + right_batch, + Some(cur_right_bitmap), + &self.column_indices, + )?; + Ok(Some(join_batch)) + } + } + + // Returns bool to indicate should it continue processing unmatched rows + // true -> continue in the same EmitLeftUnmatched state + // false -> next state (Done) + fn process_left_unmatched(&mut self) -> Result { + let left_data = self + .buffered_left_data + .as_ref() + .ok_or_else(|| internal_datafusion_err!("LeftData should be available"))?; + let left_batch = left_data.batch(); + + // Early return if join type can't have unmatched rows + if !need_produce_result_in_final(self.join_type) { + return Ok(false); + } + + // Early return if another thread is already processing unmatched rows + if self.l_emit_idx == 0 && !left_data.report_probe_completed() { + return Ok(false); + } + + // Stop processing unmatched rows, the caller will go to the next state + if self.l_emit_idx >= left_batch.num_rows() as u64 { + return Ok(false); + } + + // ======== + // Process unmatched rows and push the result into output_buffer + // Each time, the number to process is up to batch size + // ======== + let start_idx = self.l_emit_idx as usize; + let end_idx = + std::cmp::min(start_idx + self.cfg_batch_size, left_batch.num_rows()); + + if let Some(batch) = + self.process_left_unmatched_range(left_data, start_idx, end_idx)? + { + self.output_buffer.push_batch(batch)?; + } + + // ==== Prepare for the next iteration ==== + self.l_emit_idx = end_idx as u64; + + // Return true to continue processing unmatched rows + Ok(true) + } + + /// Process unmatched rows from the left data within the specified range. + /// Returns a RecordBatch containing the unmatched rows (None if empty). + /// + /// # Arguments + /// * `left_data` - The left side data containing the batch and bitmap + /// * `start_idx` - Start index (inclusive) of the range to process + /// * `end_idx` - End index (exclusive) of the range to process + /// + /// # Safety + /// The caller is responsible for ensuring that `start_idx` and `end_idx` are + /// within valid bounds of the left batch. This function does not perform + /// bounds checking. + fn process_left_unmatched_range( + &self, + left_data: &JoinLeftData, + start_idx: usize, + end_idx: usize, + ) -> Result> { + let mut left_indices_builder = UInt64Builder::new(); + let mut right_indices_builder = UInt32Builder::new(); + + let bitmap = left_data.bitmap().lock(); + for i in start_idx..end_idx { + let should_include = match self.join_type { + JoinType::LeftSemi => bitmap.get_bit(i), + JoinType::LeftMark => { + left_indices_builder.append_value(i as u64); + if bitmap.get_bit(i) { + right_indices_builder.append_value(0); + } else { + right_indices_builder.append_null(); + } + false // handled above + } + _ => !bitmap.get_bit(i), // Left, LeftAnti, Full - include unmatched + }; + + if should_include { + left_indices_builder.append_value(i as u64); + right_indices_builder.append_null(); + } + } + + let left_indices = left_indices_builder.finish(); + let right_indices = right_indices_builder.finish(); + + if !left_indices.is_empty() { + let empty_right_batch = RecordBatch::new_empty(self.outer_table.schema()); + let result_batch = build_batch_from_indices( + &self.output_schema, + left_data.batch(), + &empty_right_batch, + &left_indices, + &right_indices, + &self.column_indices, + JoinSide::Left, + )?; + Ok(Some(result_batch)) + } else { + Ok(None) + } + } + + /// Process unmatched rows from the current right batch and reset the bitmap. + /// Returns a RecordBatch containing the unmatched right rows (None if empty). + fn process_right_unmatched(&mut self) -> Result> { + // ==== Take current right batch and its bitmap ==== + let right_batch_bitmap: BooleanArray = + std::mem::take(&mut self.current_right_batch_matched).ok_or_else(|| { + internal_datafusion_err!("right bitmap should be available") + })?; + + let right_batch = self.current_right_batch.take(); + let cur_right_batch = unwrap_or_internal_err!(right_batch); + + // ==== Setup unmatched indices ==== + // If Right Mark + // ---- + if self.join_type == JoinType::RightMark { + // For RightMark, output all right rows, left is null where bitmap is unset, right is 0..N + let right_row_count = cur_right_batch.num_rows(); + let right_indices = UInt64Array::from_iter_values(0..right_row_count as u64); + // TODO(now-perf): directly copy the null buffer to make this step + // faster + let mut left_indices_builder = UInt32Builder::new(); + for i in 0..right_row_count { + if right_batch_bitmap.value(i) { + left_indices_builder.append_value(i as u32); + } else { + left_indices_builder.append_null(); + } + } + let left_indices = left_indices_builder.finish(); + + let left_data = self.buffered_left_data.as_ref().ok_or_else(|| { + internal_datafusion_err!("LeftData should be available") + })?; + let left_batch = left_data.batch(); + let empty_left_batch = + RecordBatch::new_empty(Arc::clone(&left_batch.schema())); + + if right_indices.is_empty() { + return Ok(None); + } + + let result_batch = build_batch_from_indices( + &self.output_schema, + &cur_right_batch, // swapped: right is build side + &empty_left_batch, + &right_indices, + &left_indices, + &self.column_indices, + JoinSide::Right, + )?; + + self.current_right_batch_matched = None; + return Ok(Some(result_batch)); + } + + // Non Right Mark + // ---- + // TODO(polish): now the actual length of bitmap might be longer than + // the actual in-use. So we have to use right batch length here to + // iterate through the bitmap + let mut right_indices_builder = UInt32Builder::new(); + for i in 0..cur_right_batch.num_rows() { + let i_joined = right_batch_bitmap.value(i); + // TODO(polish): make those flips more understandable + let should_output = match self.join_type { + JoinType::Right => !i_joined, + JoinType::Full => !i_joined, + JoinType::RightAnti => !i_joined, + JoinType::RightMark => i_joined, + JoinType::RightSemi => i_joined, + _ => unreachable!("Not possible for other join types"), + }; + if should_output { + right_indices_builder.append_value(i as u32); + } + } + let right_indices = right_indices_builder.finish(); + let left_indices = UInt64Array::new_null(right_indices.len()); + + // ==== Build the output batch ==== + let left_data = self + .buffered_left_data + .as_ref() + .ok_or_else(|| internal_datafusion_err!("LeftData should be available"))?; + let left_batch = left_data.batch(); + let empty_left_batch = RecordBatch::new_empty(Arc::clone(&left_batch.schema())); + + let result_batch = if left_indices.is_empty() { + None + } else { + let result_batch = build_batch_from_indices( + &self.output_schema, + &empty_left_batch, + &cur_right_batch, + &left_indices, + &right_indices, + &self.column_indices, + JoinSide::Left, + )?; + Some(result_batch) + }; + + // ==== Clean-up ==== + self.current_right_batch_matched = None; + + Ok(result_batch) + } +} + +/// Apply the join filter between: +/// l_index th row in left_batch, with right batch +fn apply_join_filter_to_single_left_row( + left_batch: &RecordBatch, + l_index: usize, + right_batch: &RecordBatch, + filter: &JoinFilter, + _build_side: JoinSide, +) -> Result { + debug_assert!(left_batch.num_rows() != 0 && right_batch.num_rows() != 0); + + let intermediate_batch = if filter.schema.fields().is_empty() { + // If filter is constant (e.g. literal `true`), empty batch can be used + // in the later filter step. + let options = RecordBatchOptions::new() + .with_match_field_names(true) + .with_row_count(Some(right_batch.num_rows())); + + RecordBatch::try_new_with_options( + Arc::new((*filter.schema).clone()), + vec![], + &options, + )? + } else { + build_batch_for_single_left_row( + &filter.schema, + left_batch, + l_index, + right_batch, + None, + &filter.column_indices, + )? + }; + + let filter_result = filter + .expression() + .evaluate(&intermediate_batch)? + .into_array(intermediate_batch.num_rows())?; + + Ok(as_boolean_array(&filter_result)?.clone()) +} + +fn build_batch_for_single_left_row( + output_schema: &Schema, + left_batch: &RecordBatch, + l_index: usize, + right_batch: &RecordBatch, + right_batch_filter: Option, + col_indices: &[ColumnIndex], +) -> Result { + let right_row_count = right_batch.num_rows(); + if right_row_count == 0 { + return Ok(RecordBatch::new_empty(Arc::new(output_schema.clone()))); + } + + let filtered_right_batch = if let Some(filter) = right_batch_filter { + &filter_record_batch(right_batch, &filter)? + } else { + right_batch + }; + + if filtered_right_batch.num_rows() == 0 { + return Ok(RecordBatch::new_empty(Arc::new(output_schema.clone()))); + } + + // Build columns directly without index indirection + let mut columns: Vec> = + Vec::with_capacity(output_schema.fields().len()); + + for column_index in col_indices { + let array = if column_index.side == JoinSide::Left { + // Broadcast the single left row to match the filtered right batch length + let original_left_array = left_batch.column(column_index.index); + let scalar_value = + ScalarValue::try_from_array(original_left_array.as_ref(), l_index)?; + scalar_value.to_array_of_size(filtered_right_batch.num_rows())? + } else { + // Take the filtered right column using compute::take + Arc::clone(filtered_right_batch.column(column_index.index)) + }; + + columns.push(array); + } + + Ok(RecordBatch::try_new( + Arc::new(output_schema.clone()), + columns, + )?) +} diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 5d68ed35b2a98..569cce7eb227d 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -4127,9 +4127,9 @@ logical_plan 03)----TableScan: left_table projection=[a, b, c] 04)----TableScan: right_table projection=[x, y, z] physical_plan -01)NestedLoopJoinExec: join_type=Inner, filter=a@0 < x@1 -02)--DataSourceExec: partitions=1, partition_sizes=[0] -03)--SortExec: expr=[x@0 ASC NULLS LAST], preserve_partitioning=[false] +01)SortExec: expr=[x@3 ASC NULLS LAST], preserve_partitioning=[false] +02)--NestedLoopJoinExec: join_type=Inner, filter=a@0 < x@1 +03)----DataSourceExec: partitions=1, partition_sizes=[0] 04)----DataSourceExec: partitions=1, partition_sizes=[0] query TT diff --git a/datafusion/sqllogictest/test_files/nlj_tmp.slt b/datafusion/sqllogictest/test_files/nlj_tmp.slt new file mode 100644 index 0000000000000..fcb75e7ab0de9 --- /dev/null +++ b/datafusion/sqllogictest/test_files/nlj_tmp.slt @@ -0,0 +1,612 @@ +# # Licensed to the Apache Software Foundation (ASF) under one +# # or more contributor license agreements. See the NOTICE file +# # distributed with this work for additional information +# # regarding copyright ownership. The ASF licenses this file +# # to you under the Apache License, Version 2.0 (the +# # "License"); you may not use this file except in compliance +# # with the License. You may obtain a copy of the License at + +# # http://www.apache.org/licenses/LICENSE-2.0 + +# # Unless required by applicable law or agreed to in writing, +# # software distributed under the License is distributed on an +# # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# # KIND, either express or implied. See the License for the +# # specific language governing permissions and limitations +# # under the License. + +# ########## +# ## Comprehensive NLJ (Nested Loop Join) Testing +# ## Testing all join types with various table size combinations +# ########## + +# # Test combinations: +# # Join types: inner, left, right, full, left semi, right semi, left anti, right anti, left mark, right mark +# # Table sizes: (10,10), (10,10000), (10000,10), (10000,10000), (1,1), (1,10), (10,1) + +# ####################### +# ## INNER JOIN TESTS +# ####################### + +# # Inner join: (10, 10) +# query III rowsort +# select t1.v1, t2.v1, ((t1.v1 + t2.v1) % 100) as p +# from range(10) as t1(v1) +# inner join range(10) as t2(v1) +# on ((t1.v1 + t2.v1) % 100) = 42 +# ---- + +# # Inner join: (10, 10000) +# query III rowsort +# select t1.v1, t2.v1, ((t1.v1 + t2.v1) % 10000) as p +# from range(10) as t1(v1) +# inner join range(10000) as t2(v1) +# on ((t1.v1 + t2.v1) % 10000) = 42 +# ---- +# 0 42 42 +# 1 41 42 +# 2 40 42 +# 3 39 42 +# 4 38 42 +# 5 37 42 +# 6 36 42 +# 7 35 42 +# 8 34 42 +# 9 33 42 + +# # Inner join: (10000, 10) +# query III rowsort +# select t1.v1, t2.v1, ((t1.v1 + t2.v1) % 10000) as p +# from range(10000) as t1(v1) +# inner join range(10) as t2(v1) +# on ((t1.v1 + t2.v1) % 10000) = 42 +# ---- +# 33 9 42 +# 34 8 42 +# 35 7 42 +# 36 6 42 +# 37 5 42 +# 38 4 42 +# 39 3 42 +# 40 2 42 +# 41 1 42 +# 42 0 42 + +# # Inner join: (10000, 10000) - high selectivity +# query III rowsort +# select t1.v1, t2.v1, ((t1.v1 + t2.v1) % 100000) as p +# from range(10000) as t1(v1) +# inner join range(10000) as t2(v1) +# on ((t1.v1 + t2.v1) % 100000) = 42 +# ---- +# 0 42 42 +# 1 41 42 +# 10 32 42 +# 11 31 42 +# 12 30 42 +# 13 29 42 +# 14 28 42 +# 15 27 42 +# 16 26 42 +# 17 25 42 +# 18 24 42 +# 19 23 42 +# 2 40 42 +# 20 22 42 +# 21 21 42 +# 22 20 42 +# 23 19 42 +# 24 18 42 +# 25 17 42 +# 26 16 42 +# 27 15 42 +# 28 14 42 +# 29 13 42 +# 3 39 42 +# 30 12 42 +# 31 11 42 +# 32 10 42 +# 33 9 42 +# 34 8 42 +# 35 7 42 +# 36 6 42 +# 37 5 42 +# 38 4 42 +# 39 3 42 +# 4 38 42 +# 40 2 42 +# 41 1 42 +# 42 0 42 +# 5 37 42 +# 6 36 42 +# 7 35 42 +# 8 34 42 +# 9 33 42 + +# # Inner join: (1, 1) +# query III rowsort +# select t1.v1, t2.v1, ((t1.v1 + t2.v1) % 10) as p +# from range(1) as t1(v1) +# inner join range(1) as t2(v1) +# on ((t1.v1 + t2.v1) % 10) = 2 +# ---- + +# # Inner join: (1, 10) +# query III rowsort +# select t1.v1, t2.v1, ((t1.v1 + t2.v1) % 10) as p +# from range(1) as t1(v1) +# inner join range(10) as t2(v1) +# on ((t1.v1 + t2.v1) % 10) = 2 +# ---- +# 0 2 2 + +# # Inner join: (10, 1) +# query III rowsort +# select t1.v1, t2.v1, ((t1.v1 + t2.v1) % 10) as p +# from range(10) as t1(v1) +# inner join range(1) as t2(v1) +# on ((t1.v1 + t2.v1) % 10) = 2 +# ---- +# 2 0 2 + +# ####################### +# ## LEFT JOIN TESTS +# ####################### + +# # Left join: (10, 10) +# query III rowsort +# select t1.v1, t2.v1, ((t1.v1 + t2.v1) % 100) as p +# from range(10) as t1(v1) +# left join range(10) as t2(v1) +# on ((t1.v1 + t2.v1) % 100) = 42 +# ---- +# 0 NULL NULL +# 1 NULL NULL +# 2 NULL NULL +# 3 NULL NULL +# 4 NULL NULL +# 5 NULL NULL +# 6 NULL NULL +# 7 NULL NULL +# 8 NULL NULL +# 9 NULL NULL + +# # Left join: (10, 10000) +# query III rowsort +# select t1.v1, t2.v1, ((t1.v1 + t2.v1) % 10000) as p +# from range(10) as t1(v1) +# left join range(10000) as t2(v1) +# on ((t1.v1 + t2.v1) % 10000) = 42 +# ---- +# 0 42 42 +# 1 41 42 +# 2 40 42 +# 3 39 42 +# 4 38 42 +# 5 37 42 +# 6 36 42 +# 7 35 42 +# 8 34 42 +# 9 33 42 + +# # Left join: (10000, 10) +# query III rowsort +# select t1.v1, t2.v1, ((t1.v1 + t2.v1) % 10000) as p +# from range(10000) as t1(v1) +# left join range(10) as t2(v1) +# on ((t1.v1 + t2.v1) % 10000) = 42 +# where (t1.v1 IS NOT NULL) AND (t2.v1 IS NOT NULL); +# ---- +# 33 9 42 +# 34 8 42 +# 35 7 42 +# 36 6 42 +# 37 5 42 +# 38 4 42 +# 39 3 42 +# 40 2 42 +# 41 1 42 +# 42 0 42 + +# # Left join: (10000, 10000) - high selectivity +# query III rowsort +# select t1.v1, t2.v1, ((t1.v1 + t2.v1) % 100000) as p +# from range(10000) as t1(v1) +# left join range(10000) as t2(v1) +# on ((t1.v1 + t2.v1) % 100000) = 42 +# where (t1.v1 IS NOT NULL) AND (t2.v1 IS NOT NULL); +# ---- +# 0 42 42 +# 1 41 42 +# 10 32 42 +# 11 31 42 +# 12 30 42 +# 13 29 42 +# 14 28 42 +# 15 27 42 +# 16 26 42 +# 17 25 42 +# 18 24 42 +# 19 23 42 +# 2 40 42 +# 20 22 42 +# 21 21 42 +# 22 20 42 +# 23 19 42 +# 24 18 42 +# 25 17 42 +# 26 16 42 +# 27 15 42 +# 28 14 42 +# 29 13 42 +# 3 39 42 +# 30 12 42 +# 31 11 42 +# 32 10 42 +# 33 9 42 +# 34 8 42 +# 35 7 42 +# 36 6 42 +# 37 5 42 +# 38 4 42 +# 39 3 42 +# 4 38 42 +# 40 2 42 +# 41 1 42 +# 42 0 42 +# 5 37 42 +# 6 36 42 +# 7 35 42 +# 8 34 42 +# 9 33 42 + +# # Left join: (1, 1) +# query III rowsort +# select t1.v1, t2.v1, ((t1.v1 + t2.v1) % 10) as p +# from range(1) as t1(v1) +# left join range(1) as t2(v1) +# on ((t1.v1 + t2.v1) % 10) = 2 +# ---- +# 0 NULL NULL + +# # Left join: (1, 10) +# query III rowsort +# select t1.v1, t2.v1, ((t1.v1 + t2.v1) % 10) as p +# from range(1) as t1(v1) +# left join range(10) as t2(v1) +# on ((t1.v1 + t2.v1) % 10) = 2 +# ---- +# 0 2 2 + +# # Left join: (10, 1) +# query III rowsort +# select t1.v1, t2.v1, ((t1.v1 + t2.v1) % 10) as p +# from range(10) as t1(v1) +# left join range(1) as t2(v1) +# on ((t1.v1 + t2.v1) % 10) = 2 +# ---- +# 0 NULL NULL +# 1 NULL NULL +# 2 0 2 +# 3 NULL NULL +# 4 NULL NULL +# 5 NULL NULL +# 6 NULL NULL +# 7 NULL NULL +# 8 NULL NULL +# 9 NULL NULL + +# ####################### +# ## RIGHT JOIN TESTS +# ####################### + +# # Right join: (10, 10) +# query III rowsort +# select t1.v1, t2.v1, ((t1.v1 + t2.v1) % 100) as p +# from range(10) as t1(v1) +# right join range(10) as t2(v1) +# on ((t1.v1 + t2.v1) % 100) = 42 +# ---- +# NULL 0 NULL +# NULL 1 NULL +# NULL 2 NULL +# NULL 3 NULL +# NULL 4 NULL +# NULL 5 NULL +# NULL 6 NULL +# NULL 7 NULL +# NULL 8 NULL +# NULL 9 NULL + +# # Right join: (10, 10000) +# query III rowsort +# select t1.v1, t2.v1, ((t1.v1 + t2.v1) % 10000) as p +# from range(10) as t1(v1) +# right join range(10000) as t2(v1) +# on ((t1.v1 + t2.v1) % 10000) = 42 +# where (t1.v1 is not null) and (t2.v1 is not null) +# ---- +# 0 42 42 +# 1 41 42 +# 2 40 42 +# 3 39 42 +# 4 38 42 +# 5 37 42 +# 6 36 42 +# 7 35 42 +# 8 34 42 +# 9 33 42 + +# # Right join: (10000, 10) +# query III rowsort +# select t1.v1, t2.v1, ((t1.v1 + t2.v1) % 10000) as p +# from range(10000) as t1(v1) +# right join range(10) as t2(v1) +# on ((t1.v1 + t2.v1) % 10000) = 42 +# ---- +# 33 9 42 +# 34 8 42 +# 35 7 42 +# 36 6 42 +# 37 5 42 +# 38 4 42 +# 39 3 42 +# 40 2 42 +# 41 1 42 +# 42 0 42 + +# # Right join: (10000, 10000) - high selectivity +# query III rowsort +# select t1.v1, t2.v1, ((t1.v1 + t2.v1) % 100000) as p +# from range(10000) as t1(v1) +# right join range(10000) as t2(v1) +# on ((t1.v1 + t2.v1) % 100000) = 42 +# where (t1.v1 is not null) and (t2.v1 is not null) +# ---- +# 0 42 42 +# 1 41 42 +# 10 32 42 +# 11 31 42 +# 12 30 42 +# 13 29 42 +# 14 28 42 +# 15 27 42 +# 16 26 42 +# 17 25 42 +# 18 24 42 +# 19 23 42 +# 2 40 42 +# 20 22 42 +# 21 21 42 +# 22 20 42 +# 23 19 42 +# 24 18 42 +# 25 17 42 +# 26 16 42 +# 27 15 42 +# 28 14 42 +# 29 13 42 +# 3 39 42 +# 30 12 42 +# 31 11 42 +# 32 10 42 +# 33 9 42 +# 34 8 42 +# 35 7 42 +# 36 6 42 +# 37 5 42 +# 38 4 42 +# 39 3 42 +# 4 38 42 +# 40 2 42 +# 41 1 42 +# 42 0 42 +# 5 37 42 +# 6 36 42 +# 7 35 42 +# 8 34 42 +# 9 33 42 + +# # Right join: (1, 1) +# query III rowsort +# select t1.v1, t2.v1, ((t1.v1 + t2.v1) % 10) as p +# from range(1) as t1(v1) +# right join range(1) as t2(v1) +# on ((t1.v1 + t2.v1) % 10) = 2 +# ---- +# NULL 0 NULL + +# # Right join: (1, 10) +# query III rowsort +# select t1.v1, t2.v1, ((t1.v1 + t2.v1) % 10) as p +# from range(1) as t1(v1) +# right join range(10) as t2(v1) +# on ((t1.v1 + t2.v1) % 10) = 2 +# ---- +# 0 2 2 +# NULL 0 NULL +# NULL 1 NULL +# NULL 3 NULL +# NULL 4 NULL +# NULL 5 NULL +# NULL 6 NULL +# NULL 7 NULL +# NULL 8 NULL +# NULL 9 NULL + +# # Right join: (10, 1) +# query III rowsort +# select t1.v1, t2.v1, ((t1.v1 + t2.v1) % 10) as p +# from range(10) as t1(v1) +# right join range(1) as t2(v1) +# on ((t1.v1 + t2.v1) % 10) = 2 +# ---- +# 2 0 2 + +# ####################### +# ## FULL JOIN TESTS +# ####################### + +# # Full join: (10, 10) +# query III rowsort +# select t1.v1, t2.v1, ((t1.v1 + t2.v1) % 100) as p +# from range(10) as t1(v1) +# full join range(10) as t2(v1) +# on ((t1.v1 + t2.v1) % 100) = 42 +# ---- +# 0 NULL NULL +# 1 NULL NULL +# 2 NULL NULL +# 3 NULL NULL +# 4 NULL NULL +# 5 NULL NULL +# 6 NULL NULL +# 7 NULL NULL +# 8 NULL NULL +# 9 NULL NULL +# NULL 0 NULL +# NULL 1 NULL +# NULL 2 NULL +# NULL 3 NULL +# NULL 4 NULL +# NULL 5 NULL +# NULL 6 NULL +# NULL 7 NULL +# NULL 8 NULL +# NULL 9 NULL + +# # Full join: (10, 10000) +# query III rowsort +# select t1.v1, t2.v1, ((t1.v1 + t2.v1) % 10000) as p +# from range(10) as t1(v1) +# full join range(10000) as t2(v1) +# on ((t1.v1 + t2.v1) % 10000) = 42 +# where (t1.v1 is not null) and (t2.v1 is not null) +# ---- +# 0 42 42 +# 1 41 42 +# 2 40 42 +# 3 39 42 +# 4 38 42 +# 5 37 42 +# 6 36 42 +# 7 35 42 +# 8 34 42 +# 9 33 42 + +# # Full join: (10000, 10) +# query III rowsort +# select t1.v1, t2.v1, ((t1.v1 + t2.v1) % 10000) as p +# from range(10000) as t1(v1) +# full join range(10) as t2(v1) +# on ((t1.v1 + t2.v1) % 10000) = 42 +# where (t1.v1 is not null) and (t2.v1 is not null) +# ---- +# 33 9 42 +# 34 8 42 +# 35 7 42 +# 36 6 42 +# 37 5 42 +# 38 4 42 +# 39 3 42 +# 40 2 42 +# 41 1 42 +# 42 0 42 + +# # Full join: (10000, 10000) - high selectivity +# query III rowsort +# select t1.v1, t2.v1, ((t1.v1 + t2.v1) % 100000) as p +# from range(10000) as t1(v1) +# full join range(10000) as t2(v1) +# on ((t1.v1 + t2.v1) % 100000) = 42 +# where (t1.v1 is not null) and (t2.v1 is not null) +# ---- +# 0 42 42 +# 1 41 42 +# 10 32 42 +# 11 31 42 +# 12 30 42 +# 13 29 42 +# 14 28 42 +# 15 27 42 +# 16 26 42 +# 17 25 42 +# 18 24 42 +# 19 23 42 +# 2 40 42 +# 20 22 42 +# 21 21 42 +# 22 20 42 +# 23 19 42 +# 24 18 42 +# 25 17 42 +# 26 16 42 +# 27 15 42 +# 28 14 42 +# 29 13 42 +# 3 39 42 +# 30 12 42 +# 31 11 42 +# 32 10 42 +# 33 9 42 +# 34 8 42 +# 35 7 42 +# 36 6 42 +# 37 5 42 +# 38 4 42 +# 39 3 42 +# 4 38 42 +# 40 2 42 +# 41 1 42 +# 42 0 42 +# 5 37 42 +# 6 36 42 +# 7 35 42 +# 8 34 42 +# 9 33 42 + +# # Full join: (1, 1) +# query III rowsort +# select t1.v1, t2.v1, ((t1.v1 + t2.v1) % 10) as p +# from range(1) as t1(v1) +# full join range(1) as t2(v1) +# on ((t1.v1 + t2.v1) % 10) = 2 +# ---- +# 0 NULL NULL +# NULL 0 NULL + +# # Full join: (1, 10) +# query III rowsort +# select t1.v1, t2.v1, ((t1.v1 + t2.v1) % 10) as p +# from range(1) as t1(v1) +# full join range(10) as t2(v1) +# on ((t1.v1 + t2.v1) % 10) = 2 +# ---- +# 0 2 2 +# NULL 0 NULL +# NULL 1 NULL +# NULL 3 NULL +# NULL 4 NULL +# NULL 5 NULL +# NULL 6 NULL +# NULL 7 NULL +# NULL 8 NULL +# NULL 9 NULL + +# # Full join: (10, 1) +# query III rowsort +# select t1.v1, t2.v1, ((t1.v1 + t2.v1) % 10) as p +# from range(10) as t1(v1) +# full join range(1) as t2(v1) +# on ((t1.v1 + t2.v1) % 10) = 2 +# ---- +# 0 NULL NULL +# 1 NULL NULL +# 2 0 2 +# 3 NULL NULL +# 4 NULL NULL +# 5 NULL NULL +# 6 NULL NULL +# 7 NULL NULL +# 8 NULL NULL +# 9 NULL NULL