diff --git a/src/execution/mod.rs b/src/execution/mod.rs index 6b8b4c9..d0c6b5e 100644 --- a/src/execution/mod.rs +++ b/src/execution/mod.rs @@ -17,14 +17,14 @@ //! [`ExecutionContext`]: DataFusion based execution context for running SQL queries //! -use std::sync::Arc; + +mod stats; +pub use stats::{collect_plan_stats, ExecutionStats}; use color_eyre::eyre::Result; use datafusion::execution::SendableRecordBatchStream; -use datafusion::physical_plan::{visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor}; use datafusion::prelude::*; use datafusion::sql::parser::Statement; -use log::info; use tokio_stream::StreamExt; #[cfg(feature = "flightsql")] use { @@ -134,66 +134,3 @@ impl ExecutionContext { .await } } - -// #[derive(Debug, Clone)] -// pub struct ExecMetrics { -// name: String, -// bytes_scanned: usize, -// } - -#[derive(Clone, Debug)] -pub struct ExecutionStats { - bytes_scanned: usize, - // exec_metrics: Vec, -} - -impl ExecutionStats { - pub fn bytes_scanned(&self) -> usize { - self.bytes_scanned - } -} - -#[derive(Default)] -struct PlanVisitor { - total_bytes_scanned: usize, - // exec_metrics: Vec, -} - -impl From for ExecutionStats { - fn from(value: PlanVisitor) -> Self { - Self { - bytes_scanned: value.total_bytes_scanned, - } - } -} - -impl ExecutionPlanVisitor for PlanVisitor { - type Error = datafusion_common::DataFusionError; - - fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result { - match plan.metrics() { - Some(metrics) => match metrics.sum_by_name("bytes_scanned") { - Some(bytes_scanned) => { - info!("Adding {} to total_bytes_scanned", bytes_scanned.as_usize()); - self.total_bytes_scanned += bytes_scanned.as_usize(); - } - None => { - info!("No bytes_scanned for {}", plan.name()) - } - }, - None => { - info!("No MetricsSet for {}", plan.name()) - } - } - Ok(true) - } -} - -pub fn collect_plan_stats(plan: Arc) -> Option { - let mut visitor = PlanVisitor::default(); - if visit_execution_plan(plan.as_ref(), &mut visitor).is_ok() { - Some(visitor.into()) - } else { - None - } -} diff --git a/src/execution/stats.rs b/src/execution/stats.rs new file mode 100644 index 0000000..bff4f29 --- /dev/null +++ b/src/execution/stats.rs @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion::physical_plan::{visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor}; +use log::info; +use std::sync::Arc; + +#[derive(Clone, Debug)] +pub struct ExecutionStats { + bytes_scanned: usize, + // exec_metrics: Vec, +} + +impl ExecutionStats { + pub fn bytes_scanned(&self) -> usize { + self.bytes_scanned + } +} + +#[derive(Default)] +struct PlanVisitor { + total_bytes_scanned: usize, + // exec_metrics: Vec, +} + +impl From for ExecutionStats { + fn from(value: PlanVisitor) -> Self { + Self { + bytes_scanned: value.total_bytes_scanned, + } + } +} + +impl ExecutionPlanVisitor for PlanVisitor { + type Error = datafusion_common::DataFusionError; + + fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> color_eyre::Result { + match plan.metrics() { + Some(metrics) => match metrics.sum_by_name("bytes_scanned") { + Some(bytes_scanned) => { + info!("Adding {} to total_bytes_scanned", bytes_scanned.as_usize()); + self.total_bytes_scanned += bytes_scanned.as_usize(); + } + None => { + info!("No bytes_scanned for {}", plan.name()) + } + }, + None => { + info!("No MetricsSet for {}", plan.name()) + } + } + Ok(true) + } +} + +pub fn collect_plan_stats(plan: Arc) -> Option { + let mut visitor = PlanVisitor::default(); + if visit_execution_plan(plan.as_ref(), &mut visitor).is_ok() { + Some(visitor.into()) + } else { + None + } +}