Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2521,6 +2521,10 @@ config_namespace! {
// The input regex for Nulls when loading CSVs.
pub null_regex: Option<String>, default = None
pub comment: Option<u8>, default = None
// Whether to allow truncated rows when parsing.
// By default this is set to false and will error if the CSV rows have different lengths.
// When set to true then it will allow records with less than the expected number of columns
pub truncated_rows: Option<bool>, default = None
}
}

Expand Down Expand Up @@ -2613,6 +2617,15 @@ impl CsvOptions {
self
}

/// Whether to allow truncated rows when parsing.
/// By default this is set to false and will error if the CSV rows have different lengths.
/// When set to true then it will allow records with less than the expected number of columns and fill the missing columns with nulls.
/// If the record’s schema is not nullable, then it will still return an error.
pub fn with_truncated_rows(mut self, allow: bool) -> Self {
self.truncated_rows = Some(allow);
self
}

/// The delimiter character.
pub fn delimiter(&self) -> u8 {
self.delimiter
Expand Down
179 changes: 178 additions & 1 deletion datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ mod tests {
use datafusion_physical_plan::{collect, ExecutionPlan};

use arrow::array::{
BooleanArray, Float64Array, Int32Array, RecordBatch, StringArray,
Array, BooleanArray, Float64Array, Int32Array, RecordBatch, StringArray,
};
use arrow::compute::concat_batches;
use arrow::csv::ReaderBuilder;
Expand Down Expand Up @@ -1256,4 +1256,181 @@ mod tests {
.build_decoder();
DecoderDeserializer::new(CsvDecoder::new(decoder))
}

/// Build a CSV [`BatchDeserializer`] whose underlying Arrow decoder tolerates
/// rows with fewer columns than the schema (truncated rows enabled at runtime).
fn csv_deserializer_with_truncated(
    batch_size: usize,
    schema: &Arc<Schema>,
) -> impl BatchDeserializer<Bytes> {
    let builder = ReaderBuilder::new(Arc::clone(schema))
        .with_batch_size(batch_size)
        .with_truncated_rows(true);
    DecoderDeserializer::new(CsvDecoder::new(builder.build_decoder()))
}

#[tokio::test]
async fn infer_schema_with_truncated_rows_true() -> Result<()> {
    let ctx = SessionContext::new();
    let state = ctx.state();

    // Header declares three columns; the first data row supplies only two,
    // the second supplies all three.
    let data = Bytes::from("a,b,c\n1,2\n3,4,5\n");
    let store = Arc::new(VariableStream::new(data, 1));
    let meta = ObjectMeta {
        location: Path::parse("/")?,
        last_modified: DateTime::default(),
        size: u64::MAX,
        e_tag: None,
        version: None,
    };

    // Enable truncated-row tolerance through CsvOptions on the format.
    let format = CsvFormat::default()
        .with_has_header(true)
        .with_options(CsvOptions::default().with_truncated_rows(true))
        .with_schema_infer_max_rec(10);

    let inferred = format
        .infer_schema(&state, &(store.clone() as Arc<dyn ObjectStore>), &[meta])
        .await?;

    // All three header columns should survive inference...
    assert_eq!(inferred.fields().len(), 3);
    // ...and every inferred column must be nullable so short rows can be
    // padded with nulls.
    assert!(inferred.fields().iter().all(|f| f.is_nullable()));

    Ok(())
}
#[test]
fn test_decoder_truncated_rows_runtime() -> Result<()> {
    // The Decoder API is synchronous, so this is a plain (non-tokio) test.
    let schema = csv_schema(); // helper already defined in file

    // Build a decoder that enables truncated_rows at runtime.
    let mut des = csv_deserializer_with_truncated(10, &schema);

    // Row 1 is complete; row 2 omits the final (string) column.
    des.digest(Bytes::from("0,0.0,true,0-string\n1,1.0,true\n"));
    des.finish();

    match des.next()? {
        DeserializerOutput::RecordBatch(batch) => {
            // At least the two input rows must be present.
            assert!(batch.num_rows() >= 2);

            // Column 4 (index 3) should be a StringArray whose second row
            // was padded to NULL by the truncated-rows handling.
            let strings = batch
                .column(3)
                .as_any()
                .downcast_ref::<StringArray>()
                .expect("column 4 should be StringArray");
            assert!(!strings.is_null(0));
            assert!(strings.is_null(1));
        }
        other => panic!("expected RecordBatch but got {other:?}"),
    }
    Ok(())
}

/// Schema inference must fail on short rows when truncated-row support is
/// left at its default (disabled).
#[tokio::test]
async fn infer_schema_truncated_rows_false_error() -> Result<()> {
    let session_ctx = SessionContext::new();
    let state = session_ctx.state();

    // CSV: header has 4 cols, first data row has 3 cols -> truncated at end
    let csv_data = Bytes::from("id,a,b,c\n1,foo,bar\n2,foo,bar,baz\n");
    let variable_object_store = Arc::new(VariableStream::new(csv_data, 1));
    let object_meta = ObjectMeta {
        location: Path::parse("/")?,
        last_modified: DateTime::default(),
        // size/e_tag/version are not meaningful for the in-memory stream
        size: u64::MAX,
        e_tag: None,
        version: None,
    };

    // CsvFormat without enabling truncated_rows (default behavior = false)
    let csv_format = CsvFormat::default()
        .with_has_header(true)
        .with_schema_infer_max_rec(10);

    let res = csv_format
        .infer_schema(
            &state,
            &(variable_object_store.clone() as Arc<dyn ObjectStore>),
            &[object_meta],
        )
        .await;

    // Expect an error due to unequal lengths / incorrect number of fields
    assert!(
        res.is_err(),
        "expected infer_schema to error on truncated rows when disabled"
    );

    // Optional: check message contains indicative text (two known possibilities)
    // NOTE(review): the exact wording comes from arrow-csv and may change
    // between arrow releases; both known variants are accepted here.
    if let Err(err) = res {
        let msg = format!("{err}");
        assert!(
            msg.contains("Encountered unequal lengths")
                || msg.contains("incorrect number of fields"),
            "unexpected error message: {msg}",
        );
    }

    Ok(())
}

/// End-to-end check that `CsvReadOptions::truncated_rows(true)` propagates
/// through `SessionContext::read_csv` and pads short rows with nulls.
#[tokio::test]
async fn test_read_csv_truncated_rows_via_tempfile() -> Result<()> {
    use std::io::Write;

    let ctx = SessionContext::new();

    // The listing reader filters by extension, so the temp file must end
    // with `.csv`.
    let mut tmp = tempfile::Builder::new().suffix(".csv").tempfile()?;
    // Header "a,b,c"; first data row is truncated (only "1,2"), second row
    // is complete.
    write!(tmp, "a,b,c\n1,2\n3,4,5\n")?;
    let path = tmp.path().to_str().unwrap().to_string();

    // Build CsvReadOptions with a header and truncated-row tolerance enabled.
    let options = CsvReadOptions::default().truncated_rows(true);

    // Call the API under test.
    let df = ctx.read_csv(&path, options).await?;

    // Collect and combine batches so rows can be inspected positionally.
    let batches = df.collect().await?;
    let combined = concat_batches(&batches[0].schema(), &batches)?;

    // Column 'c' (index 2) was missing from the first data row and should
    // therefore be NULL there.
    let col_c = combined.column(2);
    assert!(
        col_c.is_null(0),
        "expected first row column 'c' to be NULL due to truncated row"
    );

    // Both data rows should have been read.
    assert!(combined.num_rows() >= 2);

    Ok(())
}
}
18 changes: 17 additions & 1 deletion datafusion/core/src/datasource/file_format/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ pub struct CsvReadOptions<'a> {
pub file_sort_order: Vec<Vec<SortExpr>>,
/// Optional regex to match null values
pub null_regex: Option<String>,
/// Whether to allow truncated rows when parsing.
/// By default this is set to false and will error if the CSV rows have different lengths.
/// When set to true then it will allow records with less than the expected number of columns and fill the missing columns with nulls.
/// If the record’s schema is not nullable, then it will still return an error.
pub truncated_rows: bool,
}

