Skip to content

Commit

Permalink
Merge pull request #8593 from BohuTANG/dev-interpreter-refine-1
Browse files Browse the repository at this point in the history
refactor(interpreter): move interpreter_common.rs to common/
  • Loading branch information
BohuTANG authored Nov 2, 2022
2 parents 2686a7b + a41f14a commit c2f2677
Show file tree
Hide file tree
Showing 14 changed files with 322 additions and 252 deletions.
115 changes: 115 additions & 0 deletions src/query/service/src/interpreters/common/file.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io;
use std::sync::Arc;

use chrono::TimeZone;
use chrono::Utc;
use common_catalog::table_context::TableContext;
use common_exception::Result;
use common_meta_types::StageFile;
use common_meta_types::UserStageInfo;
use common_storages_factory::stage::StageTable;
use futures_util::TryStreamExt;
use tracing::debug;
use tracing::warn;

use crate::sessions::QueryContext;

/// Stat a single staged file and convert its metadata into a `StageFile`.
///
/// Builds the stage's storage operator from `ctx` + `stage`, fetches the
/// object metadata for `path`, and maps it field-by-field. A missing
/// last-modified timestamp falls back to "now".
pub async fn stat_file(
    ctx: &Arc<QueryContext>,
    stage: &UserStageInfo,
    path: &str,
) -> Result<StageFile> {
    let table_ctx: Arc<dyn TableContext> = ctx.clone();
    let op = StageTable::get_op(&table_ctx, stage)?;
    let meta = op.object(path).metadata().await?;

    // Prefer the backend-reported mtime; fall back to the current time
    // when the storage service does not return one.
    let last_modified = match meta.last_modified() {
        Some(t) => Utc.timestamp(t.unix_timestamp(), 0),
        None => Utc::now(),
    };

    Ok(StageFile {
        path: path.to_string(),
        size: meta.content_length(),
        md5: meta.content_md5().map(str::to_string),
        last_modified,
        creator: None,
        etag: meta.etag().map(str::to_string),
    })
}

/// List stage files under `path` using the stage's storage operator.
///
/// Behavior (as implemented below):
/// - If `path` is a directory, it is walked top-down recursively and every
///   regular file found is returned.
/// - If `path` is a regular file, only that file is returned.
/// - If `path` does not exist, an empty list is returned.
/// - Any other storage error is propagated to the caller.
///
/// TODO(@xuanwo): return a stream instead.
pub async fn list_files(
    ctx: &Arc<QueryContext>,
    stage: &UserStageInfo,
    path: &str,
) -> Result<Vec<StageFile>> {
    let table_ctx: Arc<dyn TableContext> = ctx.clone();
    let op = StageTable::get_op(&table_ctx, stage)?;
    // Accumulated as (path, metadata) pairs; converted to `StageFile` at the end.
    let mut files = Vec::new();

    // Stat `path` once to decide how to proceed:
    // - a directory yields `Some(path)` and is listed below;
    // - a regular file is recorded immediately and yields `None`;
    // - NotFound yields `None` (nothing to list);
    // - any other error aborts the whole listing.
    let dir_path = match op.object(path).metadata().await {
        Ok(meta) if meta.mode().is_dir() => Some(path.to_string()),
        Ok(meta) if !meta.mode().is_dir() => {
            files.push((path.to_string(), meta));

            None
        }
        Err(e) if e.kind() == io::ErrorKind::NotFound => None,
        Err(e) => return Err(e.into()),
        // Unreachable in practice: the two `Ok` guards above cover every
        // `Ok` case, but the compiler cannot see guard exhaustiveness.
        _ => None,
    };

    // If `path` turned out to be a directory, walk it top-down and collect
    // every regular file. A failure here is only logged, not propagated.
    // NOTE(review): this re-stats the directory although the match above
    // already fetched its metadata — presumably to tolerate races; confirm.
    if let Some(dir) = dir_path {
        match op.object(&dir).metadata().await {
            Ok(_) => {
                let mut ds = op.batch().walk_top_down(&dir)?;
                while let Some(de) = ds.try_next().await? {
                    if de.mode().is_file() {
                        let path = de.path().to_string();
                        let meta = de.metadata().await;
                        files.push((path, meta));
                    }
                }
            }
            Err(e) => warn!("ignore listing {path}/, because: {:?}", e),
        };
    }

    // Convert the collected (path, metadata) pairs into `StageFile` entries;
    // a missing last-modified timestamp falls back to the current time.
    let matched_files = files
        .into_iter()
        .map(|(name, meta)| StageFile {
            path: name,
            size: meta.content_length(),
            md5: meta.content_md5().map(str::to_string),
            last_modified: meta
                .last_modified()
                .map_or(Utc::now(), |t| Utc.timestamp(t.unix_timestamp(), 0)),
            creator: None,
            etag: meta.etag().map(str::to_string),
        })
        .collect::<Vec<StageFile>>();

    debug!("listed files: {:?}", matched_files);
    Ok(matched_files)
}
58 changes: 58 additions & 0 deletions src/query/service/src/interpreters/common/grant.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_catalog::table_context::TableContext;
use common_exception::Result;
use common_meta_types::GrantObject;

