Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions scripts/run_clickbench_rss.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#!/usr/bin/env bash
#
# Run a range of ClickBench queries one at a time under GNU time (gtime) and
# record peak memory (max RSS) plus timing statistics for each query into a
# timestamped CSV summary, with per-query stdout/gtime logs kept alongside.
#
# Tunables (environment variables, all optional):
#   BINARY       benchmark binary    (default: $ROOT_DIR/target/release/datafusion-bench)
#   FORMAT       file format to run  (default: vortex)
#   START_QUERY  first query index, inclusive (default: 0)
#   END_QUERY    last query index, inclusive  (default: 42)
#   OUTPUT_DIR   log/CSV destination (default: $ROOT_DIR/data/clickbench-rss)
set -euo pipefail

ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
cd "$ROOT_DIR"

# GNU time is required for the --verbose resource report; the bash builtin
# `time` does not report max RSS. (macOS: `brew install gnu-time`.)
if ! command -v gtime >/dev/null 2>&1; then
    echo "error: gtime not found in PATH" >&2
    exit 1
fi

BINARY="${BINARY:-$ROOT_DIR/target/release/datafusion-bench}"
FORMAT="${FORMAT:-vortex}"
START_QUERY="${START_QUERY:-0}"
END_QUERY="${END_QUERY:-42}"
OUTPUT_DIR="${OUTPUT_DIR:-$ROOT_DIR/data/clickbench-rss}"

mkdir -p "$OUTPUT_DIR"

echo "building release benchmark binary..."
cargo build -p datafusion-bench --release

timestamp="$(date +%Y%m%d-%H%M%S)"
csv_path="$OUTPUT_DIR/clickbench-rss-$timestamp.csv"

cat >"$csv_path" <<'EOF'
query,max_rss_kb,elapsed,user_seconds,system_seconds,cpu_percent,exit_status
EOF

# gtime_field LABEL LOG_PATH
# Print the value of the first "LABEL...: value" line in a gtime --verbose
# log, with all surrounding whitespace stripped. LABEL is matched as a fixed
# string (via awk index()), so labels containing parentheses such as
# "User time (seconds)" need no regex escaping. Prints nothing (exit 0) when
# the label is absent, so callers get an empty field instead of an abort.
gtime_field() {
    awk -F': ' -v label="$1" 'index($0, label) { print $2; exit }' "$2" \
        | tr -d '[:space:]'
}

for ((q = START_QUERY; q <= END_QUERY; q++)); do
    log_path="$OUTPUT_DIR/q${q}.gtime.$timestamp.log"
    echo "running clickbench query $q ..."

    # BUG FIX: gtime exits with the measured command's status, so under
    # `set -e` a single failing query used to abort the whole sweep before
    # its row was written — the exit_status column could only ever hold 0.
    # Tolerate the failure here and keep going; gtime still records
    # "Exit status: N" in the log, which is parsed into the CSV below.
    if ! gtime --verbose \
        "$BINARY" clickbench --formats "$FORMAT" --queries "$q" \
        >"$OUTPUT_DIR/q${q}.stdout.$timestamp.log" \
        2>"$log_path"; then
        echo "warning: query $q exited non-zero (see $log_path)" >&2
    fi

    max_rss_kb="$(gtime_field 'Maximum resident set size' "$log_path")"
    # Elapsed values look like "0:01.23" — colons but no internal spaces,
    # so whitespace stripping leaves the value intact.
    elapsed="$(gtime_field 'Elapsed (wall clock) time' "$log_path")"
    user_seconds="$(gtime_field 'User time (seconds)' "$log_path")"
    system_seconds="$(gtime_field 'System time (seconds)' "$log_path")"
    cpu_percent="$(gtime_field 'Percent of CPU this job got' "$log_path")"
    # Absent when the command was killed by a signal ("Command terminated
    # by signal N" is printed instead) — the column is then left empty.
    exit_status="$(gtime_field 'Exit status' "$log_path")"

    printf '%s,%s,%s,%s,%s,%s,%s\n' \
        "$q" \
        "${max_rss_kb:-}" \
        "${elapsed:-}" \
        "${user_seconds:-}" \
        "${system_seconds:-}" \
        "${cpu_percent:-}" \
        "${exit_status:-}" \
        >>"$csv_path"
done

echo "wrote summary to $csv_path"
19 changes: 19 additions & 0 deletions vortex-array/src/arrow/executor/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::ArrayRef;
use crate::Canonical;
use crate::DynArray;
use crate::ExecutionCtx;
use crate::arrays::Chunked;
use crate::arrays::List;
use crate::arrays::ListArray;
use crate::arrays::ListView;
Expand Down Expand Up @@ -55,6 +56,24 @@ pub(super) fn to_arrow_list<O: OffsetSizeTrait + NativePType>(
Err(a) => a,
};

// Handle ChunkedArray by converting each chunk individually.
// This preserves the fast list_to_list path for inner ListArray chunks
// instead of falling through to the expensive execute::<ListViewArray> path.
if let Some(chunked) = array.as_opt::<Chunked>() {
let mut arrow_chunks: Vec<ArrowArrayRef> = Vec::with_capacity(chunked.nchunks());
for chunk in chunked.chunks() {
arrow_chunks.push(to_arrow_list::<O>(chunk.clone(), elements_field, ctx)?);
}
if arrow_chunks.len() == 1 {
return Ok(arrow_chunks
.into_iter()
.next()
.vortex_expect("known length"));
}
let refs: Vec<&dyn arrow_array::Array> = arrow_chunks.iter().map(|a| a.as_ref()).collect();
return Ok(arrow_select::concat::concat(&refs)?);
}

// Otherwise, we execute the array to become a ListViewArray, then rebuild to ZCTL.
// Note: arrow_cast::cast supports ListView → List (apache/arrow-rs#8735), but it
// unconditionally uses take. Our rebuild uses a heuristic that picks list-by-list
Expand Down
2 changes: 1 addition & 1 deletion vortex-datafusion/src/persistent/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ config_namespace! {
/// all expressions are evaluated after the scan.
pub projection_pushdown: bool, default = false
/// The intra-partition scan concurrency, controlling the number of row splits to process
/// concurrently per-thread within each file.
/// concurrently within each file.
///
/// This does not affect the overall parallelism
/// across partitions, which is controlled by DataFusion's execution configuration.
Expand Down
3 changes: 2 additions & 1 deletion vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,8 @@ impl FileOpener for VortexOpener {
}
};

let mut scan_builder = ScanBuilder::new(session.clone(), layout_reader);
let mut scan_builder = ScanBuilder::new(session.clone(), layout_reader)
.with_segment_source(vxf.segment_source());

if let Some(extensions) = file.extensions
&& let Some(vortex_plan) = extensions.downcast_ref::<VortexAccessPlan>()
Expand Down
Loading
Loading