Skip to content

Commit

Permalink
feat: support pagination on task_history (#15047)
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhiHanZ authored Mar 21, 2024
1 parent c4dcf91 commit 341fb34
Show file tree
Hide file tree
Showing 10 changed files with 223 additions and 32 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions src/common/cloud_control/proto/task.proto
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ message ShowTaskRunsRequest {
repeated string owners = 6;
repeated string task_ids = 7;
string task_name = 8;

optional int32 page_size = 90; // 100 by default
optional int64 next_page_token = 91;
optional int64 previous_page_token = 92;
}

message TaskRun {
Expand Down Expand Up @@ -205,6 +209,8 @@ message TaskRun {
// Response of the ShowTaskRuns RPC: one page of task runs plus pagination cursors.
message ShowTaskRunsResponse {
// Task runs contained in this page.
repeated TaskRun task_runs = 1;
optional TaskError error = 2;
// Cursor for the following page. The Rust client (`show_task_runs_full`) copies
// it into ShowTaskRunsRequest.next_page_token and keeps requesting while it is set.
optional int64 next_page_token = 6;
// NOTE(review): presumably the cursor for the preceding page; no consumer is
// visible in this change — confirm against the server implementation.
optional int64 previous_page_token = 7;
}

message GetTaskDependentsRequest {
Expand Down
26 changes: 26 additions & 0 deletions src/common/cloud_control/src/task_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use databend_common_exception::Result;
use tonic::transport::Channel;
use tonic::Request;

use crate::client_config::make_request;
use crate::client_config::ClientConfig;
use crate::pb::task_service_client::TaskServiceClient;
use crate::pb::AlterTaskRequest;
use crate::pb::AlterTaskResponse;
Expand Down Expand Up @@ -104,6 +106,30 @@ impl TaskClient {
Ok(resp.into_inner())
}

/// Fetches every page of task runs matching `req`.
///
/// Sends `req` as given, then keeps re-sending it with `next_page_token` set to the
/// cursor returned by the previous response until the server stops returning one.
///
/// Returns the responses in the order they were received (one entry per page).
///
/// # Errors
///
/// Propagates the first RPC failure; pages fetched before the failure are discarded.
pub async fn show_task_runs_full(
    &self,
    config: ClientConfig,
    req: crate::pb::ShowTaskRunsRequest,
) -> Result<Vec<crate::pb::ShowTaskRunsResponse>> {
    let mut client = self.task_client.clone();
    // Only the cursor changes between requests, so mutate one request in place.
    let mut req = req;
    let mut pages = Vec::new();
    loop {
        let page = client
            .show_task_runs(make_request(req.clone(), config.clone()))
            .await?
            .into_inner();
        let next_token = page.next_page_token;
        pages.push(page);
        match next_token {
            // A cursor equal to the one just sent would yield the same page again;
            // stop instead of looping forever on a non-advancing server.
            Some(token) if req.next_page_token != Some(token) => {
                req.next_page_token = Some(token);
            }
            _ => break,
        }
    }
    Ok(pages)
}

pub async fn get_task_dependents(
&self,
req: Request<crate::pb::GetTaskDependentsRequest>,
Expand Down
2 changes: 2 additions & 0 deletions src/common/cloud_control/tests/it/task_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ impl TaskService for MockTaskService {
Ok(Response::new(ShowTaskRunsResponse {
task_runs: vec![],
error: None,
next_page_token: None,
previous_page_token: None,
}))
}

Expand Down
1 change: 1 addition & 0 deletions src/query/storages/system/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ jsonb = { workspace = true }
async-backtrace = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true }
chrono-tz = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
Expand Down
63 changes: 54 additions & 9 deletions src/query/storages/system/src/task_history_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,18 @@

use std::sync::Arc;

use chrono_tz::Tz::UTC;
use databend_common_catalog::plan::PushDownInfo;
use databend_common_catalog::table::Table;
use databend_common_catalog::table_context::TableContext;
use databend_common_cloud_control::client_config::build_client_config;
use databend_common_cloud_control::client_config::make_request;
use databend_common_cloud_control::cloud_api::CloudControlApiProvider;
use databend_common_cloud_control::pb::ShowTaskRunsRequest;
use databend_common_cloud_control::pb::TaskRun;
use databend_common_config::GlobalConfig;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::date_helper::DateConverter;
use databend_common_expression::infer_table_schema;
use databend_common_expression::types::Int32Type;
use databend_common_expression::types::Int64Type;
Expand All @@ -34,13 +35,18 @@ use databend_common_expression::types::UInt64Type;
use databend_common_expression::types::VariantType;
use databend_common_expression::DataBlock;
use databend_common_expression::FromData;
use databend_common_expression::Scalar;
use databend_common_functions::BUILTIN_FUNCTIONS;
use databend_common_meta_app::schema::TableIdent;
use databend_common_meta_app::schema::TableInfo;
use databend_common_meta_app::schema::TableMeta;
use databend_common_sql::plans::task_run_schema;

use crate::table::AsyncOneBlockSystemTable;
use crate::table::AsyncSystemTable;
use crate::util::find_eq_filter;
use crate::util::find_gt_filter;
use crate::util::find_lt_filter;

pub fn parse_task_runs_to_datablock(task_runs: Vec<TaskRun>) -> Result<DataBlock> {
let mut name: Vec<String> = Vec::with_capacity(task_runs.len());
Expand Down Expand Up @@ -122,7 +128,7 @@ impl AsyncSystemTable for TaskHistoryTable {
async fn get_full_data(
&self,
ctx: Arc<dyn TableContext>,
_push_downs: Option<PushDownInfo>,
push_downs: Option<PushDownInfo>,
) -> Result<DataBlock> {
let config = GlobalConfig::instance();
if config.query.cloud_control_grpc_server_address.is_none() {
Expand All @@ -135,28 +141,67 @@ impl AsyncSystemTable for TaskHistoryTable {
let query_id = ctx.get_id();
let user = ctx.get_current_user()?.identity().to_string();
let available_roles = ctx.get_available_roles().await?;
// TODO: limit push-down does NOT work during tests; we need to fix it later.
let result_limit = push_downs
.as_ref()
.map(|v| v.limit.map(|i| i as i32))
.unwrap_or(None);
let mut task_name = None;
let mut scheduled_time_start = None;
let mut scheduled_time_end = None;
if let Some(push_downs) = push_downs {
if let Some(filter) = push_downs.filters.as_ref().map(|f| &f.filter) {
let expr = filter.as_expr(&BUILTIN_FUNCTIONS);
find_eq_filter(&expr, &mut |col_name, scalar| {
if col_name == "name" {
if let Scalar::String(s) = scalar {
task_name = Some(s.clone());
}
}
});
find_lt_filter(&expr, &mut |col_name, scalar| {
if col_name == "scheduled_time" {
if let Scalar::Timestamp(s) = scalar {
scheduled_time_end = Some(s.to_timestamp(UTC).to_rfc3339());
}
}
});
find_gt_filter(&expr, &mut |col_name, scalar| {
if col_name == "scheduled_time" {
if let Scalar::Timestamp(s) = scalar {
scheduled_time_start = Some(s.to_timestamp(UTC).to_rfc3339());
}
}
});
}
}
let req = ShowTaskRunsRequest {
tenant_id: tenant.to_string(),
scheduled_time_start: "".to_string(),
scheduled_time_end: "".to_string(),
task_name: "".to_string(),
result_limit: 10000, // TODO: use plan.limit pushdown
scheduled_time_start: scheduled_time_start.unwrap_or("".to_string()),
scheduled_time_end: scheduled_time_end.unwrap_or("".to_string()),
task_name: task_name.unwrap_or("".to_string()),
result_limit: result_limit.unwrap_or(0), // 0 means default
error_only: false,
owners: available_roles
.into_iter()
.map(|x| x.identity().to_string())
.collect(),
next_page_token: None,
page_size: None,
previous_page_token: None,
task_ids: vec![],
};

let cloud_api = CloudControlApiProvider::instance();
let task_client = cloud_api.get_task_client();
let config =
build_client_config(tenant.to_string(), user, query_id, cloud_api.get_timeout());
let req = make_request(req, config);

let resp = task_client.show_task_runs(req).await?;
let trs = resp.task_runs;
let resp = task_client.show_task_runs_full(config, req).await?;
let trs = resp
.into_iter()
.flat_map(|r| r.task_runs)
.collect::<Vec<_>>();

parse_task_runs_to_datablock(trs)
}
Expand Down
60 changes: 60 additions & 0 deletions src/query/storages/system/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,63 @@ pub fn find_eq_filter(expr: &Expr<String>, visitor: &mut impl FnMut(&str, &Scala
}
}
}

/// Walks a filter expression and reports every lower-bound comparison on a column.
///
/// `visitor(column, scalar)` is invoked for predicates meaning `column > scalar` or
/// `column >= scalar`. That covers `gt`/`gte` with the column on the left and the
/// mirrored spelling `lt`/`lte` with the constant on the left (`5 < col`). A constant
/// on the left of `gt`/`gte` (`5 > col`) is an upper bound and is not reported here.
///
/// Only conjunctions are traversed: `and_filters` arguments, `Cast` inner expressions
/// and lambda call arguments. Any other function call is ignored.
pub fn find_gt_filter(expr: &Expr<String>, visitor: &mut impl FnMut(&str, &Scalar)) {
    match expr {
        Expr::Constant { .. } | Expr::ColumnRef { .. } => {}
        Expr::Cast { expr, .. } => find_gt_filter(expr, visitor),
        Expr::FunctionCall { function, args, .. } => {
            match (function.signature.name.as_str(), args.as_slice()) {
                // `col > c` / `col >= c`, or the same bound written as `c < col` / `c <= col`.
                ("gt" | "gte", [Expr::ColumnRef { id, .. }, Expr::Constant { scalar, .. }])
                | ("lt" | "lte", [Expr::Constant { scalar, .. }, Expr::ColumnRef { id, .. }]) => {
                    visitor(id, scalar);
                }
                // only support this:
                // 1. where xx and xx and xx
                // 2. filter: Column `table`, Column `database`
                ("and_filters", _) => {
                    for arg in args {
                        find_gt_filter(arg, visitor)
                    }
                }
                _ => {}
            }
        }
        Expr::LambdaFunctionCall { args, .. } => {
            for arg in args {
                find_gt_filter(arg, visitor)
            }
        }
    }
}

/// Walks a filter expression and reports every upper-bound comparison on a column.
///
/// `visitor(column, scalar)` is invoked for predicates meaning `column < scalar` or
/// `column <= scalar`. That covers `lt`/`lte` with the column on the left and the
/// mirrored spelling `gt`/`gte` with the constant on the left (`5 > col`). A constant
/// on the left of `lt`/`lte` (`5 < col`) is a lower bound and is not reported here.
///
/// Only conjunctions are traversed: `and_filters` arguments, `Cast` inner expressions
/// and lambda call arguments. Any other function call is ignored.
pub fn find_lt_filter(expr: &Expr<String>, visitor: &mut impl FnMut(&str, &Scalar)) {
    match expr {
        Expr::Constant { .. } | Expr::ColumnRef { .. } => {}
        Expr::Cast { expr, .. } => find_lt_filter(expr, visitor),
        Expr::FunctionCall { function, args, .. } => {
            match (function.signature.name.as_str(), args.as_slice()) {
                // `col < c` / `col <= c`, or the same bound written as `c > col` / `c >= col`.
                ("lt" | "lte", [Expr::ColumnRef { id, .. }, Expr::Constant { scalar, .. }])
                | ("gt" | "gte", [Expr::Constant { scalar, .. }, Expr::ColumnRef { id, .. }]) => {
                    visitor(id, scalar);
                }
                // only support this:
                // 1. where xx and xx and xx
                // 2. filter: Column `table`, Column `database`
                ("and_filters", _) => {
                    for arg in args {
                        find_lt_filter(arg, visitor)
                    }
                }
                _ => {}
            }
        }
        Expr::LambdaFunctionCall { args, .. } => {
            for arg in args {
                find_lt_filter(arg, visitor)
            }
        }
    }
}
45 changes: 42 additions & 3 deletions tests/cloud_control_server/simple_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def load_data_from_json():
task = task_pb2.Task()
json_format.ParseDict(task_run_data["Task"], task)
TASK_DB[task.task_name] = task
TASK_RUN_DB["MockTask"] = create_mock_task_runs_from_task(TASK_DB["SampleTask"], 10)
notification_history_directory_path = os.path.join(
script_directory, "testdata", "notification_history"
)
Expand Down Expand Up @@ -142,6 +143,16 @@ def create_task_run_from_task(task):
return task_run


def create_mock_task_runs_from_task(task, num):
    """Build `num` task runs derived from `task`, all relabelled as "MockTask".

    Each run comes from `create_task_run_from_task(task)` and gets the run id
    "1ftx<i>" for i in 0..num-1. Returns the runs as a list in index order.
    """

    def _mock_run(index):
        run = create_task_run_from_task(task)
        run.task_name = "MockTask"
        run.run_id = "1ftx" + str(index)
        return run

    return [_mock_run(index) for index in range(num)]


class TaskService(task_pb2_grpc.TaskServiceServicer):
def CreateTask(self, request, context):
print("CreateTask", request)
Expand Down Expand Up @@ -290,7 +301,7 @@ def AlterTask(self, request, context):
def ExecuteTask(self, request, context):
    """Record one fresh run for every task in TASK_DB and report success.

    The request content is only logged; every known task is "executed".
    """
    print("ExecuteTask", request)
    for task_name, task in TASK_DB.items():
        # Store a list of runs per task: ShowTaskRuns flattens the values of
        # TASK_RUN_DB, so a bare run object here would not be iterable.
        TASK_RUN_DB[task_name] = [create_task_run_from_task(task)]
    return task_pb2.ExecuteTaskResponse(error=None)

def ShowTasks(self, request, context):
Expand All @@ -300,8 +311,36 @@ def ShowTasks(self, request, context):

def ShowTaskRuns(self, request, context):
    """Return one page of task runs, honouring name filter, limit and cursor.

    Runs from every task are flattened and sorted by run_id, optionally
    filtered by `request.task_name`, then truncated to `request.result_limit`
    when it is positive. Pages hold `page_size` (2) runs; `next_page_token` is
    the start index of the following page and is left unset on the last page.
    """
    print("ShowTaskRuns", request)
    task_runs = [item for sublist in TASK_RUN_DB.values() for item in sublist]
    task_runs = sorted(task_runs, key=lambda x: x.run_id)

    if len(request.task_name) > 0:
        print("Limiting task_name to", request.task_name)
        task_runs = [run for run in task_runs if run.task_name == request.task_name]
    # limit (runs are already sorted by run_id)
    if request.result_limit > 0:
        print("Limiting result to", request.result_limit)
        task_runs = task_runs[: request.result_limit]
    # Count only after filtering and limiting so the cursor math sees the final set.
    num_results = len(task_runs)

    # pagination
    start_index = 0
    page_size = 2
    if request.HasField("next_page_token"):
        print("Next page token", request.next_page_token)
        start_index = request.next_page_token

    end_index = start_index + page_size
    # Hand out a cursor only when at least one more run exists past this page;
    # otherwise the client would fetch a trailing empty page.
    next_page_token = end_index if end_index < num_results else None
    task_runs = task_runs[start_index:end_index]
    return task_pb2.ShowTaskRunsResponse(
        task_runs=task_runs, next_page_token=next_page_token
    )

def GetTaskDependents(self, request, context):
print("GetTaskDependents", request)
Expand Down
Loading

0 comments on commit 341fb34

Please sign in to comment.