Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 5 additions & 11 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -688,11 +688,11 @@ impl SessionContext {
LogicalPlan::Statement(Statement::SetVariable(stmt)) => {
self.set_variable(stmt).await
}
LogicalPlan::Prepare(Prepare {
LogicalPlan::Statement(Statement::Prepare(Prepare {
name,
input,
data_types,
}) => {
})) => {
// The number of parameters must match the specified data types length.
if !data_types.is_empty() {
let param_names = input.get_parameter_names()?;
Expand All @@ -712,7 +712,9 @@ impl SessionContext {
self.state.write().store_prepared(name, data_types, input)?;
self.return_empty_dataframe()
}
LogicalPlan::Execute(execute) => self.execute_prepared(execute),
LogicalPlan::Statement(Statement::Execute(execute)) => {
self.execute_prepared(execute)
}
plan => Ok(DataFrame::new(self.state(), plan)),
}
}
Expand Down Expand Up @@ -1773,14 +1775,6 @@ impl<'n, 'a> TreeNodeVisitor<'n> for BadPlanVisitor<'a> {
LogicalPlan::Statement(stmt) if !self.options.allow_statements => {
plan_err!("Statement not supported: {}", stmt.name())
}
// TODO: Implement PREPARE as a LogicalPlan::Statement
LogicalPlan::Prepare(_) if !self.options.allow_statements => {
plan_err!("Statement not supported: PREPARE")
}
// TODO: Implement EXECUTE as a LogicalPlan::Statement
LogicalPlan::Execute(_) if !self.options.allow_statements => {
plan_err!("Statement not supported: EXECUTE")
}
_ => Ok(TreeNodeRecursion::Continue),
}
}
Expand Down
9 changes: 0 additions & 9 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1195,15 +1195,6 @@ impl DefaultPhysicalPlanner {
let name = statement.name();
return not_impl_err!("Unsupported logical plan: Statement({name})");
}
LogicalPlan::Prepare(_) => {
// There is no default plan for "PREPARE" -- it must be
// handled at a higher level (so that the appropriate
// statement can be prepared)
return not_impl_err!("Unsupported logical plan: Prepare");
}
LogicalPlan::Execute(_) => {
return not_impl_err!("Unsupported logical plan: Execute");
}
LogicalPlan::Dml(dml) => {
// DataFusion is a read-only query engine, but also a library, so consumers may implement this
return not_impl_err!("Unsupported logical plan: Dml({0})", dml.op);
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/tests/sql/sql_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,12 @@ async fn disable_prepare_and_execute_statement() {
let df = ctx.sql_with_options(prepare_sql, options).await;
assert_eq!(
df.unwrap_err().strip_backtrace(),
"Error during planning: Statement not supported: PREPARE"
"Error during planning: Statement not supported: Prepare"
);
let df = ctx.sql_with_options(execute_sql, options).await;
assert_eq!(
df.unwrap_err().strip_backtrace(),
"Error during planning: Statement not supported: EXECUTE"
"Error during planning: Statement not supported: Execute"
);

let options = options.with_allow_statements(true);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/expr_rewriter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ impl NamePreserver {
| LogicalPlan::Join(_)
| LogicalPlan::TableScan(_)
| LogicalPlan::Limit(_)
| LogicalPlan::Execute(_)
| LogicalPlan::Statement(_)
),
}
}
Expand Down
14 changes: 8 additions & 6 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use crate::utils::{
};
use crate::{
and, binary_expr, lit, DmlStatement, Expr, ExprSchemable, Operator, RecursiveQuery,
TableProviderFilterPushDown, TableSource, WriteOp,
Statement, TableProviderFilterPushDown, TableSource, WriteOp,
};

use super::dml::InsertOp;
Expand Down Expand Up @@ -500,11 +500,13 @@ impl LogicalPlanBuilder {

/// Make a builder for a prepare logical plan from the builder's plan
pub fn prepare(self, name: String, data_types: Vec<DataType>) -> Result<Self> {
Ok(Self::new(LogicalPlan::Prepare(Prepare {
name,
data_types,
input: self.plan,
})))
Ok(Self::new(LogicalPlan::Statement(Statement::Prepare(
Prepare {
name,
data_types,
input: self.plan,
},
))))
}

/// Limit the number of rows returned
Expand Down
26 changes: 4 additions & 22 deletions datafusion/expr/src/logical_plan/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ use std::collections::HashMap;
use std::fmt;

use crate::{
expr_vec_fmt, Aggregate, DescribeTable, Distinct, DistinctOn, DmlStatement, Execute,
Expr, Filter, Join, Limit, LogicalPlan, Partitioning, Prepare, Projection,
RecursiveQuery, Repartition, Sort, Subquery, SubqueryAlias,
TableProviderFilterPushDown, TableScan, Unnest, Values, Window,
expr_vec_fmt, Aggregate, DescribeTable, Distinct, DistinctOn, DmlStatement, Expr,
Filter, Join, Limit, LogicalPlan, Partitioning, Projection, RecursiveQuery,
Repartition, Sort, Subquery, SubqueryAlias, TableProviderFilterPushDown, TableScan,
Unnest, Values, Window,
};

use crate::dml::CopyTo;
Expand Down Expand Up @@ -618,24 +618,6 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {
"Detail": format!("{:?}", e.node)
})
}
LogicalPlan::Prepare(Prepare {
name, data_types, ..
}) => {
json!({
"Node Type": "Prepare",
"Name": name,
"Data Types": format!("{:?}", data_types)
})
}
LogicalPlan::Execute(Execute {
name, parameters, ..
}) => {
json!({
"Node Type": "Execute",
"Name": name,
"Parameters": expr_vec_fmt!(parameters),
})
}
LogicalPlan::DescribeTable(DescribeTable { .. }) => {
json!({
"Node Type": "DescribeTable"
Expand Down
10 changes: 5 additions & 5 deletions datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ pub use ddl::{
pub use dml::{DmlStatement, WriteOp};
pub use plan::{
projection_schema, Aggregate, Analyze, ColumnUnnestList, DescribeTable, Distinct,
DistinctOn, EmptyRelation, Execute, Explain, Extension, FetchType, Filter, Join,
JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
Projection, RecursiveQuery, Repartition, SkipType, Sort, StringifiedPlan, Subquery,
DistinctOn, EmptyRelation, Explain, Extension, FetchType, Filter, Join,
JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Projection,
RecursiveQuery, Repartition, SkipType, Sort, StringifiedPlan, Subquery,
SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
};
pub use statement::{
SetVariable, Statement, TransactionAccessMode, TransactionConclusion, TransactionEnd,
TransactionIsolationLevel, TransactionStart,
Execute, Prepare, SetVariable, Statement, TransactionAccessMode,
TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart,
};

pub use display::display_schema;
Expand Down
105 changes: 27 additions & 78 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ use crate::utils::{
split_conjunction,
};
use crate::{
build_join_schema, expr_vec_fmt, BinaryExpr, CreateMemoryTable, CreateView, Expr,
ExprSchemable, LogicalPlanBuilder, Operator, TableProviderFilterPushDown,
TableSource, WindowFunctionDefinition,
build_join_schema, expr_vec_fmt, BinaryExpr, CreateMemoryTable, CreateView, Execute,
Expr, ExprSchemable, LogicalPlanBuilder, Operator, Prepare,
TableProviderFilterPushDown, TableSource, WindowFunctionDefinition,
};

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
Expand Down Expand Up @@ -262,11 +262,6 @@ pub enum LogicalPlan {
/// Remove duplicate rows from the input. This is used to
/// implement SQL `SELECT DISTINCT ...`.
Distinct(Distinct),
/// Prepare a statement and find any bind parameters
/// (e.g. `?`). This is used to implement SQL-prepared statements.
Prepare(Prepare),
/// Execute a prepared statement. This is used to implement SQL 'EXECUTE'.
Execute(Execute),
/// Data Manipulation Language (DML): Insert / Update / Delete
Dml(DmlStatement),
/// Data Definition Language (DDL): CREATE / DROP TABLES / VIEWS / SCHEMAS
Expand Down Expand Up @@ -314,8 +309,6 @@ impl LogicalPlan {
LogicalPlan::Statement(statement) => statement.schema(),
LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(),
LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
LogicalPlan::Prepare(Prepare { input, .. }) => input.schema(),
LogicalPlan::Execute(Execute { schema, .. }) => schema,
LogicalPlan::Explain(explain) => &explain.schema,
LogicalPlan::Analyze(analyze) => &analyze.schema,
LogicalPlan::Extension(extension) => extension.node.schema(),
Expand Down Expand Up @@ -448,18 +441,16 @@ impl LogicalPlan {
LogicalPlan::Copy(copy) => vec![&copy.input],
LogicalPlan::Ddl(ddl) => ddl.inputs(),
LogicalPlan::Unnest(Unnest { input, .. }) => vec![input],
LogicalPlan::Prepare(Prepare { input, .. }) => vec![input],
LogicalPlan::RecursiveQuery(RecursiveQuery {
static_term,
recursive_term,
..
}) => vec![static_term, recursive_term],
LogicalPlan::Statement(stmt) => stmt.inputs(),
// plans without inputs
LogicalPlan::TableScan { .. }
| LogicalPlan::Statement { .. }
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::Execute { .. }
| LogicalPlan::DescribeTable(_) => vec![],
}
}
Expand Down Expand Up @@ -562,8 +553,6 @@ impl LogicalPlan {
}
LogicalPlan::Subquery(_) => Ok(None),
LogicalPlan::EmptyRelation(_)
| LogicalPlan::Prepare(_)
| LogicalPlan::Execute(_)
| LogicalPlan::Statement(_)
| LogicalPlan::Values(_)
| LogicalPlan::Explain(_)
Expand Down Expand Up @@ -715,8 +704,6 @@ impl LogicalPlan {
LogicalPlan::RecursiveQuery(_) => Ok(self),
LogicalPlan::Analyze(_) => Ok(self),
LogicalPlan::Explain(_) => Ok(self),
LogicalPlan::Prepare(_) => Ok(self),
LogicalPlan::Execute(_) => Ok(self),
LogicalPlan::TableScan(_) => Ok(self),
LogicalPlan::EmptyRelation(_) => Ok(self),
LogicalPlan::Statement(_) => Ok(self),
Expand Down Expand Up @@ -1070,24 +1057,25 @@ impl LogicalPlan {
logical_optimization_succeeded: e.logical_optimization_succeeded,
}))
}
LogicalPlan::Prepare(Prepare {
name, data_types, ..
}) => {
LogicalPlan::Statement(Statement::Prepare(Prepare {
name,
data_types,
..
})) => {
self.assert_no_expressions(expr)?;
let input = self.only_input(inputs)?;
Ok(LogicalPlan::Prepare(Prepare {
Ok(LogicalPlan::Statement(Statement::Prepare(Prepare {
name: name.clone(),
data_types: data_types.clone(),
input: Arc::new(input),
}))
})))
}
LogicalPlan::Execute(Execute { name, schema, .. }) => {
LogicalPlan::Statement(Statement::Execute(Execute { name, .. })) => {
self.assert_no_inputs(inputs)?;
Ok(LogicalPlan::Execute(Execute {
Ok(LogicalPlan::Statement(Statement::Execute(Execute {
name: name.clone(),
schema: Arc::clone(schema),
parameters: expr,
}))
})))
}
LogicalPlan::TableScan(ts) => {
self.assert_no_inputs(inputs)?;
Expand Down Expand Up @@ -1184,8 +1172,8 @@ impl LogicalPlan {
/// Replaces placeholder param values (like `$1`, `$2`) in [`LogicalPlan`]
/// with the specified `param_values`.
///
/// [`LogicalPlan::Prepare`] are
/// converted to their inner logical plan for execution.
/// [`Prepare`] statements are converted to
/// their inner logical plan for execution.
///
/// # Example
/// ```
Expand Down Expand Up @@ -1242,13 +1230,17 @@ impl LogicalPlan {
let plan_with_values = self.replace_params_with_values(&param_values)?;

// unwrap Prepare
Ok(if let LogicalPlan::Prepare(prepare_lp) = plan_with_values {
param_values.verify(&prepare_lp.data_types)?;
// try and take ownership of the input if is not shared, clone otherwise
Arc::unwrap_or_clone(prepare_lp.input)
} else {
plan_with_values
})
Ok(
if let LogicalPlan::Statement(Statement::Prepare(prepare_lp)) =
plan_with_values
{
param_values.verify(&prepare_lp.data_types)?;
// try and take ownership of the input if is not shared, clone otherwise
Arc::unwrap_or_clone(prepare_lp.input)
} else {
plan_with_values
},
)
}

/// Returns the maximum number of rows that this plan can output, if known.
Expand Down Expand Up @@ -1346,8 +1338,6 @@ impl LogicalPlan {
| LogicalPlan::Dml(_)
| LogicalPlan::Copy(_)
| LogicalPlan::DescribeTable(_)
| LogicalPlan::Prepare(_)
| LogicalPlan::Execute(_)
| LogicalPlan::Statement(_)
| LogicalPlan::Extension(_) => None,
}
Expand Down Expand Up @@ -1962,14 +1952,6 @@ impl LogicalPlan {
LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
LogicalPlan::Union(_) => write!(f, "Union"),
LogicalPlan::Extension(e) => e.node.fmt_for_explain(f),
LogicalPlan::Prepare(Prepare {
name, data_types, ..
}) => {
write!(f, "Prepare: {name:?} {data_types:?} ")
}
LogicalPlan::Execute(Execute { name, parameters, .. }) => {
write!(f, "Execute: {} params=[{}]", name, expr_vec_fmt!(parameters))
}
LogicalPlan::DescribeTable(DescribeTable { .. }) => {
write!(f, "DescribeTable")
}
Expand Down Expand Up @@ -2624,39 +2606,6 @@ impl PartialOrd for Union {
}
}

/// Prepare a statement but do not execute it. Prepare statements can have 0 or more
/// `Expr::Placeholder` expressions that are filled in during execution
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Prepare {
/// The name of the statement
pub name: String,
/// Data types of the parameters ([`Expr::Placeholder`])
pub data_types: Vec<DataType>,
/// The logical plan of the statements
pub input: Arc<LogicalPlan>,
}

/// Execute a prepared statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Execute {
/// The name of the prepared statement to execute
pub name: String,
/// The execute parameters
pub parameters: Vec<Expr>,
/// Dummy schema
pub schema: DFSchemaRef,
}

// Comparison excludes the `schema` field.
impl PartialOrd for Execute {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
match self.name.partial_cmp(&other.name) {
Some(Ordering::Equal) => self.parameters.partial_cmp(&other.parameters),
cmp => cmp,
}
}
}

/// Describe the schema of table
///
/// # Example output:
Expand Down
Loading