impl Default for CsvReadOptions<'_> {
Expand All @@ -117,6 +122,7 @@ impl<'a> CsvReadOptions<'a> {
file_sort_order: vec![],
comment: None,
null_regex: None,
truncated_rows: false,
}
}

Expand Down Expand Up @@ -223,6 +229,15 @@ impl<'a> CsvReadOptions<'a> {
self.null_regex = null_regex;
self
}

/// Configure whether to allow truncated rows when parsing.
/// By default this is set to false and will error if the CSV rows have different lengths
/// When set to true then it will allow records with less than the expected number of columns and fill the missing columns with nulls.
/// If the record’s schema is not nullable, then it will still return an error.
pub fn truncated_rows(mut self, truncated_rows: bool) -> Self {
self.truncated_rows = truncated_rows;
self
}
}

/// Options that control the reading of Parquet files.
Expand Down Expand Up @@ -558,7 +573,8 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
.with_newlines_in_values(self.newlines_in_values)
.with_schema_infer_max_rec(self.schema_infer_max_records)
.with_file_compression_type(self.file_compression_type.to_owned())
.with_null_regex(self.null_regex.clone());
.with_null_regex(self.null_regex.clone())
.with_truncated_rows(self.truncated_rows);

ListingOptions::new(Arc::new(file_format))
.with_file_extension(self.file_extension)
Expand Down
19 changes: 17 additions & 2 deletions datafusion/datasource-csv/src/file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,11 @@ impl CsvFormat {
self
}