use crate::sessions::QueryContext;

/// Verify that the object a GRANT/REVOKE statement refers to actually
/// exists in the current tenant's catalog.
///
/// Returns `UnknownTable` / `UnknownDatabase` when the referenced table or
/// database is missing; `Global` grants need no existence check.
pub async fn validate_grant_object_exists(
    ctx: &Arc<QueryContext>,
    object: &GrantObject,
) -> Result<()> {
    let tenant = ctx.get_tenant();

    match object {
        GrantObject::Global => {}
        GrantObject::Database(catalog_name, database_name) => {
            let catalog = ctx.get_catalog(catalog_name)?;
            let exists = catalog
                .exists_database(tenant.as_str(), database_name)
                .await?;
            if !exists {
                return Err(common_exception::ErrorCode::UnknownDatabase(format!(
                    "database {} not exists",
                    database_name,
                )));
            }
        }
        GrantObject::Table(catalog_name, database_name, table_name) => {
            let catalog = ctx.get_catalog(catalog_name)?;
            let exists = catalog
                .exists_table(tenant.as_str(), database_name, table_name)
                .await?;
            if !exists {
                return Err(common_exception::ErrorCode::UnknownTable(format!(
                    "table {}.{} not exists",
                    database_name, table_name,
                )));
            }
        }
    }

    Ok(())
}
22 changes: 22 additions & 0 deletions src/query/service/src/interpreters/common/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Helpers shared by multiple interpreters, split by concern:
// - `file`: stage-file stat/listing utilities
// - `grant`: grant-object existence validation
// - `table`: pipeline append/commit helpers
mod file;
mod grant;
mod table;

pub use file::list_files;
pub use file::stat_file;
pub use grant::validate_grant_object_exists;
pub use table::append2table;
86 changes: 86 additions & 0 deletions src/query/service/src/interpreters/common/table.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_base::base::GlobalIORuntime;
use common_catalog::table::Table;
use common_catalog::table_context::TableContext;
use common_datavalues::DataSchemaRef;
use common_exception::Result;
use common_pipeline_core::Pipeline;

use crate::pipelines::processors::TransformAddOn;
use crate::pipelines::PipelineBuildResult;
use crate::sessions::QueryContext;

/// Insert a `TransformAddOn` stage into `pipeline` when the source schema
/// differs from the target schema, so missing target columns get filled.
/// No transform is added when the schemas already match.
fn fill_missing_columns(
    ctx: Arc<QueryContext>,
    source_schema: &DataSchemaRef,
    target_schema: &DataSchemaRef,
    pipeline: &mut Pipeline,
) -> Result<()> {
    // Schemas already agree; nothing to fill.
    if source_schema == target_schema {
        return Ok(());
    }

    pipeline.add_transform(|input_port, output_port| {
        TransformAddOn::try_create(
            input_port,
            output_port,
            source_schema.clone(),
            target_schema.clone(),
            ctx.clone(),
        )
    })?;

    Ok(())
}

/// Wire `table`'s append sink onto the build result's main pipeline.
///
/// First fills any target columns missing from `source_schema`, then asks the
/// table to append the pipeline's output. When `need_commit` is set, an
/// on-finished hook commits the precommitted blocks (honoring `overwrite`);
/// a pipeline error is propagated instead of committing.
pub fn append2table(
    ctx: Arc<QueryContext>,
    table: Arc<dyn Table>,
    source_schema: DataSchemaRef,
    build_res: &mut PipelineBuildResult,
    overwrite: bool,
    need_commit: bool,
) -> Result<()> {
    fill_missing_columns(
        ctx.clone(),
        &source_schema,
        &table.schema(),
        &mut build_res.main_pipeline,
    )?;

    table.append_data(ctx.clone(), &mut build_res.main_pipeline, false)?;

    if need_commit {
        build_res.main_pipeline.set_on_finished(move |may_error| {
            match may_error {
                None => {
                    let ctx = ctx.clone();
                    let table = table.clone();
                    let append_entries = ctx.consume_precommit_blocks();
                    // We must put the commit operation to global runtime, which will
                    // avoid the "dispatch dropped without returning error" in tower.
                    GlobalIORuntime::instance().block_on(async move {
                        table.commit_insertion(ctx, append_entries, overwrite).await
                    })
                }
                // The pipeline failed: skip the commit and surface the error.
                Some(error) => Err(error.clone()),
            }
        });
    }

    Ok(())
}
Loading

1 comment on commit c2f2677

@vercel
Copy link

@vercel vercel bot commented on c2f2677 Nov 2, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend-git-main-databend.vercel.app
databend.rs
databend.vercel.app
databend-databend.vercel.app

Please sign in to comment.