diff --git a/scripts/run_clickbench_rss.sh b/scripts/run_clickbench_rss.sh new file mode 100755 index 00000000000..c9e1b0c7bdb --- /dev/null +++ b/scripts/run_clickbench_rss.sh @@ -0,0 +1,57 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +cd "$ROOT_DIR" + +if ! command -v gtime >/dev/null 2>&1; then + echo "error: gtime not found in PATH" >&2 + exit 1 +fi + +BINARY="${BINARY:-$ROOT_DIR/target/release/datafusion-bench}" +FORMAT="${FORMAT:-vortex}" +START_QUERY="${START_QUERY:-0}" +END_QUERY="${END_QUERY:-42}" +OUTPUT_DIR="${OUTPUT_DIR:-$ROOT_DIR/data/clickbench-rss}" + +mkdir -p "$OUTPUT_DIR" + +echo "building release benchmark binary..." +cargo build -p datafusion-bench --release + +timestamp="$(date +%Y%m%d-%H%M%S)" +csv_path="$OUTPUT_DIR/clickbench-rss-$timestamp.csv" + +cat >"$csv_path" <<'EOF' +query,max_rss_kb,elapsed,user_seconds,system_seconds,cpu_percent,exit_status +EOF + +for ((q = START_QUERY; q <= END_QUERY; q++)); do + log_path="$OUTPUT_DIR/q${q}.gtime.$timestamp.log" + echo "running clickbench query $q ..." + + gtime --verbose \ + "$BINARY" clickbench --formats "$FORMAT" --queries "$q" \ + >"$OUTPUT_DIR/q${q}.stdout.$timestamp.log" \ + 2>"$log_path" + + max_rss_kb="$(awk -F': ' '/Maximum resident set size/ {print $2}' "$log_path" | tr -d '[:space:]')" + elapsed="$(awk -F': ' '/Elapsed \(wall clock\) time/ {print $2}' "$log_path" | sed 's/^ *//')" + user_seconds="$(awk -F': ' '/User time \(seconds\)/ {print $2}' "$log_path" | tr -d '[:space:]')" + system_seconds="$(awk -F': ' '/System time \(seconds\)/ {print $2}' "$log_path" | tr -d '[:space:]')" + cpu_percent="$(awk -F': ' '/Percent of CPU this job got/ {print $2}' "$log_path" | tr -d '[:space:]')" + exit_status="$(awk -F': ' '/Exit status/ {print $2}' "$log_path" | tr -d '[:space:]')" + + printf '%s,%s,%s,%s,%s,%s,%s\n' \ + "$q" \ + "${max_rss_kb:-}" \ + "${elapsed:-}" \ + "${user_seconds:-}" \ + "${system_seconds:-}" \ + "${cpu_percent:-}" \ + "${exit_status:-}" \ + >>"$csv_path" +done + +echo "wrote summary to $csv_path" diff --git a/vortex-array/src/arrow/executor/list.rs b/vortex-array/src/arrow/executor/list.rs index a3326115608..f035f997994 100644 --- a/vortex-array/src/arrow/executor/list.rs +++ b/vortex-array/src/arrow/executor/list.rs @@ -17,6 +17,7 @@ use crate::ArrayRef; use crate::Canonical; use crate::DynArray; use crate::ExecutionCtx; +use crate::arrays::Chunked; use crate::arrays::List; use crate::arrays::ListArray; use crate::arrays::ListView; @@ -55,6 +56,24 @@ pub(super) fn to_arrow_list( Err(a) => a, }; + // Handle ChunkedArray by converting each chunk individually. + // This preserves the fast list_to_list path for inner ListArray chunks + // instead of falling through to the expensive execute:: path. + if let Some(chunked) = array.as_opt::() { + let mut arrow_chunks: Vec = Vec::with_capacity(chunked.nchunks()); + for chunk in chunked.chunks() { + arrow_chunks.push(to_arrow_list::(chunk.clone(), elements_field, ctx)?); + } + if arrow_chunks.len() == 1 { + return Ok(arrow_chunks + .into_iter() + .next() + .vortex_expect("known length")); + } + let refs: Vec<&dyn arrow_array::Array> = arrow_chunks.iter().map(|a| a.as_ref()).collect(); + return Ok(arrow_select::concat::concat(&refs)?); + } + // Otherwise, we execute the array to become a ListViewArray, then rebuild to ZCTL. // Note: arrow_cast::cast supports ListView → List (apache/arrow-rs#8735), but it // unconditionally uses take. Our rebuild uses a heuristic that picks list-by-list diff --git a/vortex-datafusion/src/persistent/format.rs b/vortex-datafusion/src/persistent/format.rs index fb3d5f9db11..3df2ccf6f2c 100644 --- a/vortex-datafusion/src/persistent/format.rs +++ b/vortex-datafusion/src/persistent/format.rs @@ -101,7 +101,7 @@ config_namespace! { /// all expressions are evaluated after the scan. pub projection_pushdown: bool, default = false /// The intra-partition scan concurrency, controlling the number of row splits to process - /// concurrently per-thread within each file. + /// concurrently within each file. /// /// This does not affect the overall parallelism /// across partitions, which is controlled by DataFusion's execution configuration. diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 5986a06da49..056b13c716d 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -296,7 +296,8 @@ impl FileOpener for VortexOpener { } }; - let mut scan_builder = ScanBuilder::new(session.clone(), layout_reader); + let mut scan_builder = ScanBuilder::new(session.clone(), layout_reader) + .with_segment_source(vxf.segment_source()); if let Some(extensions) = file.extensions && let Some(vortex_plan) = extensions.downcast_ref::() diff --git a/vortex-duckdb/src/e2e_test/vortex_scan_test.rs b/vortex-duckdb/src/e2e_test/vortex_scan_test.rs index 75db1ac3cd6..5209f9c7829 100644 --- a/vortex-duckdb/src/e2e_test/vortex_scan_test.rs +++ b/vortex-duckdb/src/e2e_test/vortex_scan_test.rs @@ -3,6 +3,7 @@ //! This module contains tests for the `vortex_scan` table function. +use std::collections::BTreeMap; use std::ffi::CStr; use std::io::Write; use std::net::TcpListener; @@ -174,6 +175,28 @@ async fn write_vortex_file_to_dir( temp_file_path } +async fn write_vortex_struct_file_to_dir( + dir: &Path, + prefix: &str, + iter: impl Iterator, impl IntoArray)>, +) -> NamedTempFile { + let struct_array = StructArray::try_from_iter(iter).unwrap(); + let temp_file_path = tempfile::Builder::new() + .prefix(prefix) + .suffix(".vortex") + .tempfile_in(dir) + .unwrap(); + + let mut file = async_fs::File::create(&temp_file_path).await.unwrap(); + SESSION + .write_options() + .write(&mut file, struct_array.to_array_stream()) + .await + .unwrap(); + + temp_file_path +} + #[test] fn test_scan_function_registration() { let conn = database_connection(); @@ -350,6 +373,284 @@ fn test_vortex_scan_multiple_files() { assert_eq!(total_sum, 21); } +#[test] +fn test_vortex_scan_tpch_q13_style_left_join_over_multifile_orders() { + let (tempdir, _customer_file, _orders_file1, _orders_file2) = RUNTIME.block_on(async { + let tempdir = tempfile::tempdir().unwrap(); + + let customer_file = write_vortex_struct_file_to_dir( + tempdir.path(), + "customer_", + [ + ("c_custkey", buffer![1i64, 2, 3, 4, 5].into_array()), + ( + "c_comment", + VarBinArray::from(vec!["c1", "c2", "c3", "c4", "c5"]).into_array(), + ), + ] + .into_iter(), + ) + .await; + + let orders_file1 = write_vortex_struct_file_to_dir( + tempdir.path(), + "orders_", + [ + ("o_orderkey", buffer![10i64, 20, 30].into_array()), + ("o_custkey", buffer![1i64, 2, 3].into_array()), + ( + "o_comment", + VarBinArray::from(vec![ + "ordinary order", + "special handling requests", + "regular comment", + ]) + .into_array(), + ), + ] + .into_iter(), + ) + .await; + + let orders_file2 = write_vortex_struct_file_to_dir( + tempdir.path(), + "orders_", + [ + ("o_orderkey", buffer![11i64, 21, 40].into_array()), + ("o_custkey", buffer![1i64, 2, 4].into_array()), + ( + "o_comment", + VarBinArray::from(vec![ + "special service requests", + "another normal order", + "special packaging requests", + ]) + .into_array(), + ), + ] + .into_iter(), + ) + .await; + + (tempdir, customer_file, orders_file1, orders_file2) + }); + + let customer_glob = format!("{}/customer_*.vortex", tempdir.path().display()); + let orders_glob = format!("{}/orders_*.vortex", tempdir.path().display()); + + let conn = database_connection(); + conn.query(&format!( + "CREATE OR REPLACE VIEW customer AS SELECT * FROM read_vortex('{customer_glob}') WHERE c_custkey IS NOT NULL" + )) + .unwrap(); + conn.query(&format!( + "CREATE OR REPLACE VIEW orders AS SELECT * FROM read_vortex('{orders_glob}') WHERE o_orderkey IS NOT NULL" + )) + .unwrap(); + + let result = conn + .query( + " + SELECT + c_count, + count(*) AS custdist + FROM ( + SELECT + c_custkey, + count(o_orderkey) AS c_count + FROM + customer + LEFT OUTER JOIN orders + ON c_custkey = o_custkey + AND o_comment NOT LIKE '%special%requests%' + GROUP BY + c_custkey + ) AS c_orders(c_custkey, c_count) + GROUP BY + c_count + ORDER BY + custdist DESC, + c_count DESC + ", + ) + .unwrap(); + + let mut rows = Vec::new(); + for chunk in result { + let len = chunk.len().as_(); + let counts = chunk.get_vector(0); + let dists = chunk.get_vector(1); + let counts = counts.as_slice_with_len::(len); + let dists = dists.as_slice_with_len::(len); + rows.extend(counts.iter().copied().zip(dists.iter().copied())); + } + + assert_eq!(rows, vec![(1, 3), (0, 2)]); +} + +#[test] +fn test_vortex_scan_tpch_q13_style_large_multifile_orders_matches_expected() { + const CUSTOMER_COUNT: i64 = 2048; + const ORDER_FILE_COUNT: usize = 3; + const INCLUDED_MODULUS: i64 = 64; + const EXCLUDED_MODULUS: i64 = 7; + + let mut expected_counts = BTreeMap::::new(); + let (tempdir, _customer_file, _order_files) = RUNTIME.block_on(async { + let tempdir = tempfile::tempdir().unwrap(); + + let customer_keys: Vec = (1..=CUSTOMER_COUNT).collect(); + let customer_comments: Vec = (1..=CUSTOMER_COUNT) + .map(|custkey| format!("customer {custkey}")) + .collect(); + let customer_file = write_vortex_struct_file_to_dir( + tempdir.path(), + "customer_", + [ + ( + "c_custkey", + customer_keys + .into_iter() + .collect::() + .into_array(), + ), + ( + "c_comment", + VarBinArray::from(customer_comments).into_array(), + ), + ] + .into_iter(), + ) + .await; + + let mut order_keys_per_file = vec![Vec::new(); ORDER_FILE_COUNT]; + let mut order_custkeys_per_file = vec![Vec::new(); ORDER_FILE_COUNT]; + let mut order_comments_per_file = vec![Vec::new(); ORDER_FILE_COUNT]; + let mut next_orderkey = 1_i64; + + for custkey in 1..=CUSTOMER_COUNT { + let included = custkey % INCLUDED_MODULUS; + let excluded = custkey % EXCLUDED_MODULUS; + *expected_counts.entry(included).or_default() += 1; + + for _ in 0..included { + let file_idx = usize::try_from(next_orderkey).unwrap() % ORDER_FILE_COUNT; + order_keys_per_file[file_idx].push(next_orderkey); + order_custkeys_per_file[file_idx].push(custkey); + order_comments_per_file[file_idx].push("ordinary order".to_string()); + next_orderkey += 1; + } + + for _ in 0..excluded { + let file_idx = usize::try_from(next_orderkey).unwrap() % ORDER_FILE_COUNT; + order_keys_per_file[file_idx].push(next_orderkey); + order_custkeys_per_file[file_idx].push(custkey); + order_comments_per_file[file_idx].push("special handling requests".to_string()); + next_orderkey += 1; + } + } + + let mut order_files = Vec::with_capacity(ORDER_FILE_COUNT); + for file_idx in 0..ORDER_FILE_COUNT { + order_files.push( + write_vortex_struct_file_to_dir( + tempdir.path(), + "orders_", + [ + ( + "o_orderkey", + std::mem::take(&mut order_keys_per_file[file_idx]) + .into_iter() + .collect::() + .into_array(), + ), + ( + "o_custkey", + std::mem::take(&mut order_custkeys_per_file[file_idx]) + .into_iter() + .collect::() + .into_array(), + ), + ( + "o_comment", + VarBinArray::from(std::mem::take( + &mut order_comments_per_file[file_idx], + )) + .into_array(), + ), + ] + .into_iter(), + ) + .await, + ); + } + + (tempdir, customer_file, order_files) + }); + + let customer_glob = format!("{}/customer_*.vortex", tempdir.path().display()); + let orders_glob = format!("{}/orders_*.vortex", tempdir.path().display()); + + let mut expected_rows: Vec<(i64, i64)> = expected_counts.into_iter().collect(); + expected_rows.sort_by(|(left_count, left_dist), (right_count, right_dist)| { + right_dist + .cmp(left_dist) + .then_with(|| right_count.cmp(left_count)) + }); + + let conn = database_connection(); + conn.query(&format!( + "CREATE OR REPLACE VIEW customer AS SELECT * FROM read_vortex('{customer_glob}')" + )) + .unwrap(); + conn.query(&format!( + "CREATE OR REPLACE VIEW orders AS SELECT * FROM read_vortex('{orders_glob}')" + )) + .unwrap(); + + let result = conn + .query( + " + SELECT + c_count, + count(*) AS custdist + FROM ( + SELECT + c_custkey, + count(o_orderkey) AS c_count + FROM + customer + LEFT OUTER JOIN orders + ON c_custkey = o_custkey + AND o_comment NOT LIKE '%special%requests%' + GROUP BY + c_custkey + ) AS c_orders(c_custkey, c_count) + GROUP BY + c_count + ORDER BY + custdist DESC, + c_count DESC + ", + ) + .unwrap(); + + let row_count = result.row_count(); + let mut actual_rows = Vec::new(); + for chunk in result { + let len = chunk.len().as_(); + let counts = chunk.get_vector(0); + let dists = chunk.get_vector(1); + let counts = counts.as_slice_with_len::(len); + let dists = dists.as_slice_with_len::(len); + actual_rows.extend(counts.iter().copied().zip(dists.iter().copied())); + } + + assert_eq!(usize::try_from(row_count).unwrap(), expected_rows.len()); + assert_eq!(actual_rows.len(), expected_rows.len()); + assert_eq!(actual_rows, expected_rows); +} + #[test] fn test_vortex_scan_over_http() { let file = RUNTIME.block_on(async { diff --git a/vortex-duckdb/src/exporter/varbinview.rs b/vortex-duckdb/src/exporter/varbinview.rs index 13a59d242ef..6ff01dac10b 100644 --- a/vortex-duckdb/src/exporter/varbinview.rs +++ b/vortex-duckdb/src/exporter/varbinview.rs @@ -123,7 +123,6 @@ fn to_ptr_binary_view<'a>( _ref: PtrRef { size: v.len(), prefix: view.prefix, - // TODO(joe) verify this. ptr: unsafe { buffers[view.buffer_index as usize] .as_ptr() diff --git a/vortex-file/src/file.rs b/vortex-file/src/file.rs index d113cc8cf98..a7172e3ed3f 100644 --- a/vortex-file/src/file.rs +++ b/vortex-file/src/file.rs @@ -103,18 +103,18 @@ impl VortexFile { self.session.clone(), )); } - Ok(Arc::new(LayoutReaderDataSource::new( - reader, - self.session.clone(), - ))) + Ok(Arc::new( + LayoutReaderDataSource::new(reader, self.session.clone()) + .with_segment_source(self.segment_source()), + )) } /// Initiate a scan of the file, returning a builder for configuring the scan. pub fn scan(&self) -> VortexResult> { - Ok(ScanBuilder::new( - self.session.clone(), - self.layout_reader()?, - )) + Ok( + ScanBuilder::new(self.session.clone(), self.layout_reader()?) + .with_segment_source(self.segment_source()), + ) } /// Returns true if the expression will never match any rows in the file. diff --git a/vortex-file/src/read/driver.rs b/vortex-file/src/read/driver.rs index d8fd638adc8..04a919ec5cf 100644 --- a/vortex-file/src/read/driver.rs +++ b/vortex-file/src/read/driver.rs @@ -140,6 +140,7 @@ impl State { tracing::debug!(?req, "ReadRequest dropped before registration"); return; } + self.metrics.registered_requests.add(1); self.requests_by_offset.insert((req.offset, req.id)); self.requests.insert(req.id, req); } @@ -149,20 +150,43 @@ impl State { self.requests_by_offset.remove(&(req.offset, req_id)); tracing::debug!(?req, "ReadRequest dropped before poll"); } else { + self.metrics.polled_requests.add(1); self.polled_requests.insert(req_id, req); } } } ReadEvent::Dropped(req_id) => { if let Some(req) = self.requests.remove(&req_id) { + self.metrics.dropped_requests.add(1); self.requests_by_offset.remove(&(req.offset, req_id)); tracing::debug!(?req, "ReadRequest dropped before poll"); } if let Some(req) = self.polled_requests.remove(&req_id) { + self.metrics.dropped_requests.add(1); self.requests_by_offset.remove(&(req.offset, req_id)); tracing::debug!(?req, "ReadRequest dropped after poll"); } } + ReadEvent::BatchBoundary => { + // Promote all registered-but-unpolled requests to polled status. + // This tells the coalescer that the entire batch is needed now, + // allowing it to form optimal coalesced reads. + let promoted = self.requests.len(); + if promoted > 0 { + tracing::debug!( + promoted, + "BatchBoundary: promoting registered requests to polled" + ); + self.metrics.polled_requests.add(promoted as u64); + for (req_id, req) in std::mem::take(&mut self.requests) { + if req.callback.is_closed() { + self.requests_by_offset.remove(&(req.offset, req_id)); + } else { + self.polled_requests.insert(req_id, req); + } + } + } + } } } @@ -215,6 +239,9 @@ impl State { let first_req = self.next_uncoalesced()?; let mut requests = vec![first_req]; + let mut payload_bytes = requests[0].length as u64; + let mut registered_only_requests = 0usize; + let mut polled_requests = 1usize; let mut current_start = requests[0].offset; let mut current_end = requests[0].offset + requests[0].length as u64; let align = *self.coalesced_buffer_alignment as u64; @@ -269,18 +296,28 @@ impl State { let new_total_size = new_end - aligned_start; if new_total_size > window.max_size { + self.metrics.batched_skipped_max_size.add(1); // Skip it but keep it available for future coalescing operations. continue; } current_start = new_start; current_end = new_end; - let req = self - .polled_requests - .remove(&req_id) - .or_else(|| self.requests.remove(&req_id)) - .vortex_expect("Missing request in requests_by_offset"); + let (req, was_polled) = if let Some(req) = self.polled_requests.remove(&req_id) + { + (req, true) + } else if let Some(req) = self.requests.remove(&req_id) { + (req, false) + } else { + unreachable!("Missing request in requests_by_offset"); + }; + payload_bytes = payload_bytes.saturating_add(req.length as u64); + if was_polled { + polled_requests = polled_requests.saturating_add(1); + } else { + registered_only_requests = registered_only_requests.saturating_add(1); + } requests.push(req); if ids_to_remove.insert(req_id) { keys_to_remove.push((req_offset, req_id)); @@ -302,6 +339,18 @@ impl State { requests.sort_unstable_by_key(|r| r.offset); let aligned_start = current_start - (current_start % align); + let range_bytes = current_end - aligned_start; + + self.metrics.batched_range_bytes.update(range_bytes as f64); + self.metrics + .batched_payload_bytes + .update(payload_bytes as f64); + self.metrics + .batched_registered_only_requests + .update(registered_only_requests as f64); + self.metrics + .batched_polled_requests + .update(polled_requests as f64); tracing::debug!( "Coalesced {} requests into range {}..{} (len={})", @@ -808,4 +857,86 @@ mod tests { assert_eq!(individual_count, 2, "Expected 2 individual requests"); assert_eq!(coalesced_operations, 0, "Expected 0 coalesced operations"); } + + #[tokio::test] + async fn test_metrics_record_registered_only_batch_members() { + let (req1, _rx1) = create_request(1, 0, 10); + let (req2, _rx2) = create_request(2, 50, 10); + let (req3, _rx3) = create_request(3, 100, 10); + + let events = vec![ + ReadEvent::Request(req1), + ReadEvent::Request(req2), + ReadEvent::Request(req3), + ReadEvent::Polled(2), + ]; + + let event_stream = stream::iter(events); + let metrics_registry = DefaultMetricsRegistry::default(); + let metrics = RequestMetrics::new(&metrics_registry, vec![]); + let io_stream = IoRequestStream::new( + event_stream, + Some(CoalesceConfig { + distance: 60, + max_size: 1024, + }), + Alignment::none(), + metrics, + ); + + let outputs: Vec = io_stream.collect().await; + assert_eq!(outputs.len(), 1); + + let snapshot = metrics_registry.snapshot(); + let mut registered = 0u64; + let mut polled = 0u64; + let mut coalesced = 0u64; + let mut registered_only_count = 0usize; + let mut registered_only_total = 0.0; + let mut polled_in_batch_count = 0usize; + let mut polled_in_batch_total = 0.0; + + for metric in snapshot.iter() { + match metric.value() { + MetricValue::Counter(counter) => match metric.name().as_ref() { + "io.requests.registered" => registered = counter.value(), + "io.requests.polled" => polled = counter.value(), + "io.requests.coalesced" => coalesced = counter.value(), + _ => {} + }, + MetricValue::Histogram(histogram) => match metric.name().as_ref() { + "io.requests.batched.registered_only_requests" => { + registered_only_count = histogram.count(); + registered_only_total = histogram.total(); + } + "io.requests.batched.polled_requests" => { + polled_in_batch_count = histogram.count(); + polled_in_batch_total = histogram.total(); + } + _ => {} + }, + _ => {} + } + } + + assert_eq!(registered, 3, "Expected 3 registered requests"); + assert_eq!(polled, 1, "Expected 1 polled request"); + assert_eq!(coalesced, 1, "Expected 1 coalesced operation"); + assert_eq!( + registered_only_count, 1, + "Expected one histogram sample for registered-only requests" + ); + assert_eq!( + registered_only_total, 2.0, + "Expected two registered-only requests in the coalesced batch" + ); + assert_eq!( + polled_in_batch_count, 1, + "Expected one histogram sample for polled requests" + ); + assert_eq!( + polled_in_batch_total, 1.0, + "Expected one polled request in the coalesced batch" + ); + } } diff --git a/vortex-file/src/segments/source.rs b/vortex-file/src/segments/source.rs index 8f83150c4bb..238496d5e7a 100644 --- a/vortex-file/src/segments/source.rs +++ b/vortex-file/src/segments/source.rs @@ -39,6 +39,10 @@ pub enum ReadEvent { Request(ReadRequest), Polled(RequestId), Dropped(RequestId), + /// Signals that a logical batch of requests has been fully registered. + /// The driver promotes all registered-but-unpolled requests to polled status, + /// allowing the coalescer to form optimal reads over the entire batch. + BatchBoundary, } /// A [`SegmentSource`] for file-like IO. @@ -134,6 +138,10 @@ impl FileSegmentSource { } impl SegmentSource for FileSegmentSource { + fn flush(&self) { + drop(self.events.unbounded_send(ReadEvent::BatchBoundary)); + } + fn request(&self, id: SegmentId) -> SegmentFuture { // We eagerly register the read request here assuming the behaviour of [`FileRead`], where // coalescing becomes effective prior to the future being polled. @@ -232,14 +240,31 @@ impl Drop for ReadFuture { } pub struct RequestMetrics { + pub registered_requests: Counter, + pub polled_requests: Counter, + pub dropped_requests: Counter, pub individual_requests: Counter, pub coalesced_requests: Counter, pub num_requests_coalesced: Histogram, + pub batched_range_bytes: Histogram, + pub batched_payload_bytes: Histogram, + pub batched_registered_only_requests: Histogram, + pub batched_polled_requests: Histogram, + pub batched_skipped_max_size: Counter, } impl RequestMetrics { pub fn new(metrics_registry: &dyn MetricsRegistry, labels: Vec