/// Set whether to allow truncated rows when parsing
/// - defaults to unset (treated as `false`: rows with missing columns error)
pub fn with_truncated_rows(mut self, truncated_rows: bool) -> Self {
    self.options.truncated_rows = Some(truncated_rows);
    self
}

/// Set the regex to use for null values in the CSV reader.
/// - default to treat empty values as null.
pub fn with_null_regex(mut self, null_regex: Option<String>) -> Self {
Expand Down Expand Up @@ -291,6 +296,13 @@ impl CsvFormat {
self
}

/// Set whether rows with fewer columns than the schema are accepted
/// (missing trailing columns filled with nulls) instead of erroring
/// - defaults to false
///
/// NOTE(review): this duplicates [`Self::with_truncated_rows`]; it now
/// delegates to that method, and callers should prefer it. Consider
/// deprecating this alias.
pub fn with_truncate_rows(self, truncate_rows: bool) -> Self {
    self.with_truncated_rows(truncate_rows)
}

/// The delimiter character.
pub fn delimiter(&self) -> u8 {
self.options.delimiter
Expand Down Expand Up @@ -426,11 +438,13 @@ impl FileFormat for CsvFormat {
.with_file_compression_type(self.options.compression.into())
.with_newlines_in_values(newlines_in_values);

let truncated_rows = self.options.truncated_rows.unwrap_or(false);
let source = Arc::new(
CsvSource::new(has_header, self.options.delimiter, self.options.quote)
.with_escape(self.options.escape)
.with_terminator(self.options.terminator)
.with_comment(self.options.comment),
.with_comment(self.options.comment)
.with_truncate_rows(truncated_rows),
);

let config = conf_builder.with_source(source).build();
Expand Down Expand Up @@ -509,7 +523,8 @@ impl CsvFormat {
.unwrap_or_else(|| state.config_options().catalog.has_header),
)
.with_delimiter(self.options.delimiter)
.with_quote(self.options.quote);
.with_quote(self.options.quote)
.with_truncated_rows(self.options.truncated_rows.unwrap_or(false));

if let Some(null_regex) = &self.options.null_regex {
let regex = Regex::new(null_regex.as_str())
Expand Down
17 changes: 16 additions & 1 deletion datafusion/datasource-csv/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ pub struct CsvSource {
metrics: ExecutionPlanMetricsSet,
projected_statistics: Option<Statistics>,
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
truncate_rows: bool,
}

impl CsvSource {
Expand All @@ -111,6 +112,11 @@ impl CsvSource {
pub fn has_header(&self) -> bool {
self.has_header
}

/// Whether truncated rows (records with fewer columns than the schema)
/// are tolerated when reading CSV files
pub fn truncate_rows(&self) -> bool {
    self.truncate_rows
}
/// A column delimiter
pub fn delimiter(&self) -> u8 {
self.delimiter
Expand Down Expand Up @@ -156,6 +162,13 @@ impl CsvSource {
conf.comment = comment;
conf
}

/// Whether to support truncate rows when read csv file
pub fn with_truncate_rows(&self, truncate_rows: bool) -> Self {
let mut conf = self.clone();
conf.truncate_rows = truncate_rows;
conf
}
}

impl CsvSource {
Expand All @@ -175,7 +188,8 @@ impl CsvSource {
.expect("Batch size must be set before initializing builder"),
)
.with_header(self.has_header)
.with_quote(self.quote);
.with_quote(self.quote)
.with_truncated_rows(self.truncate_rows);
if let Some(terminator) = self.terminator {
builder = builder.with_terminator(terminator);
}
Expand Down Expand Up @@ -340,6 +354,7 @@ impl FileOpener for CsvOpener {

let config = CsvSource {
has_header: csv_has_header,
truncate_rows: self.config.truncate_rows,
..(*self.config).clone()
};

Expand Down
1 change: 1 addition & 0 deletions datafusion/proto-common/proto/datafusion_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ message CsvOptions {
bytes double_quote = 15; // Indicates if quotes are doubled
bytes newlines_in_values = 16; // Indicates if newlines are supported in values
bytes terminator = 17; // Optional terminator character as a byte
bytes truncated_rows = 18; // Indicates if truncated rows are allowed
}

// Options controlling CSV format
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto-common/src/from_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -917,6 +917,7 @@ impl TryFrom<&protobuf::CsvOptions> for CsvOptions {
null_regex: (!proto_opts.null_regex.is_empty())
.then(|| proto_opts.null_regex.clone()),
comment: proto_opts.comment.first().copied(),
truncated_rows: proto_opts.truncated_rows.first().map(|h| *h != 0),
})
}
}
Expand Down
Loading