Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task Status refactorings #556

Merged
merged 5 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## \[4.0.0\] - unreleased

This release aims to improve Pueue and to rectify some old design decisions.

### Multi-threaded architecture

Up until recently, Pueue had the subprocesses' (tasks') state live in a dedicated thread.
Client commands that directly affected subprocesses, such as `pueue start --immediate`, were forwarded to that special thread via an `mpsc` channel to be further processed.

Expand All @@ -15,13 +19,29 @@ Commands like `pueue add --immediate install_something && pueue send 0 'y\n'` wo

The new state design fixes this issue, which allows Pueue to do subprocess state manipulation directly inside of the client message handlers, effectively removing any delays.

TLDR: Commands that start/stop/kill/pause tasks only return when the task is actually started/stopped/killed/paused.

### Runtime invariants

Previously, various state related runtime invariants were enforced by convention. For example, a task that is queued should not have a `start` or `enqueued_at` time set.
However, this approach is highly error-prone as it's really hard to always think of everything that needs to be set or cleaned up on every possible state transition.

Luckily, this is an issue that can be fixed in a (rather) elegant way in Rust, using struct enums. That way, those invariants are enforced via the type system during the compile time.
The code is a bit more verbose (~25%), but it prevents a whole category of bugs and while doing this refactoring I actually found at least 2 cases where I forgot to clear a variable.

TLDR: The new task state handling is a bit more verbose, but a lot cleaner and type safe.

### Fixed

