Skip to content

Commit

Permalink
Move ExecutionStats into its own module (#151)
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb authored Sep 20, 2024
1 parent 3c3daa1 commit 8247555
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 66 deletions.
69 changes: 3 additions & 66 deletions src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@

//! [`ExecutionContext`]: DataFusion based execution context for running SQL queries
//!
use std::sync::Arc;
mod stats;
pub use stats::{collect_plan_stats, ExecutionStats};

use color_eyre::eyre::Result;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::{visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor};
use datafusion::prelude::*;
use datafusion::sql::parser::Statement;
use log::info;
use tokio_stream::StreamExt;
#[cfg(feature = "flightsql")]
use {
Expand Down Expand Up @@ -134,66 +134,3 @@ impl ExecutionContext {
.await
}
}

// #[derive(Debug, Clone)]
// pub struct ExecMetrics {
// name: String,
// bytes_scanned: usize,
// }

#[derive(Clone, Debug)]
pub struct ExecutionStats {
bytes_scanned: usize,
// exec_metrics: Vec<ExecMetrics>,
}

impl ExecutionStats {
pub fn bytes_scanned(&self) -> usize {
self.bytes_scanned
}
}

#[derive(Default)]
struct PlanVisitor {
total_bytes_scanned: usize,
// exec_metrics: Vec<ExecMetrics>,
}

impl From<PlanVisitor> for ExecutionStats {
fn from(value: PlanVisitor) -> Self {
Self {
bytes_scanned: value.total_bytes_scanned,
}
}
}

impl ExecutionPlanVisitor for PlanVisitor {
type Error = datafusion_common::DataFusionError;

fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
match plan.metrics() {
Some(metrics) => match metrics.sum_by_name("bytes_scanned") {
Some(bytes_scanned) => {
info!("Adding {} to total_bytes_scanned", bytes_scanned.as_usize());
self.total_bytes_scanned += bytes_scanned.as_usize();
}
None => {
info!("No bytes_scanned for {}", plan.name())
}
},
None => {
info!("No MetricsSet for {}", plan.name())
}
}
Ok(true)
}
}

pub fn collect_plan_stats(plan: Arc<dyn ExecutionPlan>) -> Option<ExecutionStats> {
let mut visitor = PlanVisitor::default();
if visit_execution_plan(plan.as_ref(), &mut visitor).is_ok() {
Some(visitor.into())
} else {
None
}
}
77 changes: 77 additions & 0 deletions src/execution/stats.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion::physical_plan::{visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor};
use log::info;
use std::sync::Arc;

#[derive(Clone, Debug)]
pub struct ExecutionStats {
bytes_scanned: usize,
// exec_metrics: Vec<ExecMetrics>,
}

impl ExecutionStats {
pub fn bytes_scanned(&self) -> usize {
self.bytes_scanned
}
}

#[derive(Default)]
struct PlanVisitor {
total_bytes_scanned: usize,
// exec_metrics: Vec<ExecMetrics>,
}

impl From<PlanVisitor> for ExecutionStats {
fn from(value: PlanVisitor) -> Self {
Self {
bytes_scanned: value.total_bytes_scanned,
}
}
}

impl ExecutionPlanVisitor for PlanVisitor {
type Error = datafusion_common::DataFusionError;

fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> color_eyre::Result<bool, Self::Error> {
match plan.metrics() {
Some(metrics) => match metrics.sum_by_name("bytes_scanned") {
Some(bytes_scanned) => {
info!("Adding {} to total_bytes_scanned", bytes_scanned.as_usize());
self.total_bytes_scanned += bytes_scanned.as_usize();
}
None => {
info!("No bytes_scanned for {}", plan.name())
}
},
None => {
info!("No MetricsSet for {}", plan.name())
}
}
Ok(true)
}
}

pub fn collect_plan_stats(plan: Arc<dyn ExecutionPlan>) -> Option<ExecutionStats> {
let mut visitor = PlanVisitor::default();
if visit_execution_plan(plan.as_ref(), &mut visitor).is_ok() {
Some(visitor.into())
} else {
None
}
}

0 comments on commit 8247555

Please sign in to comment.