From 7e8866591ea059d89d6095bd17c6527655863ab0 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 4 Mar 2026 15:19:05 -0500 Subject: [PATCH 1/3] TableOptions require checking format --- datafusion/ffi/src/session/mod.rs | 63 ++++++++++++++++++++++++++++--- 1 file changed, 57 insertions(+), 6 deletions(-) diff --git a/datafusion/ffi/src/session/mod.rs b/datafusion/ffi/src/session/mod.rs index aa910abb9149a..c1d7148254ead 100644 --- a/datafusion/ffi/src/session/mod.rs +++ b/datafusion/ffi/src/session/mod.rs @@ -26,7 +26,7 @@ use arrow_schema::SchemaRef; use arrow_schema::ffi::FFI_ArrowSchema; use async_ffi::{FfiFuture, FutureExt}; use async_trait::async_trait; -use datafusion_common::config::{ConfigOptions, TableOptions}; +use datafusion_common::config::{ConfigFileType, ConfigOptions, TableOptions}; use datafusion_common::{DFSchema, DataFusionError}; use datafusion_execution::TaskContext; use datafusion_execution::config::SessionConfig; @@ -438,15 +438,66 @@ impl Clone for FFI_SessionRef { } fn table_options_from_rhashmap(options: RHashMap) -> TableOptions { - let options = options + let options: HashMap = options .into_iter() .map(|kv_pair| (kv_pair.0.into_string(), kv_pair.1.into_string())) .collect(); - TableOptions::from_string_hash_map(&options).unwrap_or_else(|err| { - log::warn!("Error parsing default table options: {err}"); - TableOptions::default() - }) + let mut table_options = TableOptions::default(); + let formats = [ + ConfigFileType::CSV, + ConfigFileType::JSON, + ConfigFileType::PARQUET, + ]; + for format in formats { + // It is imperative that if new enum variants are added below that they be + // included in the formats list above and in the extension check below. + let format_name = match &format { + ConfigFileType::CSV => "csv", + ConfigFileType::PARQUET => "parquet", + ConfigFileType::JSON => "json", + }; + let format_options: HashMap = options + .iter() + .filter_map(|(k, v)| { + let (prefix, key) = k.split_once(".")?; + if prefix == format_name { + Some((format!("format.{key}"), v.to_owned())) + } else { + None + } + }) + .collect(); + if !format_options.is_empty() { + table_options.current_format = Some(format.clone()); + table_options + .alter_with_string_hash_map(&format_options) + .unwrap_or_else(|err| log::warn!("Error parsing table options: {err}")); + } + } + + let extension_options: HashMap = options + .iter() + .filter_map(|(k, v)| { + let (prefix, _) = k.split_once(".")?; + if !["json", "parquet", "csv"].contains(&prefix) { + Some((k.to_owned(), v.to_owned())) + } else { + None + } + }) + .collect(); + if !extension_options.is_empty() { + table_options + .alter_with_string_hash_map(&extension_options) + .unwrap_or_else(|err| log::warn!("Error parsing table options: {err}")); + } + + // TODO(apache/datafusion#20704) We should query the `current_format` set and then pass + // this across the FFI barrier and set it here, but that is a breaking change that + // would need to go into the next release. + table_options.current_format = None; + table_options } #[async_trait] From 258942e46f7f0d5f532613c1b6c8c74fff6c4527 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 4 Mar 2026 16:50:46 -0500 Subject: [PATCH 2/3] Add unit test coverage --- datafusion/ffi/src/session/mod.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/datafusion/ffi/src/session/mod.rs b/datafusion/ffi/src/session/mod.rs index c1d7148254ead..254ce0a9bcef5 100644 --- a/datafusion/ffi/src/session/mod.rs +++ b/datafusion/ffi/src/session/mod.rs @@ -607,6 +607,7 @@ mod tests { use std::sync::Arc; use arrow_schema::{DataType, Field, Schema}; + use datafusion::execution::SessionStateBuilder; use datafusion_common::DataFusionError; use datafusion_expr::col; use datafusion_expr::registry::FunctionRegistry; @@ -617,7 +618,15 @@ mod tests { #[tokio::test] async fn test_ffi_session() -> Result<(), DataFusionError> { let (ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx(); - let state = ctx.state(); + let mut table_options = TableOptions::default(); + table_options.csv.has_header = Some(true); + table_options.json.schema_infer_max_rec = Some(10); + table_options.parquet.global.coerce_int96 = Some("123456789".into()); + + let state = SessionStateBuilder::new_from_existing(ctx.state()) + .with_table_options(table_options) + .build(); + let logical_codec = FFI_LogicalExtensionCodec::new( Arc::new(DefaultLogicalExtensionCodec {}), None, From 70da9e9c012a64e591387f867db6b0a464ad2abf Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Thu, 5 Mar 2026 08:03:03 -0500 Subject: [PATCH 3/3] Set current format --- datafusion/ffi/src/session/mod.rs | 43 ++++++++++++++++++++++++------- 1 file changed, 33 insertions(+), 10 deletions(-) diff --git a/datafusion/ffi/src/session/mod.rs b/datafusion/ffi/src/session/mod.rs index 254ce0a9bcef5..6b8664a437495 100644 --- a/datafusion/ffi/src/session/mod.rs +++ b/datafusion/ffi/src/session/mod.rs @@ -240,12 +240,30 @@ unsafe extern "C" fn window_functions_fn_wrapper( .collect() } -fn table_options_to_rhash(options: &TableOptions) -> RHashMap { - options +fn table_options_to_rhash(mut options: TableOptions) -> RHashMap { + // It is important that we mutate options here and set current format + // to None so that when we call `entries()` we get ALL format entries. + // We will pass current_format as a special case and strip it on the + // other side of the boundary. + let current_format = options.current_format.take(); + let mut options: HashMap = options .entries() .into_iter() .filter_map(|entry| entry.value.map(|v| (entry.key.into(), v.into()))) - .collect() + .collect(); + if let Some(current_format) = current_format { + options.insert( + "datafusion_ffi.table_current_format".into(), + match current_format { + ConfigFileType::JSON => "json", + ConfigFileType::PARQUET => "parquet", + ConfigFileType::CSV => "csv", + } + .into(), + ); + } + + options.into() } unsafe extern "C" fn table_options_fn_wrapper( @@ -253,7 +271,7 @@ unsafe extern "C" fn table_options_fn_wrapper( ) -> RHashMap { let session = session.inner(); let table_options = session.table_options(); - table_options_to_rhash(table_options) + table_options_to_rhash(table_options.clone()) } unsafe extern "C" fn default_table_options_fn_wrapper( @@ -262,7 +280,7 @@ unsafe extern "C" fn default_table_options_fn_wrapper( let session = session.inner(); let table_options = session.default_table_options(); - table_options_to_rhash(&table_options) + table_options_to_rhash(table_options) } unsafe extern "C" fn task_ctx_fn_wrapper(session: &FFI_SessionRef) -> FFI_TaskContext { @@ -438,10 +456,11 @@ impl Clone for FFI_SessionRef { } fn table_options_from_rhashmap(options: RHashMap) -> TableOptions { - let options: HashMap = options + let mut options: HashMap = options .into_iter() .map(|kv_pair| (kv_pair.0.into_string(), kv_pair.1.into_string())) .collect(); + let current_format = options.remove("datafusion_ffi.table_current_format"); let mut table_options = TableOptions::default(); let formats = [ @@ -493,10 +512,13 @@ fn table_options_from_rhashmap(options: RHashMap) -> TableOpti .unwrap_or_else(|err| log::warn!("Error parsing table options: {err}")); } - // TODO(apache/datafusion#20704) We should query the `current_format` set and then pass - // this across the FFI barrier and set it here, but that is a breaking change that - // would need to go into the next release. - table_options.current_format = None; + table_options.current_format = + current_format.and_then(|format| match format.as_str() { + "csv" => Some(ConfigFileType::CSV), + "parquet" => Some(ConfigFileType::PARQUET), + "json" => Some(ConfigFileType::JSON), + _ => None, + }); table_options } @@ -622,6 +644,7 @@ mod tests { table_options.csv.has_header = Some(true); table_options.json.schema_infer_max_rec = Some(10); table_options.parquet.global.coerce_int96 = Some("123456789".into()); + table_options.current_format = Some(ConfigFileType::JSON); let state = SessionStateBuilder::new_from_existing(ctx.state()) .with_table_options(table_options)