Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 85 additions & 34 deletions datafusion/substrait/src/logical_plan/consumer/rel/read_rel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use datafusion::logical_expr::{
use std::sync::Arc;
use substrait::proto::expression::MaskExpression;
use substrait::proto::read_rel::ReadType;
use substrait::proto::read_rel::local_files::file_or_files::PathType::UriFile;
use substrait::proto::read_rel::local_files::file_or_files::PathType;
use substrait::proto::{Expression, ReadRel};
use url::Url;

Expand Down Expand Up @@ -176,45 +176,96 @@ pub async fn from_read_rel(
}))
}
Some(ReadType::LocalFiles(lf)) => {
fn extract_filename(name: &str) -> Option<String> {
let corrected_url =
if name.starts_with("file://") && !name.starts_with("file:///") {
name.replacen("file://", "file:///", 1)
} else {
name.to_string()
};

Url::parse(&corrected_url).ok().and_then(|url| {
let path = url.path();
std::path::Path::new(path)
.file_name()
.map(|filename| filename.to_string_lossy().to_string())
/// Parses the URI string from a PathType variant.
/// Returns an error if the URI is malformed.
fn parse_uri(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a note in case someone comes to this in the future and has the same question.

The Url crate is quite permissive, so while the type is called Url, it handles general URIs without a problem.

path_type: Option<&PathType>,
) -> datafusion::common::Result<Option<Url>> {
let path_str = match path_type {
Some(PathType::UriPath(p)) => p,
Some(PathType::UriPathGlob(p)) => p,
Some(PathType::UriFile(p)) => p,
Some(PathType::UriFolder(p)) => p,
None => return Ok(None),
};

Url::parse(path_str).map(Some).map_err(|e| {
datafusion::error::DataFusionError::Substrait(format!(
"Failed to parse URI '{path_str}': {e}"
))
})
}

// we could use the file name to check the original table provider
// TODO: currently does not support multiple local files
let filename: Option<String> =
lf.items.first().and_then(|x| match x.path_type.as_ref() {
Some(UriFile(name)) => extract_filename(name),
_ => None,
});
// Collect all file URIs from LocalFiles items
let uris: Vec<Url> = lf
.items
.iter()
.map(|item| parse_uri(item.path_type.as_ref()))
.collect::<datafusion::common::Result<Vec<_>>>()?
.into_iter()
.flatten()
.collect();

if lf.items.len() > 1 || filename.is_none() {
return not_impl_err!("Only single file reads are supported");
}
let name = filename.unwrap();
// directly use unwrap here since we could determine it is a valid one
let table_reference = TableReference::Bare { table: name.into() };
// Generate a table name from the first URI's path component
let table_name = uris
.first()
.and_then(|uri| {
std::path::Path::new(uri.path())
.file_name()
.map(|n| n.to_string_lossy().to_string())
})
.unwrap_or_else(|| "local_files".to_string());

read_with_schema(
consumer,
let table_reference = TableReference::Bare {
table: table_name.clone().into(),
};

// Try to resolve files using the consumer's resolve_local_files method.
// If not implemented (default returns not_impl_err), fall back to the
// legacy single-file behavior for backward compatibility. For multiple
// files, propagate the error to avoid silently producing incorrect results.
let provider =
match consumer.resolve_local_files(&uris, &substrait_schema).await {
Ok(provider) => provider,
Err(e) => {
if uris.len() <= 1 {
// Single-file fallback: look up a pre-registered table
// by filename, maintaining backward compatibility
match consumer.resolve_table_ref(&table_reference).await? {
Some(provider) => provider,
None => return Err(e),
}
} else {
// Multi-file: don't fall back, as resolve_table_ref would
// only use the first filename and silently ignore the rest
return Err(e);
}
}
};

// Build the scan plan inline
let schema = substrait_schema.replace_qualifier(table_reference.clone());

let filters = if let Some(f) = &read.filter {
let filter_expr = consumer.consume_expression(f, &schema).await?;
split_conjunction_owned(filter_expr)
} else {
vec![]
};

let plan = LogicalPlanBuilder::scan_with_filters(
table_reference,
substrait_schema,
&read.projection,
&read.filter,
)
.await
provider_as_source(provider),
None,
filters,
)?
.build()?;

ensure_schema_compatibility(plan.schema(), schema.clone())?;

let schema = apply_masking(schema, &read.projection)?;

apply_projection(plan, schema)
}
_ => {
not_impl_err!("Unsupported Readtype: {:?}", read.read_type)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,33 @@ pub trait SubstraitConsumer: Send + Sync + Sized {
table_ref: &TableReference,
) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>>;

/// Resolves a list of local file URIs to a TableProvider.
///
/// Override this method to customize how file URIs from Substrait LocalFiles
/// are resolved to table providers. The URIs preserve the full scheme (e.g.,
/// `file:///`, `s3://`, `hdfs://`) so implementations can determine how to
/// access the files.
///
/// Note: The `url::Url` type is used here despite the name — it handles
/// general URIs (not just URLs) without issue.
///
/// The default implementation returns an error since resolving arbitrary
/// file URIs requires access to an object store or file system which may
/// not be available in all contexts.
///
/// # Arguments
/// * `uris` - List of parsed file URIs from Substrait LocalFiles
/// * `schema` - The expected schema of the files
///
/// # Errors
/// The default implementation never succeeds: it unconditionally returns the
/// error produced by `not_impl_err!`, regardless of the arguments passed in.
/// Overriding implementations decide their own failure conditions.
async fn resolve_local_files(
&self,
// Both parameters are underscore-prefixed because the default body ignores them.
_uris: &[url::Url],
_schema: &DFSchema,
) -> datafusion::common::Result<Arc<dyn TableProvider>> {
// No generic way to open arbitrary URIs here, so report "not implemented"
// and leave the actual resolution to implementors of the trait.
not_impl_err!(
"resolve_local_files is not implemented. Override this method to support LocalFiles read types."
)
}

// TODO: Remove these two methods
// Ideally, the abstract consumer should not place any constraints on implementations.
// The functionality for which the Extensions and FunctionRegistry is needed should be abstracted
Expand Down
Loading