Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support pagination on task_history #15047

Merged
merged 3 commits into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions src/common/cloud_control/proto/task.proto
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ message ShowTaskRunsRequest {
repeated string owners = 6;
repeated string task_ids = 7;
string task_name = 8;

optional int32 page_size = 90; // 100 by default
optional int64 next_page_token = 91;
optional int64 previous_page_token = 92;
}

message TaskRun {
Expand Down Expand Up @@ -205,6 +209,8 @@ message TaskRun {
message ShowTaskRunsResponse {
repeated TaskRun task_runs = 1;
optional TaskError error = 2;
optional int64 next_page_token = 6;
optional int64 previous_page_token = 7;
}

message GetTaskDependentsRequest {
Expand Down
26 changes: 26 additions & 0 deletions src/common/cloud_control/src/task_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use databend_common_exception::Result;
use tonic::transport::Channel;
use tonic::Request;

use crate::client_config::make_request;
use crate::client_config::ClientConfig;
use crate::pb::task_service_client::TaskServiceClient;
use crate::pb::AlterTaskRequest;
use crate::pb::AlterTaskResponse;
Expand Down Expand Up @@ -104,6 +106,30 @@ impl TaskClient {
Ok(resp.into_inner())
}

pub async fn show_task_runs_full(
&self,
config: ClientConfig,
req: crate::pb::ShowTaskRunsRequest,
) -> Result<Vec<crate::pb::ShowTaskRunsResponse>> {
let mut client = self.task_client.clone();
let request = make_request(req.clone(), config.clone());
let resp = client.show_task_runs(request).await?;
let mut has_next = resp.get_ref().next_page_token.is_some();
// it is a pagination request, so we need to handle the response
let mut result = vec![resp.into_inner()];
while has_next {
let mut req = req.clone();
req.next_page_token = result.last().unwrap().next_page_token;
let resp = client
.show_task_runs(make_request(req.clone(), config.clone()))
.await?;
let resp = resp.into_inner();
has_next = resp.next_page_token.is_some();
result.push(resp);
}
Ok(result)
}

pub async fn get_task_dependents(
&self,
req: Request<crate::pb::GetTaskDependentsRequest>,
Expand Down
2 changes: 2 additions & 0 deletions src/common/cloud_control/tests/it/task_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ impl TaskService for MockTaskService {
Ok(Response::new(ShowTaskRunsResponse {
task_runs: vec![],
error: None,
next_page_token: None,
previous_page_token: None,
}))
}

Expand Down
1 change: 1 addition & 0 deletions src/query/storages/system/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ jsonb = { workspace = true }
async-backtrace = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true }
chrono-tz = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
Expand Down
63 changes: 54 additions & 9 deletions src/query/storages/system/src/task_history_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,18 @@

use std::sync::Arc;

use chrono_tz::Tz::UTC;
use databend_common_catalog::plan::PushDownInfo;
use databend_common_catalog::table::Table;
use databend_common_catalog::table_context::TableContext;
use databend_common_cloud_control::client_config::build_client_config;
use databend_common_cloud_control::client_config::make_request;
use databend_common_cloud_control::cloud_api::CloudControlApiProvider;
use databend_common_cloud_control::pb::ShowTaskRunsRequest;
use databend_common_cloud_control::pb::TaskRun;
use databend_common_config::GlobalConfig;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::date_helper::DateConverter;
use databend_common_expression::infer_table_schema;
use databend_common_expression::types::Int32Type;
use databend_common_expression::types::Int64Type;
Expand All @@ -34,13 +35,18 @@ use databend_common_expression::types::UInt64Type;
use databend_common_expression::types::VariantType;
use databend_common_expression::DataBlock;
use databend_common_expression::FromData;
use databend_common_expression::Scalar;
use databend_common_functions::BUILTIN_FUNCTIONS;
use databend_common_meta_app::schema::TableIdent;
use databend_common_meta_app::schema::TableInfo;
use databend_common_meta_app::schema::TableMeta;
use databend_common_sql::plans::task_run_schema;

use crate::table::AsyncOneBlockSystemTable;
use crate::table::AsyncSystemTable;
use crate::util::find_eq_filter;
use crate::util::find_gt_filter;
use crate::util::find_lt_filter;

pub fn parse_task_runs_to_datablock(task_runs: Vec<TaskRun>) -> Result<DataBlock> {
let mut name: Vec<String> = Vec::with_capacity(task_runs.len());
Expand Down Expand Up @@ -122,7 +128,7 @@ impl AsyncSystemTable for TaskHistoryTable {
async fn get_full_data(
&self,
ctx: Arc<dyn TableContext>,
_push_downs: Option<PushDownInfo>,
push_downs: Option<PushDownInfo>,
) -> Result<DataBlock> {
let config = GlobalConfig::instance();
if config.query.cloud_control_grpc_server_address.is_none() {
Expand All @@ -135,28 +141,67 @@ impl AsyncSystemTable for TaskHistoryTable {
let query_id = ctx.get_id();
let user = ctx.get_current_user()?.identity().to_string();
let available_roles = ctx.get_available_roles().await?;
// TODO: limit push_down does NOT work during tests,we need to fix it later.
let result_limit = push_downs
.as_ref()
.map(|v| v.limit.map(|i| i as i32))
.unwrap_or(None);
let mut task_name = None;
let mut scheduled_time_start = None;
let mut scheduled_time_end = None;
if let Some(push_downs) = push_downs {
if let Some(filter) = push_downs.filters.as_ref().map(|f| &f.filter) {
let expr = filter.as_expr(&BUILTIN_FUNCTIONS);
find_eq_filter(&expr, &mut |col_name, scalar| {
if col_name == "name" {
if let Scalar::String(s) = scalar {
task_name = Some(s.clone());
}
}
});
find_lt_filter(&expr, &mut |col_name, scalar| {
if col_name == "scheduled_time" {
if let Scalar::Timestamp(s) = scalar {
scheduled_time_end = Some(s.to_timestamp(UTC).to_rfc3339());
}
}
});
find_gt_filter(&expr, &mut |col_name, scalar| {
if col_name == "scheduled_time" {
if let Scalar::Timestamp(s) = scalar {
scheduled_time_start = Some(s.to_timestamp(UTC).to_rfc3339());
}
}
});
}
}
let req = ShowTaskRunsRequest {
tenant_id: tenant.to_string(),
scheduled_time_start: "".to_string(),
scheduled_time_end: "".to_string(),
task_name: "".to_string(),
result_limit: 10000, // TODO: use plan.limit pushdown
scheduled_time_start: scheduled_time_start.unwrap_or("".to_string()),
scheduled_time_end: scheduled_time_end.unwrap_or("".to_string()),
task_name: task_name.unwrap_or("".to_string()),
result_limit: result_limit.unwrap_or(0), // 0 means default
error_only: false,
owners: available_roles
.into_iter()
.map(|x| x.identity().to_string())
.collect(),
next_page_token: None,
page_size: None,
previous_page_token: None,
task_ids: vec![],
};

let cloud_api = CloudControlApiProvider::instance();
let task_client = cloud_api.get_task_client();
let config =
build_client_config(tenant.to_string(), user, query_id, cloud_api.get_timeout());
let req = make_request(req, config);

let resp = task_client.show_task_runs(req).await?;
let trs = resp.task_runs;
let resp = task_client.show_task_runs_full(config, req).await?;
let trs = resp
.into_iter()
.flat_map(|r| r.task_runs)
.collect::<Vec<_>>();

parse_task_runs_to_datablock(trs)
}
Expand Down
60 changes: 60 additions & 0 deletions src/query/storages/system/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,63 @@ pub fn find_eq_filter(expr: &Expr<String>, visitor: &mut impl FnMut(&str, &Scala
}
}
}

pub fn find_gt_filter(expr: &Expr<String>, visitor: &mut impl FnMut(&str, &Scalar)) {
match expr {
Expr::Constant { .. } | Expr::ColumnRef { .. } => {}
Expr::Cast { expr, .. } => find_gt_filter(expr, visitor),
Expr::FunctionCall { function, args, .. } => {
if function.signature.name == "gt" || function.signature.name == "gte" {
match args.as_slice() {
[Expr::ColumnRef { id, .. }, Expr::Constant { scalar, .. }]
| [Expr::Constant { scalar, .. }, Expr::ColumnRef { id, .. }] => {
visitor(id, scalar);
}
_ => {}
}
} else if function.signature.name == "and_filters" {
// only support this:
// 1. where xx and xx and xx
// 2. filter: Column `table`, Column `database`
for arg in args {
find_gt_filter(arg, visitor)
}
}
}
Expr::LambdaFunctionCall { args, .. } => {
for arg in args {
find_gt_filter(arg, visitor)
}
}
}
}

pub fn find_lt_filter(expr: &Expr<String>, visitor: &mut impl FnMut(&str, &Scalar)) {
match expr {
Expr::Constant { .. } | Expr::ColumnRef { .. } => {}
Expr::Cast { expr, .. } => find_lt_filter(expr, visitor),
Expr::FunctionCall { function, args, .. } => {
if function.signature.name == "lt" || function.signature.name == "lte" {
match args.as_slice() {
[Expr::ColumnRef { id, .. }, Expr::Constant { scalar, .. }]
| [Expr::Constant { scalar, .. }, Expr::ColumnRef { id, .. }] => {
visitor(id, scalar);
}
_ => {}
}
} else if function.signature.name == "and_filters" {
// only support this:
// 1. where xx and xx and xx
// 2. filter: Column `table`, Column `database`
for arg in args {
find_lt_filter(arg, visitor)
}
}
}
Expr::LambdaFunctionCall { args, .. } => {
for arg in args {
find_lt_filter(arg, visitor)
}
}
}
}
45 changes: 42 additions & 3 deletions tests/cloud_control_server/simple_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def load_data_from_json():
task = task_pb2.Task()
json_format.ParseDict(task_run_data["Task"], task)
TASK_DB[task.task_name] = task
TASK_RUN_DB["MockTask"] = create_mock_task_runs_from_task(TASK_DB["SampleTask"], 10)
notification_history_directory_path = os.path.join(
script_directory, "testdata", "notification_history"
)
Expand Down Expand Up @@ -142,6 +143,16 @@ def create_task_run_from_task(task):
return task_run


def create_mock_task_runs_from_task(task, num):
task_runs = []
for i in range(0, num):
task_run = create_task_run_from_task(task)
task_run.task_name = "MockTask"
task_run.run_id = "1ftx" + str(i)
task_runs.append(task_run)
return task_runs


class TaskService(task_pb2_grpc.TaskServiceServicer):
def CreateTask(self, request, context):
print("CreateTask", request)
Expand Down Expand Up @@ -290,7 +301,7 @@ def AlterTask(self, request, context):
def ExecuteTask(self, request, context):
print("ExecuteTask", request)
for task_name, task in TASK_DB.items():
TASK_RUN_DB[task_name] = create_task_run_from_task(task)
TASK_RUN_DB[task_name] = [create_task_run_from_task(task)]
return task_pb2.ExecuteTaskResponse(error=None)

def ShowTasks(self, request, context):
Expand All @@ -300,8 +311,36 @@ def ShowTasks(self, request, context):

def ShowTaskRuns(self, request, context):
print("ShowTaskRuns", request)
task_runs = list(TASK_RUN_DB.values())
return task_pb2.ShowTaskRunsResponse(task_runs=task_runs)
task_runs = [item for sublist in TASK_RUN_DB.values() for item in sublist]
task_runs = sorted(task_runs, key=lambda x: x.run_id)

if len(request.task_name) > 0:
print("Limiting task_name to", request.task_name)
task_runs = list(
filter(lambda x: x.task_name == request.task_name, task_runs)
)
# limit and sort by run_id
if request.result_limit > 0:
print("Limiting result to", request.result_limit)
task_runs = task_runs[: request.result_limit]
if request.result_limit < num_results:
num_results = request.result_limit
num_results = len(task_runs)
# pagination
start_index = 0
page_size = 2
if request.HasField("next_page_token"):
print("Next page token", request.next_page_token)
start_index = request.next_page_token

end_index = start_index + page_size
next_page_token = end_index
if end_index > num_results:
next_page_token = None
task_runs = task_runs[start_index:end_index]
return task_pb2.ShowTaskRunsResponse(
task_runs=task_runs, next_page_token=next_page_token
)

def GetTaskDependents(self, request, context):
print("GetTaskDependents", request)
Expand Down
Loading
Loading