- Fixed delay after sending process related commands from client. [#548](https://github.com/Nukesor/pueue/pull/548)

### Change

- **Breaking**: Streamlined `pueue log` parameters to behave the same way was `start`, `pause` or `kill`. [#509](https://github.com/Nukesor/pueue/issues/509)
- **Breaking**: Refactor internal task state. Some task variables have been moved into the `TaskStatus` enum, which now enforces various invariants during compile time via the type system.
Due to this, several subtle time related inconsistencies (task start/stop/enqueue times) have been fixed. [#556](https://github.com/Nukesor/pueue/pull/556) \
**Important: This completely breaks backwards compatibility, including previous state.**
**Important: The Pueue daemon needs to be restarted and the state will be wiped clean.**
- **Breaking**: Streamlined `pueue log` parameters to behave the same way as `start`, `pause` or `kill`. [#509](https://github.com/Nukesor/pueue/issues/509)
- **Breaking**: Remove the `--children` commandline flags, that have been deprecated and no longer serve any function since `v3.0.0`.

### Add
Expand Down
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pueue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ tokio = { workspace = true }
[dev-dependencies]
anyhow = { workspace = true }
assert_cmd = "2"
assert_matches = "1"
better-panic = { workspace = true }
# Make it easy to view log output for select tests.
# Set log level for tests with RUST_LOG=<level>, use with failed tests or
Expand Down
15 changes: 12 additions & 3 deletions pueue/src/client/commands/restart.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::{bail, Result};

use chrono::Local;
use pueue_lib::network::message::*;
use pueue_lib::network::protocol::*;
use pueue_lib::settings::Settings;
Expand Down Expand Up @@ -32,7 +33,9 @@
let new_status = if stashed {
TaskStatus::Stashed { enqueue_at: None }
} else {
TaskStatus::Queued
TaskStatus::Queued {
enqueued_at: Local::now(),
}
};

let state = get_state(stream).await?;
Expand All @@ -50,13 +53,19 @@
state.filter_tasks(done_filter, None)
};

// now pick the failed tasks
// Now pick the failed tasks
let failed = filtered_tasks
.matching_ids
.into_iter()
.filter(|task_id| {
let task = state.tasks.get(task_id).unwrap();
!matches!(task.status, TaskStatus::Done(TaskResult::Success))
!matches!(
task.status,

Check warning on line 63 in pueue/src/client/commands/restart.rs

View check run for this annotation

Codecov / codecov/patch

pueue/src/client/commands/restart.rs#L62-L63

Added lines #L62 - L63 were not covered by tests
TaskStatus::Done {
result: TaskResult::Success,
..
}
)
})
.collect();

Expand Down
37 changes: 28 additions & 9 deletions pueue/src/client/commands/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,27 @@
fn reached_target_status(task: &Task, target_status: &WaitTargetStatus) -> bool {
match target_status {
WaitTargetStatus::Queued => {
task.status == TaskStatus::Queued
|| task.status == TaskStatus::Running
|| matches!(task.status, TaskStatus::Done(_))
matches!(
task.status,
TaskStatus::Queued { .. } | TaskStatus::Running { .. } | TaskStatus::Done { .. }
)
}
WaitTargetStatus::Running => {
task.status == TaskStatus::Running || matches!(task.status, TaskStatus::Done(_))
matches!(
task.status,

Check warning on line 150 in pueue/src/client/commands/wait.rs

View check run for this annotation

Codecov / codecov/patch

pueue/src/client/commands/wait.rs#L149-L150

Added lines #L149 - L150 were not covered by tests
TaskStatus::Running { .. } | TaskStatus::Done { .. }
)
}
WaitTargetStatus::Done => matches!(task.status, TaskStatus::Done { .. }),
WaitTargetStatus::Success => {
matches!(
task.status,
TaskStatus::Done {
result: TaskResult::Success,
..
}
)
}
WaitTargetStatus::Done => matches!(task.status, TaskStatus::Done(_)),
WaitTargetStatus::Success => matches!(task.status, TaskStatus::Done(TaskResult::Success)),
}
}

Expand Down Expand Up @@ -201,7 +213,7 @@
// Check if the task has finished.
// In case it has, show the task's result in human-readable form.
// Color some parts of the output depending on the task's outcome.
if let TaskStatus::Done(result) = &task.status {
if let TaskStatus::Done { result, .. } = &task.status {
let text = match result {
TaskResult::Success => {
let status = style.style_text("0", Some(Color::Green), None);
Expand Down Expand Up @@ -246,8 +258,15 @@

fn get_color_for_status(task_status: &TaskStatus) -> Color {
match task_status {
TaskStatus::Running | TaskStatus::Done(_) => Color::Green,
TaskStatus::Paused | TaskStatus::Locked => Color::White,
TaskStatus::Paused { .. } | TaskStatus::Locked { .. } => Color::White,

Check warning on line 261 in pueue/src/client/commands/wait.rs

View check run for this annotation

Codecov / codecov/patch

pueue/src/client/commands/wait.rs#L261

Added line #L261 was not covered by tests
TaskStatus::Running { .. } => Color::Green,
TaskStatus::Done { result, .. } => {
if matches!(result, TaskResult::Success) {
Color::Green

Check warning on line 265 in pueue/src/client/commands/wait.rs

View check run for this annotation

Codecov / codecov/patch

pueue/src/client/commands/wait.rs#L263-L265

Added lines #L263 - L265 were not covered by tests
} else {
Color::Red

Check warning on line 267 in pueue/src/client/commands/wait.rs

View check run for this annotation

Codecov / codecov/patch

pueue/src/client/commands/wait.rs#L267

Added line #L267 was not covered by tests
}
}
_ => Color::White,
}
}
9 changes: 5 additions & 4 deletions pueue/src/client/display/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ pub fn sort_tasks_by_group(tasks: Vec<Task>) -> BTreeMap<String, Vec<Task>> {
/// If the task doesn't have a start and/or end yet, an empty string will be returned
/// for the respective field.
pub fn formatted_start_end(task: &Task, settings: &Settings) -> (String, String) {
// Get the start time.
let (start, end) = task.start_and_end();

// If the task didn't start yet, just return two empty strings.
let start = match task.start {
let start = match start {
Some(start) => start,
None => return ("".into(), "".into()),
};
Expand All @@ -65,8 +66,8 @@ pub fn formatted_start_end(task: &Task, settings: &Settings) -> (String, String)
.to_string()
};

// Get finish time, if already set. Otherwise only return the formatted start.
let end = match task.end {
// If the task didn't finish yet, only return the formatted start.
let end = match end {
Some(end) => end,
None => return (formatted_start, "".into()),
};
Expand Down
16 changes: 9 additions & 7 deletions pueue/src/client/display/log/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@
if let Some((_, task_log)) = task_iter.peek() {
if matches!(
&task_log.task.status,
TaskStatus::Done(_) | TaskStatus::Running | TaskStatus::Paused,
TaskStatus::Done { .. } | TaskStatus::Running { .. } | TaskStatus::Paused { .. }
) {
println!();
}
Expand All @@ -122,7 +122,7 @@
// We only show logs of finished or running tasks.
if !matches!(
task.status,
TaskStatus::Done(_) | TaskStatus::Running | TaskStatus::Paused
TaskStatus::Done { .. } | TaskStatus::Running { .. } | TaskStatus::Paused { .. }
) {
return;
}
Expand All @@ -148,9 +148,9 @@
);

let (exit_status, color) = match &task.status {
TaskStatus::Paused => ("paused".into(), Color::White),
TaskStatus::Running => ("running".into(), Color::Yellow),
TaskStatus::Done(result) => match result {
TaskStatus::Paused { .. } => ("paused".into(), Color::White),
TaskStatus::Running { .. } => ("running".into(), Color::Yellow),

Check warning on line 152 in pueue/src/client/display/log/mod.rs

View check run for this annotation

Codecov / codecov/patch

pueue/src/client/display/log/mod.rs#L151-L152

Added lines #L151 - L152 were not covered by tests
TaskStatus::Done { result, .. } => match result {
TaskResult::Success => ("completed successfully".into(), Color::Green),
TaskResult::Failed(exit_code) => {
(format!("failed with exit code {exit_code}"), Color::Red)
Expand Down Expand Up @@ -197,14 +197,16 @@
]);
}

let (start, end) = task.start_and_end();

// Start and end time
if let Some(start) = task.start {
if let Some(start) = start {
table.add_row(vec![
style.styled_cell("Start:", None, Some(ComfyAttribute::Bold)),
Cell::new(start.to_rfc2822()),
]);
}
if let Some(end) = task.end {
if let Some(end) = end {
table.add_row(vec![
style.styled_cell("End:", None, Some(ComfyAttribute::Bold)),
Cell::new(end.to_rfc2822()),
Expand Down
8 changes: 5 additions & 3 deletions pueue/src/client/display/table_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,11 @@
// Determine the human readable task status representation and the respective color.
let status_string = task.status.to_string();
let (status_text, color) = match &task.status {
TaskStatus::Running => (status_string, Color::Green),
TaskStatus::Paused | TaskStatus::Locked => (status_string, Color::White),
TaskStatus::Done(result) => match result {
TaskStatus::Running { .. } => (status_string, Color::Green),

Check warning on line 211 in pueue/src/client/display/table_builder.rs

View check run for this annotation

Codecov / codecov/patch

pueue/src/client/display/table_builder.rs#L211

Added line #L211 was not covered by tests
TaskStatus::Paused { .. } | TaskStatus::Locked { .. } => {
(status_string, Color::White)

Check warning on line 213 in pueue/src/client/display/table_builder.rs

View check run for this annotation

Codecov / codecov/patch

pueue/src/client/display/table_builder.rs#L213

Added line #L213 was not covered by tests
}
TaskStatus::Done { result, .. } => match result {
TaskResult::Success => (TaskResult::Success.to_string(), Color::Green),
TaskResult::DependencyFailed => {
("Dependency failed".to_string(), Color::Red)
Expand Down
22 changes: 15 additions & 7 deletions pueue/src/client/query/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,15 @@ pub fn datetime(section: Pair<'_, Rule>, query_result: &mut QueryResult) -> Resu
enqueue_at
}
Rule::column_start => {
let Some(start) = task.start else {
let (start, _) = task.start_and_end();
let Some(start) = start else {
return false;
};
start
}
Rule::column_end => {
let Some(end) = task.end else {
let (_, end) = task.start_and_end();
let Some(end) = end else {
return false;
};
end
Expand Down Expand Up @@ -311,16 +313,22 @@ pub fn status(section: Pair<'_, Rule>, query_result: &mut QueryResult) -> Result
// Build the filter function for the task's status.
let filter_function = Box::new(move |task: &Task| -> bool {
let matches = match operand {
Rule::status_queued => matches!(task.status, TaskStatus::Queued),
Rule::status_queued => matches!(task.status, TaskStatus::Queued { .. }),
Rule::status_stashed => matches!(task.status, TaskStatus::Stashed { .. }),
Rule::status_running => matches!(task.status, TaskStatus::Running),
Rule::status_paused => matches!(task.status, TaskStatus::Paused),
Rule::status_running => matches!(task.status, TaskStatus::Running { .. }),
Rule::status_paused => matches!(task.status, TaskStatus::Paused { .. }),
Rule::status_success => {
matches!(&task.status, TaskStatus::Done(TaskResult::Success))
matches!(
&task.status,
TaskStatus::Done {
result: TaskResult::Success,
..
}
)
}
Rule::status_failed => {
let mut matches = false;
if let TaskStatus::Done(result) = &task.status {
if let TaskStatus::Done { result, .. } = &task.status {
if !matches!(result, TaskResult::Success) {
matches = true;
}
Expand Down
22 changes: 15 additions & 7 deletions pueue/src/client/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,11 @@
fn rank_status(task: &Task) -> u8 {
match &task.status {
TaskStatus::Stashed { .. } => 0,
TaskStatus::Locked => 1,
TaskStatus::Queued => 2,
TaskStatus::Paused => 3,
TaskStatus::Running => 4,
TaskStatus::Done(result) => match result {
TaskStatus::Locked { .. } => 1,

Check warning on line 80 in pueue/src/client/query/mod.rs

View check run for this annotation

Codecov / codecov/patch

pueue/src/client/query/mod.rs#L80

Added line #L80 was not covered by tests
TaskStatus::Queued { .. } => 2,
TaskStatus::Paused { .. } => 3,

Check warning on line 82 in pueue/src/client/query/mod.rs

View check run for this annotation

Codecov / codecov/patch

pueue/src/client/query/mod.rs#L82

Added line #L82 was not covered by tests
TaskStatus::Running { .. } => 4,
TaskStatus::Done { result, .. } => match result {
TaskResult::Success => 6,
_ => 5,
},
Expand All @@ -93,8 +93,16 @@
Rule::column_label => task1.label.cmp(&task2.label),
Rule::column_command => task1.command.cmp(&task2.command),
Rule::column_path => task1.path.cmp(&task2.path),
Rule::column_start => task1.start.cmp(&task2.start),
Rule::column_end => task1.end.cmp(&task2.end),
Rule::column_start => {
let (start1, _) = task1.start_and_end();
let (start2, _) = task2.start_and_end();
start1.cmp(&start2)

Check warning on line 99 in pueue/src/client/query/mod.rs

View check run for this annotation

Codecov / codecov/patch

pueue/src/client/query/mod.rs#L97-L99

Added lines #L97 - L99 were not covered by tests
}
Rule::column_end => {
let (_, end1) = task1.start_and_end();
let (_, end2) = task2.start_and_end();
end1.cmp(&end2)

Check warning on line 104 in pueue/src/client/query/mod.rs

View check run for this annotation

Codecov / codecov/patch

pueue/src/client/query/mod.rs#L102-L104

Added lines #L102 - L104 were not covered by tests
}
_ => std::cmp::Ordering::Less,
});

Expand Down
9 changes: 5 additions & 4 deletions pueue/src/daemon/callbacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
parameters.insert("group", task.group.clone());

// Result takes the TaskResult Enum strings, unless it didn't finish yet.
if let TaskStatus::Done(result) = &task.status {
if let TaskStatus::Done { result, .. } = &task.status {

Check warning on line 67 in pueue/src/daemon/callbacks.rs

View check run for this annotation

Codecov / codecov/patch

pueue/src/daemon/callbacks.rs#L67

Added line #L67 was not covered by tests
parameters.insert("result", result.to_string());
} else {
parameters.insert("result", "None".into());
Expand All @@ -75,8 +75,9 @@
time.map(|time| time.timestamp().to_string())
.unwrap_or_default()
};
parameters.insert("start", print_time(task.start));
parameters.insert("end", print_time(task.end));
let (start, end) = task.start_and_end();
parameters.insert("start", print_time(start));
parameters.insert("end", print_time(end));

Check warning on line 80 in pueue/src/daemon/callbacks.rs

View check run for this annotation

Codecov / codecov/patch

pueue/src/daemon/callbacks.rs#L78-L80

Added lines #L78 - L80 were not covered by tests

// Read the last lines of the process' output and make it available.
if let Ok(output) = read_last_log_file_lines(
Expand All @@ -95,7 +96,7 @@
parameters.insert("output_path", out_path.display().to_string());

// Get the exit code
if let TaskStatus::Done(result) = &task.status {
if let TaskStatus::Done { result, .. } = &task.status {

Check warning on line 99 in pueue/src/daemon/callbacks.rs

View check run for this annotation

Codecov / codecov/patch

pueue/src/daemon/callbacks.rs#L99

Added line #L99 was not covered by tests
match result {
TaskResult::Success => parameters.insert("exit_code", "0".into()),
TaskResult::Failed(code) => parameters.insert("exit_code", code.to_string()),
Expand Down
9 changes: 4 additions & 5 deletions pueue/src/daemon/network/message_handler/add.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,19 @@ pub fn add_task(settings: &Settings, state: &SharedState, message: AddMessage) -
message.path,
message.envs,
message.group,
TaskStatus::Locked,
TaskStatus::Queued {
enqueued_at: Local::now(),
},
message.dependencies,
message.priority.unwrap_or(0),
message.label,
);

// Set the starting status.
// Handle if the command is to be stashed and/or automatically enqueued later.
if message.stashed || message.enqueue_at.is_some() {
task.status = TaskStatus::Stashed {
enqueue_at: message.enqueue_at,
};
} else {
task.status = TaskStatus::Queued;
task.enqueued_at = Some(Local::now());
}

// Check if there're any aliases that should be applied.
Expand Down
Loading
Loading