Skip to content

Commit

Permalink
fix(executor): queries exeuctor points calc incorrectly (#15349)
Browse files Browse the repository at this point in the history
* fix(executor): schedule points calc problem

Signed-off-by: Liuqing Yue <[email protected]>

* chore: make clippy happy

Signed-off-by: Liuqing Yue <[email protected]>

* chore: make clippy happy

Signed-off-by: Liuqing Yue <[email protected]>

* disable queries executor

Signed-off-by: Liuqing Yue <[email protected]>

---------

Signed-off-by: Liuqing Yue <[email protected]>
Co-authored-by: Winter Zhang <[email protected]>
  • Loading branch information
dqhl76 and zhang2014 authored Apr 27, 2024
1 parent 7edfa1f commit d8ddbec
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 3 deletions.
12 changes: 9 additions & 3 deletions src/query/service/src/pipelines/executor/executor_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,16 +461,18 @@ impl ExecutingGraph {
Ordering::SeqCst,
Ordering::Relaxed,
) {
Ok(old_value) => {
return (old_value & EPOCH_MASK) as u32 == global_epoch;
Ok(_) => {
return (desired_value & EPOCH_MASK) as u32 == global_epoch;
}
Err(new_expected) => {
let remain_points = (new_expected & POINTS_MASK) >> 32;
let epoch = new_expected & EPOCH_MASK;

expected_value = new_expected;
if epoch != global_epoch as u64 {
if epoch > global_epoch as u64 {
desired_value = new_expected;
} else if epoch < global_epoch as u64 {
desired_value = (max_points - 1) << 32 | global_epoch as u64;
} else if remain_points >= 1 {
desired_value = (remain_points - 1) << 32 | epoch;
} else {
Expand Down Expand Up @@ -830,6 +832,10 @@ impl RunningGraph {
self.0.graph[node_index].record_error(error);
}

pub fn get_points(&self) -> u64 {
self.0.points.load(Ordering::SeqCst)
}

pub fn format_graph_nodes(&self) -> String {
pub struct NodeDisplay {
id: usize,
Expand Down
64 changes: 64 additions & 0 deletions src/query/service/tests/it/pipelines/executor/executor_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,70 @@ async fn test_schedule_with_two_tasks() -> Result<()> {
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_schedule_point_simple() -> Result<()> {
let fixture = TestFixture::setup().await?;
let ctx = fixture.new_query_ctx().await?;
let graph = create_simple_pipeline(ctx)?;
let points = graph.get_points();
assert_eq!(points, (3 << 32) | 1);

let res = graph.can_perform_task(1, 3);
let points = graph.get_points();
assert_eq!(points, (2 << 32) | 1);
assert!(res);

let res = graph.can_perform_task(1, 3);
let points = graph.get_points();
assert_eq!(points, (1 << 32) | 1);
assert!(res);

let res = graph.can_perform_task(1, 3);
let points = graph.get_points();
assert_eq!(points, 1);
assert!(res);

let res = graph.can_perform_task(1, 3);
let points = graph.get_points();
assert_eq!(points, (3 << 32) | 2);
assert!(!res);

let res = graph.can_perform_task(1, 3);
let points = graph.get_points();
assert_eq!(points, (3 << 32) | 2);
assert!(!res);

let res = graph.can_perform_task(2, 3);
let points = graph.get_points();
assert_eq!(points, (2 << 32) | 2);
assert!(res);

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_schedule_point_complex() -> Result<()> {
let fixture = TestFixture::setup().await?;
let ctx = fixture.new_query_ctx().await?;
let graph = create_simple_pipeline(ctx)?;

let res = graph.can_perform_task(2, 3);
let points = graph.get_points();
assert_eq!(points, (2 << 32) | 2);
assert!(res);

for _ in 0..5 {
let _ = graph.can_perform_task(2, 3);
}

let res = graph.can_perform_task(3, 3);
let points = graph.get_points();
assert_eq!(points, (2 << 32) | 3);
assert!(res);

Ok(())
}

fn create_simple_pipeline(ctx: Arc<QueryContext>) -> Result<Arc<RunningGraph>> {
let (_rx, sink_pipe) = create_sink_pipe(1)?;
let (_tx, source_pipe) = create_source_pipe(ctx, 1)?;
Expand Down

0 comments on commit d8ddbec

Please sign in to comment.