diff --git a/src/ast/helpers/stmt_data_loading.rs b/src/ast/helpers/stmt_data_loading.rs index d5ba6eda0b..a259e664b3 100644 --- a/src/ast/helpers/stmt_data_loading.rs +++ b/src/ast/helpers/stmt_data_loading.rs @@ -24,6 +24,7 @@ use core::fmt::Formatter; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; +use crate::ast::Ident; #[cfg(feature = "visitor")] use sqlparser_derive::{Visit, VisitMut}; @@ -63,6 +64,16 @@ pub struct DataLoadingOption { pub value: String, } +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))] +pub struct StageLoadSelectItem { + pub alias: Option<Ident>, + pub file_col_num: i32, + pub element: Option<Ident>, + pub item_as: Option<Ident>, +} + impl fmt::Display for StageParamsObject { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let url = &self.url.as_ref(); @@ -121,3 +132,19 @@ impl fmt::Display for DataLoadingOption { Ok(()) } } + +impl fmt::Display for StageLoadSelectItem { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + if self.alias.is_some() { + write!(f, "{}.", self.alias.as_ref().unwrap())?; + } + write!(f, "${}", self.file_col_num)?; + if self.element.is_some() { + write!(f, ":{}", self.element.as_ref().unwrap())?; + } + if self.item_as.is_some() { + write!(f, " AS {}", self.item_as.as_ref().unwrap())?; + } + Ok(()) + } +} diff --git a/src/ast/mod.rs b/src/ast/mod.rs index 94b237b4f1..e412c78200 100644 --- a/src/ast/mod.rs +++ b/src/ast/mod.rs @@ -44,7 +44,9 @@ pub use self::value::{ escape_quoted_string, DateTimeField, DollarQuotedString, TrimWhereField, Value, }; -use crate::ast::helpers::stmt_data_loading::{DataLoadingOptions, StageParamsObject}; +use crate::ast::helpers::stmt_data_loading::{ +    DataLoadingOptions, StageLoadSelectItem, StageParamsObject, +}; #[cfg(feature = "visitor")] pub use visitor::*; @@ -1187,6 +1189,26 @@ pub enum Statement { /// VALUES a vector of values to be
copied values: Vec<Option<String>>, }, + /// ```sql + /// COPY INTO + /// ``` + /// See <https://docs.snowflake.com/en/sql-reference/sql/copy-into-table> + /// Copy Into syntax available for Snowflake is different than the one implemented in + /// Postgres. Although they share common prefix, it is reasonable to implement them + /// in different enums. This can be refactored later once custom dialects + /// are allowed to have custom Statements. + CopyIntoSnowflake { + into: ObjectName, + from_stage: ObjectName, + from_stage_alias: Option<Ident>, + stage_params: StageParamsObject, + from_transformations: Option<Vec<StageLoadSelectItem>>, + files: Option<Vec<String>>, + pattern: Option<String>, + file_format: DataLoadingOptions, + copy_options: DataLoadingOptions, + validation_mode: Option<String>, + }, /// Close - closes the portal underlying an open cursor. Close { /// Cursor name @@ -2795,6 +2817,64 @@ impl fmt::Display for Statement { } Ok(()) } + Statement::CopyIntoSnowflake { + into, + from_stage, + from_stage_alias, + stage_params, + from_transformations, + files, + pattern, + file_format, + copy_options, + validation_mode, + } => { + write!(f, "COPY INTO {}", into)?; + if from_transformations.is_none() { + // Standard data load + write!(f, " FROM {}{}", from_stage, stage_params)?; + if from_stage_alias.as_ref().is_some() { + write!(f, " AS {}", from_stage_alias.as_ref().unwrap())?; + } + } else { + // Data load with transformation + write!( + f, + " FROM (SELECT {} FROM {}{}", + display_separated(from_transformations.as_ref().unwrap(), ", "), + from_stage, + stage_params, + )?; + if from_stage_alias.as_ref().is_some() { + write!(f, " AS {}", from_stage_alias.as_ref().unwrap())?; + } + write!(f, ")")?; + } + if files.is_some() { + write!( + f, + " FILES = ('{}')", + display_separated(files.as_ref().unwrap(), "', '") + )?; + } + if pattern.is_some() { + write!(f, " PATTERN = '{}'", pattern.as_ref().unwrap())?; + } + if !file_format.options.is_empty() { + write!(f, " FILE_FORMAT=({})", file_format)?; + } + if !copy_options.options.is_empty() { + write!(f, " COPY_OPTIONS=({})", copy_options)?; + } + 
if validation_mode.is_some() { + write!( + f, + " VALIDATION_MODE = {}", + validation_mode.as_ref().unwrap() + )?; + } + Ok(()) + } } } } diff --git a/src/dialect/snowflake.rs b/src/dialect/snowflake.rs index 0be5e1df62..713394a1e1 100644 --- a/src/dialect/snowflake.rs +++ b/src/dialect/snowflake.rs @@ -13,17 +13,20 @@ #[cfg(not(feature = "std"))] use crate::alloc::string::ToString; use crate::ast::helpers::stmt_data_loading::{ - DataLoadingOption, DataLoadingOptionType, DataLoadingOptions, StageParamsObject, + DataLoadingOption, DataLoadingOptionType, DataLoadingOptions, StageLoadSelectItem, + StageParamsObject, }; -use crate::ast::Statement; +use crate::ast::{Ident, ObjectName, Statement}; use crate::dialect::Dialect; use crate::keywords::Keyword; use crate::parser::{Parser, ParserError}; use crate::tokenizer::Token; #[cfg(not(feature = "std"))] -use alloc::vec; +use alloc::string::String; #[cfg(not(feature = "std"))] use alloc::vec::Vec; +#[cfg(not(feature = "std"))] +use alloc::{format, vec}; #[derive(Debug, Default)] pub struct SnowflakeDialect; @@ -31,7 +34,7 @@ pub struct SnowflakeDialect; impl Dialect for SnowflakeDialect { // see https://docs.snowflake.com/en/sql-reference/identifiers-syntax.html fn is_identifier_start(&self, ch: char) -> bool { - ch.is_ascii_lowercase() || ch.is_ascii_uppercase() || ch == '_' + ch.is_ascii_lowercase() || ch.is_ascii_uppercase() || ch == '_' || ch == '@' || ch == '%' } fn is_identifier_part(&self, ch: char) -> bool { @@ -40,6 +43,8 @@ impl Dialect for SnowflakeDialect { || ch.is_ascii_digit() || ch == '$' || ch == '_' + || ch == '/' + || ch == '~' } fn supports_within_after_array_aggregation(&self) -> bool { @@ -71,6 +76,11 @@ impl Dialect for SnowflakeDialect { } } } + if parser.parse_keywords(&[Keyword::COPY, Keyword::INTO]) { + // COPY INTO + return Some(parse_copy_into(parser)); + } + None } } @@ -137,6 +147,209 @@ pub fn parse_create_stage( }) } +pub fn parse_copy_into(parser: &mut Parser) -> Result<Statement, ParserError> { + let into: 
ObjectName = parser.parse_object_name()?; + let mut files: Vec<String> = vec![]; + let mut from_transformations: Option<Vec<StageLoadSelectItem>> = None; + let from_stage_alias; + let from_stage: ObjectName; + let stage_params: StageParamsObject; + + parser.expect_keyword(Keyword::FROM)?; + // check if data load transformations are present + match parser.next_token().token { + Token::LParen => { + // data load with transformations + parser.expect_keyword(Keyword::SELECT)?; + from_transformations = parse_select_items_for_data_load(parser)?; + + parser.expect_keyword(Keyword::FROM)?; + from_stage = parser.parse_object_name()?; + stage_params = parse_stage_params(parser)?; + + // as + from_stage_alias = if parser.parse_keyword(Keyword::AS) { + Some(match parser.next_token().token { + Token::Word(w) => Ok(Ident::new(w.value)), + _ => parser.expected("stage alias", parser.peek_token()), + }?) + } else { + None + }; + parser.expect_token(&Token::RParen)?; + } + _ => { + parser.prev_token(); + from_stage = parser.parse_object_name()?; + stage_params = parse_stage_params(parser)?; + + // as + from_stage_alias = if parser.parse_keyword(Keyword::AS) { + Some(match parser.next_token().token { + Token::Word(w) => Ok(Ident::new(w.value)), + _ => parser.expected("stage alias", parser.peek_token()), + }?) 
+ } else { + None + }; + } + }; + + // [ files ] + if parser.parse_keyword(Keyword::FILES) { + parser.expect_token(&Token::Eq)?; + parser.expect_token(&Token::LParen)?; + let mut continue_loop = true; + while continue_loop { + continue_loop = false; + let next_token = parser.next_token(); + match next_token.token { + Token::SingleQuotedString(s) => files.push(s), + _ => parser.expected("file token", next_token)?, + }; + if parser.next_token().token.eq(&Token::Comma) { + continue_loop = true; + } else { + parser.prev_token(); // not a comma, need to go back + } + } + parser.expect_token(&Token::RParen)?; + } + + // [ pattern ] + let mut pattern = None; + if parser.parse_keyword(Keyword::PATTERN) { + parser.expect_token(&Token::Eq)?; + let next_token = parser.next_token(); + pattern = Some(match next_token.token { + Token::SingleQuotedString(s) => s, + _ => parser.expected("pattern", next_token)?, + }); + } + + // [ file_format] + let mut file_format = Vec::new(); + if parser.parse_keyword(Keyword::FILE_FORMAT) { + parser.expect_token(&Token::Eq)?; + file_format = parse_parentheses_options(parser)?; + } + + // [ copy_options ] + let mut copy_options = Vec::new(); + if parser.parse_keyword(Keyword::COPY_OPTIONS) { + parser.expect_token(&Token::Eq)?; + copy_options = parse_parentheses_options(parser)?; + } + + // [ VALIDATION_MODE ] + let mut validation_mode = None; + if parser.parse_keyword(Keyword::VALIDATION_MODE) { + parser.expect_token(&Token::Eq)?; + validation_mode = Some(parser.next_token().token.to_string()); + } + + Ok(Statement::CopyIntoSnowflake { + into, + from_stage, + from_stage_alias, + stage_params, + from_transformations, + files: if files.is_empty() { None } else { Some(files) }, + pattern, + file_format: DataLoadingOptions { + options: file_format, + }, + copy_options: DataLoadingOptions { + options: copy_options, + }, + validation_mode, + }) +} + +fn parse_select_items_for_data_load( + parser: &mut Parser, +) -> Result<Option<Vec<StageLoadSelectItem>>, ParserError> { + // [<alias>.]$<file_col_num>[.<element>] 
[ , [<alias>.]$<file_col_num>[.<element>] ... ] + let mut select_items: Vec<StageLoadSelectItem> = vec![]; + loop { + let mut alias: Option<Ident> = None; + let mut file_col_num: i32 = 0; + let mut element: Option<Ident> = None; + let mut item_as: Option<Ident> = None; + + let next_token = parser.next_token(); + match next_token.token { + Token::Placeholder(w) => { + file_col_num = w.to_string().split_off(1).parse::<i32>().map_err(|e| { + ParserError::ParserError(format!("Could not parse '{w}' as i32: {e}")) + })?; + Ok(()) + } + Token::Word(w) => { + alias = Some(Ident::new(w.value)); + Ok(()) + } + _ => parser.expected("alias or file_col_num", next_token), + }?; + + if alias.is_some() { + parser.expect_token(&Token::Period)?; + // now we get col_num token + let col_num_token = parser.next_token(); + match col_num_token.token { + Token::Placeholder(w) => { + file_col_num = w.to_string().split_off(1).parse::<i32>().map_err(|e| { + ParserError::ParserError(format!("Could not parse '{w}' as i32: {e}")) + })?; + Ok(()) + } + _ => parser.expected("file_col_num", col_num_token), + }?; + } + + // try extracting optional element + match parser.next_token().token { + Token::Colon => { + // parse element + element = Some(Ident::new(match parser.next_token().token { + Token::Word(w) => Ok(w.value), + _ => parser.expected("file_col_num", parser.peek_token()), + }?)); + } + _ => { + // element not present move back + parser.prev_token(); + } + } + + // as + if parser.parse_keyword(Keyword::AS) { + item_as = Some(match parser.next_token().token { + Token::Word(w) => Ok(Ident::new(w.value)), + _ => parser.expected("column item alias", parser.peek_token()), + }?); + } + + select_items.push(StageLoadSelectItem { + alias, + file_col_num, + element, + item_as, + }); + + match parser.next_token().token { + Token::Comma => { + // continue + } + _ => { + parser.prev_token(); // need to move back + break; + } + } + } + Ok(Some(select_items)) +} + fn parse_stage_params(parser: &mut Parser) -> Result<StageParamsObject, ParserError> { let (mut url, mut storage_integration, mut endpoint) = (None, None, 
None); let mut encryption: DataLoadingOptions = DataLoadingOptions { options: vec![] }; diff --git a/src/keywords.rs b/src/keywords.rs index 8ec5e1d540..a0c5b68cba 100644 --- a/src/keywords.rs +++ b/src/keywords.rs @@ -252,6 +252,7 @@ define_keywords!( FETCH, FIELDS, FILE, + FILES, FILE_FORMAT, FILTER, FIRST, @@ -433,6 +434,7 @@ define_keywords!( PARTITIONED, PARTITIONS, PASSWORD, + PATTERN, PERCENT, PERCENTILE_CONT, PERCENTILE_DISC, @@ -615,6 +617,7 @@ define_keywords!( USING, UUID, VALID, + VALIDATION_MODE, VALUE, VALUES, VALUE_OF, diff --git a/tests/sqlparser_snowflake.rs b/tests/sqlparser_snowflake.rs index ea14cc44b8..8bcbb7b5a8 100644 --- a/tests/sqlparser_snowflake.rs +++ b/tests/sqlparser_snowflake.rs @@ -14,7 +14,9 @@ //! Test SQL syntax specific to Snowflake. The parser based on the //! generic dialect is also tested (on the inputs it can handle). -use sqlparser::ast::helpers::stmt_data_loading::{DataLoadingOption, DataLoadingOptionType}; +use sqlparser::ast::helpers::stmt_data_loading::{ + DataLoadingOption, DataLoadingOptionType, StageLoadSelectItem, +}; use sqlparser::ast::*; use sqlparser::dialect::{GenericDialect, SnowflakeDialect}; use sqlparser::parser::ParserError; @@ -69,7 +71,7 @@ fn test_snowflake_single_line_tokenize() { assert_eq!(expected, tokens); - let sql = "CREATE TABLE// this is a comment \ntable_1"; + let sql = "CREATE TABLE // this is a comment \ntable_1"; let mut tokenizer = Tokenizer::new(&dialect, sql); let tokens = tokenizer.tokenize().unwrap(); @@ -77,6 +79,7 @@ fn test_snowflake_single_line_tokenize() { Token::make_keyword("CREATE"), Token::Whitespace(Whitespace::Space), Token::make_keyword("TABLE"), + Token::Whitespace(Whitespace::Space), Token::Whitespace(Whitespace::SingleLineComment { prefix: "//".to_string(), comment: " this is a comment \n".to_string(), @@ -734,3 +737,305 @@ fn test_create_stage_with_copy_options() { }; assert_eq!(snowflake().verified_stmt(sql).to_string(), sql); } + +#[test] +fn test_copy_into() { + let 
sql = concat!( + "COPY INTO my_company.emp_basic ", + "FROM 'gcs://mybucket/./../a.csv'" + ); + match snowflake().verified_stmt(sql) { + Statement::CopyIntoSnowflake { + into, + from_stage, + files, + pattern, + validation_mode, + .. + } => { + assert_eq!( + into, + ObjectName(vec![Ident::new("my_company"), Ident::new("emp_basic")]) + ); + assert_eq!( + from_stage, + ObjectName(vec![Ident::with_quote('\'', "gcs://mybucket/./../a.csv")]) + ); + assert!(files.is_none()); + assert!(pattern.is_none()); + assert!(validation_mode.is_none()); + } + _ => unreachable!(), + }; + assert_eq!(snowflake().verified_stmt(sql).to_string(), sql); +} + +#[test] +fn test_copy_into_with_stage_params() { + let sql = concat!( + "COPY INTO my_company.emp_basic ", + "FROM 's3://load/files/' ", + "STORAGE_INTEGRATION=myint ", + "ENDPOINT='<s3_api_compatible_endpoint>' ", + "CREDENTIALS=(AWS_KEY_ID='1a2b3c' AWS_SECRET_KEY='4x5y6z') ", + "ENCRYPTION=(MASTER_KEY='key' TYPE='AWS_SSE_KMS')" + ); + + match snowflake().verified_stmt(sql) { + Statement::CopyIntoSnowflake { + from_stage, + stage_params, + .. 
+ } => { + //assert_eq!("s3://load/files/", stage_params.url.unwrap()); + assert_eq!( + from_stage, + ObjectName(vec![Ident::with_quote('\'', "s3://load/files/")]) + ); + assert_eq!("myint", stage_params.storage_integration.unwrap()); + assert_eq!( + "<s3_api_compatible_endpoint>", + stage_params.endpoint.unwrap() + ); + assert!(stage_params + .credentials + .options + .contains(&DataLoadingOption { + option_name: "AWS_KEY_ID".to_string(), + option_type: DataLoadingOptionType::STRING, + value: "1a2b3c".to_string() + })); + assert!(stage_params + .credentials + .options + .contains(&DataLoadingOption { + option_name: "AWS_SECRET_KEY".to_string(), + option_type: DataLoadingOptionType::STRING, + value: "4x5y6z".to_string() + })); + assert!(stage_params + .encryption + .options + .contains(&DataLoadingOption { + option_name: "MASTER_KEY".to_string(), + option_type: DataLoadingOptionType::STRING, + value: "key".to_string() + })); + assert!(stage_params + .encryption + .options + .contains(&DataLoadingOption { + option_name: "TYPE".to_string(), + option_type: DataLoadingOptionType::STRING, + value: "AWS_SSE_KMS".to_string() + })); + } + _ => unreachable!(), + }; + + assert_eq!(snowflake().verified_stmt(sql).to_string(), sql); + + // stage params within copy into with transformations + let sql = concat!( + "COPY INTO my_company.emp_basic FROM ", + "(SELECT t1.$1 FROM 's3://load/files/' STORAGE_INTEGRATION=myint)", + ); + + match snowflake().verified_stmt(sql) { + Statement::CopyIntoSnowflake { + from_stage, + stage_params, + .. 
+ } => { + assert_eq!( + from_stage, + ObjectName(vec![Ident::with_quote('\'', "s3://load/files/")]) + ); + assert_eq!("myint", stage_params.storage_integration.unwrap()); + } + _ => unreachable!(), + } +} + +#[test] +fn test_copy_into_with_files_and_pattern_and_verification() { + let sql = concat!( + "COPY INTO my_company.emp_basic ", + "FROM 'gcs://mybucket/./../a.csv' AS some_alias ", + "FILES = ('file1.json', 'file2.json') ", + "PATTERN = '.*employees0[1-5].csv.gz' ", + "VALIDATION_MODE = RETURN_7_ROWS" + ); + + match snowflake().verified_stmt(sql) { + Statement::CopyIntoSnowflake { + files, + pattern, + validation_mode, + from_stage_alias, + .. + } => { + assert_eq!(files.unwrap(), vec!["file1.json", "file2.json"]); + assert_eq!(pattern.unwrap(), ".*employees0[1-5].csv.gz"); + assert_eq!(validation_mode.unwrap(), "RETURN_7_ROWS"); + assert_eq!(from_stage_alias.unwrap(), Ident::new("some_alias")); + } + _ => unreachable!(), + } + assert_eq!(snowflake().verified_stmt(sql).to_string(), sql); +} + +#[test] +fn test_copy_into_with_transformations() { + let sql = concat!( + "COPY INTO my_company.emp_basic FROM ", + "(SELECT t1.$1:st AS st, $1:index, t2.$1 FROM @schema.general_finished AS T) ", + "FILES = ('file1.json', 'file2.json') ", + "PATTERN = '.*employees0[1-5].csv.gz' ", + "VALIDATION_MODE = RETURN_7_ROWS" + ); + + match snowflake().verified_stmt(sql) { + Statement::CopyIntoSnowflake { + from_stage, + from_transformations, + .. 
+ } => { + assert_eq!( + from_stage, + ObjectName(vec![Ident::new("@schema"), Ident::new("general_finished")]) + ); + assert_eq!( + from_transformations.as_ref().unwrap()[0], + StageLoadSelectItem { + alias: Some(Ident::new("t1")), + file_col_num: 1, + element: Some(Ident::new("st")), + item_as: Some(Ident::new("st")) + } + ); + assert_eq!( + from_transformations.as_ref().unwrap()[1], + StageLoadSelectItem { + alias: None, + file_col_num: 1, + element: Some(Ident::new("index")), + item_as: None + } + ); + assert_eq!( + from_transformations.as_ref().unwrap()[2], + StageLoadSelectItem { + alias: Some(Ident::new("t2")), + file_col_num: 1, + element: None, + item_as: None + } + ); + } + _ => unreachable!(), + } + assert_eq!(snowflake().verified_stmt(sql).to_string(), sql); +} + +#[test] +fn test_copy_into_file_format() { + let sql = concat!( + "COPY INTO my_company.emp_basic ", + "FROM 'gcs://mybucket/./../a.csv' ", + "FILES = ('file1.json', 'file2.json') ", + "PATTERN = '.*employees0[1-5].csv.gz' ", + "FILE_FORMAT=(COMPRESSION=AUTO BINARY_FORMAT=HEX ESCAPE='\\')" + ); + + match snowflake().verified_stmt(sql) { + Statement::CopyIntoSnowflake { file_format, .. 
} => { + assert!(file_format.options.contains(&DataLoadingOption { + option_name: "COMPRESSION".to_string(), + option_type: DataLoadingOptionType::ENUM, + value: "AUTO".to_string() + })); + assert!(file_format.options.contains(&DataLoadingOption { + option_name: "BINARY_FORMAT".to_string(), + option_type: DataLoadingOptionType::ENUM, + value: "HEX".to_string() + })); + assert!(file_format.options.contains(&DataLoadingOption { + option_name: "ESCAPE".to_string(), + option_type: DataLoadingOptionType::STRING, + value: "\\".to_string() + })); + } + _ => unreachable!(), + } + assert_eq!(snowflake().verified_stmt(sql).to_string(), sql); +} + +#[test] +fn test_copy_into_copy_options() { + let sql = concat!( + "COPY INTO my_company.emp_basic ", + "FROM 'gcs://mybucket/./../a.csv' ", + "FILES = ('file1.json', 'file2.json') ", + "PATTERN = '.*employees0[1-5].csv.gz' ", + "COPY_OPTIONS=(ON_ERROR=CONTINUE FORCE=TRUE)" + ); + + match snowflake().verified_stmt(sql) { + Statement::CopyIntoSnowflake { copy_options, .. 
} => { + assert!(copy_options.options.contains(&DataLoadingOption { + option_name: "ON_ERROR".to_string(), + option_type: DataLoadingOptionType::ENUM, + value: "CONTINUE".to_string() + })); + assert!(copy_options.options.contains(&DataLoadingOption { + option_name: "FORCE".to_string(), + option_type: DataLoadingOptionType::BOOLEAN, + value: "TRUE".to_string() + })); + } + _ => unreachable!(), + }; + assert_eq!(snowflake().verified_stmt(sql).to_string(), sql); +} + +#[test] +fn test_snowflake_stage_object_names() { + let allowed_formatted_names = vec![ + "my_company.emp_basic", + "@namespace.%table_name", + "@namespace.%table_name/path", + "@namespace.stage_name/path", + "@~/path", + ]; + let mut allowed_object_names = vec![ + ObjectName(vec![Ident::new("my_company"), Ident::new("emp_basic")]), + ObjectName(vec![Ident::new("@namespace"), Ident::new("%table_name")]), + ObjectName(vec![ + Ident::new("@namespace"), + Ident::new("%table_name/path"), + ]), + ObjectName(vec![ + Ident::new("@namespace"), + Ident::new("stage_name/path"), + ]), + ObjectName(vec![Ident::new("@~/path")]), + ]; + + for it in allowed_formatted_names + .iter() + .zip(allowed_object_names.iter_mut()) + { + let (formatted_name, object_name) = it; + let sql = format!( + "COPY INTO {} FROM 'gcs://mybucket/./../a.csv'", + formatted_name + ); + match snowflake().verified_stmt(&sql) { + Statement::CopyIntoSnowflake { into, .. } => { + assert_eq!(into.0, object_name.0) + } + _ => unreachable!(), + } + } +}