diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 7868a7f9e59c7..ff009a01645f2 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -688,11 +688,11 @@ impl SessionContext { LogicalPlan::Statement(Statement::SetVariable(stmt)) => { self.set_variable(stmt).await } - LogicalPlan::Prepare(Prepare { + LogicalPlan::Statement(Statement::Prepare(Prepare { name, input, data_types, - }) => { + })) => { // The number of parameters must match the specified data types length. if !data_types.is_empty() { let param_names = input.get_parameter_names()?; @@ -712,7 +712,9 @@ impl SessionContext { self.state.write().store_prepared(name, data_types, input)?; self.return_empty_dataframe() } - LogicalPlan::Execute(execute) => self.execute_prepared(execute), + LogicalPlan::Statement(Statement::Execute(execute)) => { + self.execute_prepared(execute) + } plan => Ok(DataFrame::new(self.state(), plan)), } } @@ -1773,14 +1775,6 @@ impl<'n, 'a> TreeNodeVisitor<'n> for BadPlanVisitor<'a> { LogicalPlan::Statement(stmt) if !self.options.allow_statements => { plan_err!("Statement not supported: {}", stmt.name()) } - // TODO: Implement PREPARE as a LogicalPlan::Statement - LogicalPlan::Prepare(_) if !self.options.allow_statements => { - plan_err!("Statement not supported: PREPARE") - } - // TODO: Implement EXECUTE as a LogicalPlan::Statement - LogicalPlan::Execute(_) if !self.options.allow_statements => { - plan_err!("Statement not supported: EXECUTE") - } _ => Ok(TreeNodeRecursion::Continue), } } diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 7d475ad2e2a1a..2d3899adb00e1 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1195,15 +1195,6 @@ impl DefaultPhysicalPlanner { let name = statement.name(); return not_impl_err!("Unsupported logical plan: Statement({name})"); } - LogicalPlan::Prepare(_) => { - // There is no default plan for "PREPARE" -- it must be - // handled at a higher level (so that the appropriate - // statement can be prepared) - return not_impl_err!("Unsupported logical plan: Prepare"); - } - LogicalPlan::Execute(_) => { - return not_impl_err!("Unsupported logical plan: Execute"); - } LogicalPlan::Dml(dml) => { // DataFusion is a read-only query engine, but also a library, so consumers may implement this return not_impl_err!("Unsupported logical plan: Dml({0})", dml.op); diff --git a/datafusion/core/tests/sql/sql_api.rs b/datafusion/core/tests/sql/sql_api.rs index b2ffefa43708f..034d6fa23d9cb 100644 --- a/datafusion/core/tests/sql/sql_api.rs +++ b/datafusion/core/tests/sql/sql_api.rs @@ -124,12 +124,12 @@ async fn disable_prepare_and_execute_statement() { let df = ctx.sql_with_options(prepare_sql, options).await; assert_eq!( df.unwrap_err().strip_backtrace(), - "Error during planning: Statement not supported: PREPARE" + "Error during planning: Statement not supported: Prepare" ); let df = ctx.sql_with_options(execute_sql, options).await; assert_eq!( df.unwrap_err().strip_backtrace(), - "Error during planning: Statement not supported: EXECUTE" + "Error during planning: Statement not supported: Execute" ); let options = options.with_allow_statements(true); diff --git a/datafusion/expr/src/expr_rewriter/mod.rs b/datafusion/expr/src/expr_rewriter/mod.rs index c86696854ca38..b944428977c4c 100644 --- a/datafusion/expr/src/expr_rewriter/mod.rs +++ b/datafusion/expr/src/expr_rewriter/mod.rs @@ -314,7 +314,7 @@ impl NamePreserver { | LogicalPlan::Join(_) | LogicalPlan::TableScan(_) | LogicalPlan::Limit(_) - | LogicalPlan::Execute(_) + | LogicalPlan::Statement(_) ), } } diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index beb729fc7c179..eb19c760e1dc8 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -42,7 +42,7 @@ use crate::utils::{ }; use crate::{ and, binary_expr, lit, DmlStatement, Expr, ExprSchemable, Operator, RecursiveQuery, - TableProviderFilterPushDown, TableSource, WriteOp, + Statement, TableProviderFilterPushDown, TableSource, WriteOp, }; use super::dml::InsertOp; @@ -500,11 +500,13 @@ impl LogicalPlanBuilder { /// Make a builder for a prepare logical plan from the builder's plan pub fn prepare(self, name: String, data_types: Vec) -> Result { - Ok(Self::new(LogicalPlan::Prepare(Prepare { - name, - data_types, - input: self.plan, - }))) + Ok(Self::new(LogicalPlan::Statement(Statement::Prepare( + Prepare { + name, + data_types, + input: self.plan, + }, + )))) } /// Limit the number of rows returned diff --git a/datafusion/expr/src/logical_plan/display.rs b/datafusion/expr/src/logical_plan/display.rs index 84efd85419408..b808defcb959c 100644 --- a/datafusion/expr/src/logical_plan/display.rs +++ b/datafusion/expr/src/logical_plan/display.rs @@ -21,10 +21,10 @@ use std::collections::HashMap; use std::fmt; use crate::{ - expr_vec_fmt, Aggregate, DescribeTable, Distinct, DistinctOn, DmlStatement, Execute, - Expr, Filter, Join, Limit, LogicalPlan, Partitioning, Prepare, Projection, - RecursiveQuery, Repartition, Sort, Subquery, SubqueryAlias, - TableProviderFilterPushDown, TableScan, Unnest, Values, Window, + expr_vec_fmt, Aggregate, DescribeTable, Distinct, DistinctOn, DmlStatement, Expr, + Filter, Join, Limit, LogicalPlan, Partitioning, Projection, RecursiveQuery, + Repartition, Sort, Subquery, SubqueryAlias, TableProviderFilterPushDown, TableScan, + Unnest, Values, Window, }; use crate::dml::CopyTo; @@ -618,24 +618,6 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> { "Detail": format!("{:?}", e.node) }) } - LogicalPlan::Prepare(Prepare { - name, data_types, .. - }) => { - json!({ - "Node Type": "Prepare", - "Name": name, - "Data Types": format!("{:?}", data_types) - }) - } - LogicalPlan::Execute(Execute { - name, parameters, .. - }) => { - json!({ - "Node Type": "Execute", - "Name": name, - "Parameters": expr_vec_fmt!(parameters), - }) - } LogicalPlan::DescribeTable(DescribeTable { .. }) => { json!({ "Node Type": "DescribeTable" diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index 59654a2278290..b5bd2e0128717 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -36,14 +36,14 @@ pub use ddl::{ pub use dml::{DmlStatement, WriteOp}; pub use plan::{ projection_schema, Aggregate, Analyze, ColumnUnnestList, DescribeTable, Distinct, - DistinctOn, EmptyRelation, Execute, Explain, Extension, FetchType, Filter, Join, - JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare, - Projection, RecursiveQuery, Repartition, SkipType, Sort, StringifiedPlan, Subquery, + DistinctOn, EmptyRelation, Explain, Extension, FetchType, Filter, Join, + JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Projection, + RecursiveQuery, Repartition, SkipType, Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window, }; pub use statement::{ - SetVariable, Statement, TransactionAccessMode, TransactionConclusion, TransactionEnd, - TransactionIsolationLevel, TransactionStart, + Execute, Prepare, SetVariable, Statement, TransactionAccessMode, + TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart, }; pub use display::display_schema; diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index db309d9b52322..56e13b356b478 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -39,9 +39,9 @@ use crate::utils::{ split_conjunction, }; use crate::{ - build_join_schema, expr_vec_fmt, BinaryExpr, CreateMemoryTable, CreateView, Expr, - ExprSchemable, LogicalPlanBuilder, Operator, TableProviderFilterPushDown, - TableSource, WindowFunctionDefinition, + build_join_schema, expr_vec_fmt, BinaryExpr, CreateMemoryTable, CreateView, Execute, + Expr, ExprSchemable, LogicalPlanBuilder, Operator, Prepare, + TableProviderFilterPushDown, TableSource, WindowFunctionDefinition, }; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; @@ -262,11 +262,6 @@ pub enum LogicalPlan { /// Remove duplicate rows from the input. This is used to /// implement SQL `SELECT DISTINCT ...`. Distinct(Distinct), - /// Prepare a statement and find any bind parameters - /// (e.g. `?`). This is used to implement SQL-prepared statements. - Prepare(Prepare), - /// Execute a prepared statement. This is used to implement SQL 'EXECUTE'. - Execute(Execute), /// Data Manipulation Language (DML): Insert / Update / Delete Dml(DmlStatement), /// Data Definition Language (DDL): CREATE / DROP TABLES / VIEWS / SCHEMAS @@ -314,8 +309,6 @@ impl LogicalPlan { LogicalPlan::Statement(statement) => statement.schema(), LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(), LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema, - LogicalPlan::Prepare(Prepare { input, .. }) => input.schema(), - LogicalPlan::Execute(Execute { schema, .. }) => schema, LogicalPlan::Explain(explain) => &explain.schema, LogicalPlan::Analyze(analyze) => &analyze.schema, LogicalPlan::Extension(extension) => extension.node.schema(), @@ -448,18 +441,16 @@ impl LogicalPlan { LogicalPlan::Copy(copy) => vec![©.input], LogicalPlan::Ddl(ddl) => ddl.inputs(), LogicalPlan::Unnest(Unnest { input, .. }) => vec![input], - LogicalPlan::Prepare(Prepare { input, .. }) => vec![input], LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, recursive_term, .. }) => vec![static_term, recursive_term], + LogicalPlan::Statement(stmt) => stmt.inputs(), // plans without inputs LogicalPlan::TableScan { .. } - | LogicalPlan::Statement { .. } | LogicalPlan::EmptyRelation { .. } | LogicalPlan::Values { .. } - | LogicalPlan::Execute { .. } | LogicalPlan::DescribeTable(_) => vec![], } } @@ -562,8 +553,6 @@ impl LogicalPlan { } LogicalPlan::Subquery(_) => Ok(None), LogicalPlan::EmptyRelation(_) - | LogicalPlan::Prepare(_) - | LogicalPlan::Execute(_) | LogicalPlan::Statement(_) | LogicalPlan::Values(_) | LogicalPlan::Explain(_) @@ -715,8 +704,6 @@ impl LogicalPlan { LogicalPlan::RecursiveQuery(_) => Ok(self), LogicalPlan::Analyze(_) => Ok(self), LogicalPlan::Explain(_) => Ok(self), - LogicalPlan::Prepare(_) => Ok(self), - LogicalPlan::Execute(_) => Ok(self), LogicalPlan::TableScan(_) => Ok(self), LogicalPlan::EmptyRelation(_) => Ok(self), LogicalPlan::Statement(_) => Ok(self), @@ -1070,24 +1057,25 @@ impl LogicalPlan { logical_optimization_succeeded: e.logical_optimization_succeeded, })) } - LogicalPlan::Prepare(Prepare { - name, data_types, .. - }) => { + LogicalPlan::Statement(Statement::Prepare(Prepare { + name, + data_types, + .. + })) => { self.assert_no_expressions(expr)?; let input = self.only_input(inputs)?; - Ok(LogicalPlan::Prepare(Prepare { + Ok(LogicalPlan::Statement(Statement::Prepare(Prepare { name: name.clone(), data_types: data_types.clone(), input: Arc::new(input), - })) + }))) } - LogicalPlan::Execute(Execute { name, schema, .. }) => { + LogicalPlan::Statement(Statement::Execute(Execute { name, .. })) => { self.assert_no_inputs(inputs)?; - Ok(LogicalPlan::Execute(Execute { + Ok(LogicalPlan::Statement(Statement::Execute(Execute { name: name.clone(), - schema: Arc::clone(schema), parameters: expr, - })) + }))) } LogicalPlan::TableScan(ts) => { self.assert_no_inputs(inputs)?; @@ -1184,8 +1172,8 @@ impl LogicalPlan { /// Replaces placeholder param values (like `$1`, `$2`) in [`LogicalPlan`] /// with the specified `param_values`. /// - /// [`LogicalPlan::Prepare`] are - /// converted to their inner logical plan for execution. + /// [`Prepare`] statements are converted to + /// their inner logical plan for execution. /// /// # Example /// ``` @@ -1242,13 +1230,17 @@ impl LogicalPlan { let plan_with_values = self.replace_params_with_values(¶m_values)?; // unwrap Prepare - Ok(if let LogicalPlan::Prepare(prepare_lp) = plan_with_values { - param_values.verify(&prepare_lp.data_types)?; - // try and take ownership of the input if is not shared, clone otherwise - Arc::unwrap_or_clone(prepare_lp.input) - } else { - plan_with_values - }) + Ok( + if let LogicalPlan::Statement(Statement::Prepare(prepare_lp)) = + plan_with_values + { + param_values.verify(&prepare_lp.data_types)?; + // try and take ownership of the input if is not shared, clone otherwise + Arc::unwrap_or_clone(prepare_lp.input) + } else { + plan_with_values + }, + ) } /// Returns the maximum number of rows that this plan can output, if known. @@ -1346,8 +1338,6 @@ impl LogicalPlan { | LogicalPlan::Dml(_) | LogicalPlan::Copy(_) | LogicalPlan::DescribeTable(_) - | LogicalPlan::Prepare(_) - | LogicalPlan::Execute(_) | LogicalPlan::Statement(_) | LogicalPlan::Extension(_) => None, } @@ -1962,14 +1952,6 @@ impl LogicalPlan { LogicalPlan::Analyze { .. } => write!(f, "Analyze"), LogicalPlan::Union(_) => write!(f, "Union"), LogicalPlan::Extension(e) => e.node.fmt_for_explain(f), - LogicalPlan::Prepare(Prepare { - name, data_types, .. - }) => { - write!(f, "Prepare: {name:?} {data_types:?} ") - } - LogicalPlan::Execute(Execute { name, parameters, .. }) => { - write!(f, "Execute: {} params=[{}]", name, expr_vec_fmt!(parameters)) - } LogicalPlan::DescribeTable(DescribeTable { .. }) => { write!(f, "DescribeTable") } @@ -2624,39 +2606,6 @@ impl PartialOrd for Union { } } -/// Prepare a statement but do not execute it. Prepare statements can have 0 or more -/// `Expr::Placeholder` expressions that are filled in during execution -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)] -pub struct Prepare { - /// The name of the statement - pub name: String, - /// Data types of the parameters ([`Expr::Placeholder`]) - pub data_types: Vec, - /// The logical plan of the statements - pub input: Arc, -} - -/// Execute a prepared statement. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct Execute { - /// The name of the prepared statement to execute - pub name: String, - /// The execute parameters - pub parameters: Vec, - /// Dummy schema - pub schema: DFSchemaRef, -} - -// Comparison excludes the `schema` field. -impl PartialOrd for Execute { - fn partial_cmp(&self, other: &Self) -> Option { - match self.name.partial_cmp(&other.name) { - Some(Ordering::Equal) => self.parameters.partial_cmp(&other.parameters), - cmp => cmp, - } - } -} - /// Describe the schema of table /// /// # Example output: diff --git a/datafusion/expr/src/logical_plan/statement.rs b/datafusion/expr/src/logical_plan/statement.rs index 7ad18ce7bbf77..9ba8170f8eb27 100644 --- a/datafusion/expr/src/logical_plan/statement.rs +++ b/datafusion/expr/src/logical_plan/statement.rs @@ -15,17 +15,26 @@ // specific language governing permissions and limitations // under the License. -use datafusion_common::DFSchemaRef; -use std::cmp::Ordering; +use arrow::datatypes::DataType; +use datafusion_common::tree_node::{Transformed, TreeNodeIterator}; +use datafusion_common::{DFSchema, DFSchemaRef, Result}; use std::fmt::{self, Display}; +use std::sync::{Arc, OnceLock}; + +use super::tree_node::rewrite_arc; +use crate::{expr_vec_fmt, Expr, LogicalPlan}; + +/// Statements have a unchanging empty schema. +/// TODO: Use `LazyLock` when MSRV is 1.80.0 +static STATEMENT_EMPTY_SCHEMA: OnceLock = OnceLock::new(); /// Various types of Statements. /// /// # Transactions: /// /// While DataFusion does not offer support transactions, it provides -/// [`LogicalPlan`](crate::LogicalPlan) support to assist building -/// database systems using DataFusion +/// [`LogicalPlan`] support to assist building database systems +/// using DataFusion #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)] pub enum Statement { // Begin a transaction @@ -34,16 +43,17 @@ pub enum Statement { TransactionEnd(TransactionEnd), /// Set a Variable SetVariable(SetVariable), + /// Prepare a statement and find any bind parameters + /// (e.g. `?`). This is used to implement SQL-prepared statements. + Prepare(Prepare), + /// Execute a prepared statement. This is used to implement SQL 'EXECUTE'. + Execute(Execute), } impl Statement { /// Get a reference to the logical plan's schema pub fn schema(&self) -> &DFSchemaRef { - match self { - Statement::TransactionStart(TransactionStart { schema, .. }) => schema, - Statement::TransactionEnd(TransactionEnd { schema, .. }) => schema, - Statement::SetVariable(SetVariable { schema, .. }) => schema, - } + STATEMENT_EMPTY_SCHEMA.get_or_init(|| Arc::new(DFSchema::empty())) } /// Return a descriptive string describing the type of this @@ -53,6 +63,63 @@ impl Statement { Statement::TransactionStart(_) => "TransactionStart", Statement::TransactionEnd(_) => "TransactionEnd", Statement::SetVariable(_) => "SetVariable", + Statement::Prepare(_) => "Prepare", + Statement::Execute(_) => "Execute", + } + } + + /// Returns input LogicalPlans in the current `Statement`. + pub(super) fn inputs(&self) -> Vec<&LogicalPlan> { + match self { + Statement::Prepare(Prepare { input, .. }) => vec![input.as_ref()], + _ => vec![], + } + } + + /// Rewrites input LogicalPlans in the current `Statement` using `f`. + pub(super) fn map_inputs< + F: FnMut(LogicalPlan) -> Result>, + >( + self, + f: F, + ) -> Result> { + match self { + Statement::Prepare(Prepare { + input, + name, + data_types, + }) => Ok(rewrite_arc(input, f)?.update_data(|input| { + Statement::Prepare(Prepare { + input, + name, + data_types, + }) + })), + _ => Ok(Transformed::no(self)), + } + } + + /// Returns a iterator over all expressions in the current `Statement`. + pub(super) fn expression_iter(&self) -> impl Iterator { + match self { + Statement::Execute(Execute { parameters, .. }) => parameters.iter(), + _ => [].iter(), + } + } + + /// Rewrites all expressions in the current `Statement` using `f`. + pub(super) fn map_expressions Result>>( + self, + f: F, + ) -> Result> { + match self { + Statement::Execute(Execute { name, parameters }) => Ok(parameters + .into_iter() + .map_until_stop_and_collect(f)? + .update_data(|parameters| { + Statement::Execute(Execute { parameters, name }) + })), + _ => Ok(Transformed::no(self)), } } @@ -85,6 +152,21 @@ impl Statement { }) => { write!(f, "SetVariable: set {variable:?} to {value:?}") } + Statement::Prepare(Prepare { + name, data_types, .. + }) => { + write!(f, "Prepare: {name:?} {data_types:?} ") + } + Statement::Execute(Execute { + name, parameters, .. + }) => { + write!( + f, + "Execute: {} params=[{}]", + name, + expr_vec_fmt!(parameters) + ) + } } } } @@ -116,67 +198,50 @@ pub enum TransactionIsolationLevel { } /// Indicator that the following statements should be committed or rolled back atomically -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Hash)] pub struct TransactionStart { /// indicates if transaction is allowed to write pub access_mode: TransactionAccessMode, // indicates ANSI isolation level pub isolation_level: TransactionIsolationLevel, - /// Empty schema - pub schema: DFSchemaRef, -} - -// Manual implementation needed because of `schema` field. Comparison excludes this field. -impl PartialOrd for TransactionStart { - fn partial_cmp(&self, other: &Self) -> Option { - match self.access_mode.partial_cmp(&other.access_mode) { - Some(Ordering::Equal) => { - self.isolation_level.partial_cmp(&other.isolation_level) - } - cmp => cmp, - } - } } /// Indicator that any current transaction should be terminated -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Hash)] pub struct TransactionEnd { /// whether the transaction committed or aborted pub conclusion: TransactionConclusion, /// if specified a new transaction is immediately started with same characteristics pub chain: bool, - /// Empty schema - pub schema: DFSchemaRef, -} - -// Manual implementation needed because of `schema` field. Comparison excludes this field. -impl PartialOrd for TransactionEnd { - fn partial_cmp(&self, other: &Self) -> Option { - match self.conclusion.partial_cmp(&other.conclusion) { - Some(Ordering::Equal) => self.chain.partial_cmp(&other.chain), - cmp => cmp, - } - } } /// Set a Variable's value -- value in /// [`ConfigOptions`](datafusion_common::config::ConfigOptions) -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Hash)] pub struct SetVariable { /// The variable name pub variable: String, /// The value to set pub value: String, - /// Dummy schema - pub schema: DFSchemaRef, } -// Manual implementation needed because of `schema` field. Comparison excludes this field. -impl PartialOrd for SetVariable { - fn partial_cmp(&self, other: &Self) -> Option { - match self.variable.partial_cmp(&other.value) { - Some(Ordering::Equal) => self.value.partial_cmp(&other.value), - cmp => cmp, - } - } +/// Prepare a statement but do not execute it. Prepare statements can have 0 or more +/// `Expr::Placeholder` expressions that are filled in during execution +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)] +pub struct Prepare { + /// The name of the statement + pub name: String, + /// Data types of the parameters ([`Expr::Placeholder`]) + pub data_types: Vec, + /// The logical plan of the statements + pub input: Arc, +} + +/// Execute a prepared statement. +#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Hash)] +pub struct Execute { + /// The name of the prepared statement to execute + pub name: String, + /// The execute parameters + pub parameters: Vec, } diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index ff2c1ec1d58f9..d16fe42098f51 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -38,10 +38,9 @@ //! * [`LogicalPlan::expressions`]: Return a copy of the plan's expressions use crate::{ dml::CopyTo, Aggregate, Analyze, CreateMemoryTable, CreateView, DdlStatement, - Distinct, DistinctOn, DmlStatement, Execute, Explain, Expr, Extension, Filter, Join, - Limit, LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery, Repartition, - Sort, Subquery, SubqueryAlias, TableScan, Union, Unnest, UserDefinedLogicalNode, - Values, Window, + Distinct, DistinctOn, DmlStatement, Explain, Expr, Extension, Filter, Join, Limit, + LogicalPlan, Partitioning, Projection, RecursiveQuery, Repartition, Sort, Subquery, + SubqueryAlias, TableScan, Union, Unnest, UserDefinedLogicalNode, Values, Window, }; use std::ops::Deref; use std::sync::Arc; @@ -329,17 +328,6 @@ impl TreeNode for LogicalPlan { options, }) }), - LogicalPlan::Prepare(Prepare { - name, - data_types, - input, - }) => rewrite_arc(input, f)?.update_data(|input| { - LogicalPlan::Prepare(Prepare { - name, - data_types, - input, - }) - }), LogicalPlan::RecursiveQuery(RecursiveQuery { name, static_term, @@ -358,19 +346,20 @@ impl TreeNode for LogicalPlan { is_distinct, }) }), + LogicalPlan::Statement(stmt) => { + stmt.map_inputs(f)?.update_data(LogicalPlan::Statement) + } // plans without inputs LogicalPlan::TableScan { .. } - | LogicalPlan::Statement { .. } | LogicalPlan::EmptyRelation { .. } | LogicalPlan::Values { .. } - | LogicalPlan::Execute { .. } | LogicalPlan::DescribeTable(_) => Transformed::no(self), }) } } /// Applies `f` to rewrite a `Arc` without copying, if possible -fn rewrite_arc Result>>( +pub(super) fn rewrite_arc Result>>( plan: Arc, mut f: F, ) -> Result>> { @@ -506,15 +495,12 @@ impl LogicalPlan { .chain(fetch.iter()) .map(|e| e.deref()) .apply_until_stop(f), - LogicalPlan::Execute(Execute { parameters, .. }) => { - parameters.iter().apply_until_stop(f) - } + LogicalPlan::Statement(stmt) => stmt.expression_iter().apply_until_stop(f), // plans without expressions LogicalPlan::EmptyRelation(_) | LogicalPlan::RecursiveQuery(_) | LogicalPlan::Subquery(_) | LogicalPlan::SubqueryAlias(_) - | LogicalPlan::Statement(_) | LogicalPlan::Analyze(_) | LogicalPlan::Explain(_) | LogicalPlan::Union(_) @@ -522,8 +508,7 @@ impl LogicalPlan { | LogicalPlan::Dml(_) | LogicalPlan::Ddl(_) | LogicalPlan::Copy(_) - | LogicalPlan::DescribeTable(_) - | LogicalPlan::Prepare(_) => Ok(TreeNodeRecursion::Continue), + | LogicalPlan::DescribeTable(_) => Ok(TreeNodeRecursion::Continue), } } @@ -738,27 +723,15 @@ impl LogicalPlan { }) }) } - LogicalPlan::Execute(Execute { - parameters, - name, - schema, - }) => parameters - .into_iter() - .map_until_stop_and_collect(f)? - .update_data(|parameters| { - LogicalPlan::Execute(Execute { - parameters, - name, - schema, - }) - }), + LogicalPlan::Statement(stmt) => { + stmt.map_expressions(f)?.update_data(LogicalPlan::Statement) + } // plans without expressions LogicalPlan::EmptyRelation(_) | LogicalPlan::Unnest(_) | LogicalPlan::RecursiveQuery(_) | LogicalPlan::Subquery(_) | LogicalPlan::SubqueryAlias(_) - | LogicalPlan::Statement(_) | LogicalPlan::Analyze(_) | LogicalPlan::Explain(_) | LogicalPlan::Union(_) @@ -766,8 +739,7 @@ impl LogicalPlan { | LogicalPlan::Dml(_) | LogicalPlan::Ddl(_) | LogicalPlan::Copy(_) - | LogicalPlan::DescribeTable(_) - | LogicalPlan::Prepare(_) => Transformed::no(self), + | LogicalPlan::DescribeTable(_) => Transformed::no(self), }) } diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index 53a0453d80011..71327ad3e21d4 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -563,9 +563,7 @@ impl OptimizerRule for CommonSubexprEliminate { | LogicalPlan::Dml(_) | LogicalPlan::Copy(_) | LogicalPlan::Unnest(_) - | LogicalPlan::RecursiveQuery(_) - | LogicalPlan::Prepare(_) - | LogicalPlan::Execute(_) => { + | LogicalPlan::RecursiveQuery(_) => { // This rule handles recursion itself in a `ApplyOrder::TopDown` like // manner. plan.map_children(|c| self.rewrite(c, config))? diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 67d888abda52f..04a523f9b1156 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -295,7 +295,7 @@ fn optimize_projections( }) .collect::>()? } - LogicalPlan::Limit(_) | LogicalPlan::Prepare(_) => { + LogicalPlan::Limit(_) => { // Pass index requirements from the parent as well as column indices // that appear in this plan's expressions to its child. These operators // do not benefit from "small" inputs, so the projection_beneficial @@ -311,6 +311,7 @@ fn optimize_projections( | LogicalPlan::Explain(_) | LogicalPlan::Analyze(_) | LogicalPlan::Subquery(_) + | LogicalPlan::Statement(_) | LogicalPlan::Distinct(Distinct::All(_)) => { // These plans require all their fields, and their children should // be treated as final plans -- otherwise, we may have schema a @@ -346,10 +347,8 @@ fn optimize_projections( } LogicalPlan::EmptyRelation(_) | LogicalPlan::RecursiveQuery(_) - | LogicalPlan::Statement(_) | LogicalPlan::Values(_) - | LogicalPlan::DescribeTable(_) - | LogicalPlan::Execute(_) => { + | LogicalPlan::DescribeTable(_) => { // These operators have no inputs, so stop the optimization process. return Ok(Transformed::no(plan)); } diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 1993598f5cf70..a55fecec98f6d 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -66,7 +66,7 @@ use datafusion_expr::{ SubqueryAlias, TableScan, Values, Window, }, DistinctOn, DropView, Expr, LogicalPlan, LogicalPlanBuilder, ScalarUDF, SortExpr, - WindowUDF, + Statement, WindowUDF, }; use datafusion_expr::{AggregateUDF, ColumnUnnestList, FetchType, SkipType, Unnest}; @@ -1502,11 +1502,11 @@ impl AsLogicalPlan for LogicalPlanNode { )), }) } - LogicalPlan::Prepare(Prepare { + LogicalPlan::Statement(Statement::Prepare(Prepare { name, data_types, input, - }) => { + })) => { let input = LogicalPlanNode::try_from_logical_plan(input, extension_codec)?; Ok(LogicalPlanNode { @@ -1633,9 +1633,6 @@ impl AsLogicalPlan for LogicalPlanNode { LogicalPlan::RecursiveQuery(_) => Err(proto_error( "LogicalPlan serde is not yet implemented for RecursiveQuery", )), - LogicalPlan::Execute(_) => Err(proto_error( - "LogicalPlan serde is not yet implemented for Execute", - )), } } } diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 00949aa13ae1c..3b32a437576b8 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -636,11 +636,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { *statement, &mut planner_context, )?; - Ok(LogicalPlan::Prepare(Prepare { + Ok(LogicalPlan::Statement(PlanStatement::Prepare(Prepare { name: ident_to_string(&name), data_types, input: Arc::new(plan), - })) + }))) } Statement::Execute { name, @@ -660,11 +660,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .map(|expr| self.sql_to_expr(expr, &empty_schema, planner_context)) .collect::>>()?; - Ok(LogicalPlan::Execute(Execute { + Ok(LogicalPlan::Statement(PlanStatement::Execute(Execute { name: ident_to_string(&name), parameters, - schema: DFSchemaRef::new(empty_schema), - })) + }))) } Statement::ShowTables { @@ -841,7 +840,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let statement = PlanStatement::TransactionStart(TransactionStart { access_mode, isolation_level, - schema: DFSchemaRef::new(DFSchema::empty()), }); Ok(LogicalPlan::Statement(statement)) } @@ -849,7 +847,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let statement = PlanStatement::TransactionEnd(TransactionEnd { conclusion: TransactionConclusion::Commit, chain, - schema: DFSchemaRef::new(DFSchema::empty()), }); Ok(LogicalPlan::Statement(statement)) } @@ -860,7 +857,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let statement = PlanStatement::TransactionEnd(TransactionEnd { conclusion: TransactionConclusion::Rollback, chain, - schema: DFSchemaRef::new(DFSchema::empty()), }); Ok(LogicalPlan::Statement(statement)) } @@ -1535,7 +1531,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let statement = PlanStatement::SetVariable(SetVariable { variable: variable_lower, value: value_string, - schema: DFSchemaRef::new(DFSchema::empty()), }); Ok(LogicalPlan::Statement(statement)) diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 8167ddacffb4e..c46bc62063798 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -111,8 +111,6 @@ impl Unparser<'_> { LogicalPlan::Explain(_) | LogicalPlan::Analyze(_) | LogicalPlan::Extension(_) - | LogicalPlan::Prepare(_) - | LogicalPlan::Execute(_) | LogicalPlan::Ddl(_) | LogicalPlan::Copy(_) | LogicalPlan::DescribeTable(_) diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 698c408e538f5..b2f128778a1c3 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -33,7 +33,7 @@ use datafusion_expr::{ logical_plan::{LogicalPlan, Prepare}, test::function_stub::sum_udaf, ColumnarValue, CreateExternalTable, CreateIndex, DdlStatement, ScalarUDF, - ScalarUDFImpl, Signature, Volatility, + ScalarUDFImpl, Signature, Statement, Volatility, }; use datafusion_functions::{string, unicode}; use datafusion_sql::{ @@ -2710,7 +2710,9 @@ fn prepare_stmt_quick_test( assert_eq!(format!("{assert_plan}"), expected_plan); // verify data types - if let LogicalPlan::Prepare(Prepare { data_types, .. }) = assert_plan { + if let LogicalPlan::Statement(Statement::Prepare(Prepare { data_types, .. })) = + assert_plan + { let dt = format!("{data_types:?}"); assert_eq!(dt, expected_data_types); } diff --git a/datafusion/sqllogictest/test_files/prepare.slt b/datafusion/sqllogictest/test_files/prepare.slt index b0c67af9e14fd..f149cec96f197 100644 --- a/datafusion/sqllogictest/test_files/prepare.slt +++ b/datafusion/sqllogictest/test_files/prepare.slt @@ -199,19 +199,34 @@ EXECUTE get_N_rand_ints_from_last_run(2); statement ok DROP TABLE test; +statement ok +SET datafusion.explain.logical_plan_only=true; + +# OptimizeProjections rule works with PREPARE and pushes down the `id` projection to TableScan +query TT +EXPLAIN PREPARE my_plan(INT) AS SELECT id + $1 FROM person; +---- +logical_plan +01)Prepare: "my_plan" [Int32] +02)--Projection: person.id + $1 +03)----TableScan: person projection=[id] # test creating logical plan for EXECUTE statements query TT EXPLAIN EXECUTE my_plan; ---- logical_plan Execute: my_plan params=[] -physical_plan_error This feature is not implemented: Unsupported logical plan: Execute query TT EXPLAIN EXECUTE my_plan(10*2 + 1, 'Foo'); ---- logical_plan Execute: my_plan params=[Int64(21), Utf8("Foo")] -physical_plan_error This feature is not implemented: Unsupported logical plan: Execute query error DataFusion error: Schema error: No field named a\. EXPLAIN EXECUTE my_plan(a); + +statement ok +SET datafusion.explain.logical_plan_only=false; + +statement ok +DROP TABLE person;