diff --git a/Cargo.lock b/Cargo.lock index 94552282..adfbf4eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -510,6 +510,12 @@ version = "3.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" +[[package]] +name = "bytemuck" +version = "1.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8334215b81e418a0a7bdb8ef0849474f40bb10c8b71f1c4ed315cff49f32494d" + [[package]] name = "byteorder" version = "1.5.0" @@ -1940,11 +1946,12 @@ checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" [[package]] name = "orc-rust" -version = "0.3.1" -source = "git+https://github.com/blaze-init/datafusion-orc.git?rev=c54bfb5#c54bfb5bed3e74a51229be54373828e739dd53e6" +version = "0.4.1" +source = "git+https://github.com/blaze-init/datafusion-orc.git?rev=9c74ac3#9c74ac3779c2ebb371404d71adb915b49eee1930" dependencies = [ "arrow", "async-trait", + "bytemuck", "bytes", "chrono", "chrono-tz", diff --git a/Cargo.toml b/Cargo.toml index 8c59dadc..9b7700c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,7 +54,7 @@ datafusion-ext-plans = { path = "./native-engine/datafusion-ext-plans" } # datafusion: branch=v42-blaze datafusion = { version = "42.0.0" } -orc-rust = { version = "0.3.1" } +orc-rust = { version = "0.4.1" } # arrow: branch=v53-blaze arrow = { version = "53.0.0", features = ["ffi"]} @@ -72,7 +72,7 @@ datafusion-expr = { git = "https://github.com/blaze-init/arrow-datafusion.git", datafusion-execution = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "9a09e14"} datafusion-optimizer = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "9a09e14"} datafusion-physical-expr = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "9a09e14"} -orc-rust = { git = "https://github.com/blaze-init/datafusion-orc.git", rev = "c54bfb5"} +orc-rust = { git = "https://github.com/blaze-init/datafusion-orc.git", rev = "9c74ac3"} # arrow: branch=v53-blaze arrow = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"} diff --git a/native-engine/datafusion-ext-plans/src/orc_exec.rs b/native-engine/datafusion-ext-plans/src/orc_exec.rs index 4ae11395..06ca1a45 100644 --- a/native-engine/datafusion-ext-plans/src/orc_exec.rs +++ b/native-engine/datafusion-ext-plans/src/orc_exec.rs @@ -29,7 +29,10 @@ use datafusion::{ execution::context::TaskContext, physical_expr::EquivalenceProperties, physical_plan::{ - metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricValue, MetricsSet, Time}, + metrics::{ + BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, + MetricsSet, Time, + }, stream::RecordBatchStreamAdapter, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Metric, Partitioning, PhysicalExpr, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, @@ -166,11 +169,11 @@ impl ExecutionPlan for OrcExec { }; let opener = OrcOpener { + partition_index, projection, batch_size: batch_size(), - _limit: self.base_config.limit, table_schema: self.base_config.file_schema.clone(), - _metrics: self.metrics.clone(), + metrics: self.metrics.clone(), fs_provider, }; @@ -210,20 +213,31 @@ impl ExecutionPlan for OrcExec { } struct OrcOpener { + partition_index: usize, projection: Vec, batch_size: usize, - _limit: Option, table_schema: SchemaRef, - _metrics: ExecutionPlanMetricsSet, + metrics: ExecutionPlanMetricsSet, fs_provider: Arc, } impl FileOpener for OrcOpener { fn open(&self, file_meta: FileMeta) -> Result { - let reader = OrcFileReaderRef(Arc::new(InternalFileReader::try_new( - self.fs_provider.clone(), - file_meta.object_meta.clone(), - )?)); + let reader = OrcFileReaderRef { + inner: Arc::new(InternalFileReader::try_new( + self.fs_provider.clone(), + file_meta.object_meta.clone(), + )?), + metrics: OrcFileMetrics::new( + self.partition_index, + file_meta + .object_meta + .location + .filename() + .unwrap_or("missing filename"), + &self.metrics.clone(), + ), + }; let batch_size = self.batch_size; let projection = self.projection.clone(); let projected_schema = SchemaRef::from(self.table_schema.project(&projection)?); @@ -233,7 +247,7 @@ impl FileOpener for OrcOpener { let mut builder = ArrowReaderBuilder::try_new_async(reader) .await .or_else(|err| df_execution_err!("create orc reader error: {err}"))?; - if let Some(range) = file_meta.range.clone() { + if let Some(range) = &file_meta.range { let range = range.start as usize..range.end as usize; builder = builder.with_file_byte_range(range); } @@ -265,11 +279,14 @@ impl FileOpener for OrcOpener { } #[derive(Clone)] -struct OrcFileReaderRef(Arc); +struct OrcFileReaderRef { + inner: Arc, + metrics: OrcFileMetrics, +} impl AsyncChunkReader for OrcFileReaderRef { fn len(&mut self) -> BoxFuture<'_, std::io::Result> { - async move { Ok(self.0.get_meta().size as u64) }.boxed() + async move { Ok(self.inner.get_meta().size as u64) }.boxed() } fn get_bytes( @@ -277,9 +294,31 @@ impl AsyncChunkReader for OrcFileReaderRef { offset_from_start: u64, length: u64, ) -> BoxFuture<'_, std::io::Result> { + let inner = self.inner.clone(); let offset_from_start = offset_from_start as usize; let length = length as usize; let range = offset_from_start..(offset_from_start + length); - async move { self.0.read_fully(range).map_err(|e| e.into()) }.boxed() + self.metrics.bytes_scanned.add(length); + async move { + tokio::task::spawn_blocking(move || inner.read_fully(range).map_err(|e| e.into())) + .await + .expect("tokio spawn_blocking error") + } + .boxed() + } +} + +#[derive(Clone)] +struct OrcFileMetrics { + bytes_scanned: Count, +} + +impl OrcFileMetrics { + pub fn new(partition: usize, filename: &str, metrics: &ExecutionPlanMetricsSet) -> Self { + let bytes_scanned = MetricBuilder::new(metrics) + .with_new_label("filename", filename.to_string()) + .counter("bytes_scanned", partition); + + Self { bytes_scanned } } }