diff --git a/CHANGELOG.md b/CHANGELOG.md index a1e124226..61827aadf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,8 @@ the operation is immediately a part of the written journal. * `hq worker info` can now be used with a selector, to display information about multiple workers at once. * Note that this is a breaking change for the JSON format, as it now outputs the worker infos as an array of objects. Before it was a single object. +* `hq job cancel` now accepts an optional `--reason <reason>` argument that records why the job was cancelled. + * The stored reason is shown by `hq job list --verbose` and `hq job info`. ### Changes diff --git a/crates/hyperqueue/src/bin/hq.rs b/crates/hyperqueue/src/bin/hq.rs index d7ae56d7f..f1a8607ba 100644 --- a/crates/hyperqueue/src/bin/hq.rs +++ b/crates/hyperqueue/src/bin/hq.rs @@ -99,7 +99,7 @@ async fn command_job_list(gsettings: &GlobalSettings, opts: JobListOpts) -> anyh (opts.filter, false) }; - output_job_list(gsettings, &mut connection, filter, show_open).await + output_job_list(gsettings, &mut connection, filter, show_open, opts.verbose).await } async fn command_job_summary(gsettings: &GlobalSettings) -> anyhow::Result<()> { @@ -128,7 +128,7 @@ async fn command_job_cat(gsettings: &GlobalSettings, opts: JobCatOpts) -> anyhow async fn command_job_cancel(gsettings: &GlobalSettings, opts: JobCancelOpts) -> anyhow::Result<()> { let mut connection = get_client_session(gsettings.server_directory()).await?; - cancel_job(gsettings, &mut connection, opts.selector).await + cancel_job(gsettings, &mut connection, opts.selector, opts.reason).await } async fn command_job_close(gsettings: &GlobalSettings, opts: JobCloseOpts) -> anyhow::Result<()> { diff --git a/crates/hyperqueue/src/client/commands/job.rs b/crates/hyperqueue/src/client/commands/job.rs index 4f5abe8b8..091a6fcdb 100644 --- a/crates/hyperqueue/src/client/commands/job.rs +++ b/crates/hyperqueue/src/client/commands/job.rs @@ -2,6 +2,7 @@ use 
clap::Parser; use crate::client::globalsettings::GlobalSettings; use crate::client::job::get_worker_map; +use crate::client::output::cli::CANCEL_REASON_MAX_LEN; use crate::client::output::outputs::OutputStream; use crate::client::output::resolve_task_paths; use crate::client::status::{Status, job_status}; @@ -26,6 +27,10 @@ pub struct JobListOpts { /// You can use multiple states separated by a comma. #[arg(long, value_delimiter(','), value_enum)] pub filter: Vec, + + /// Display additional information + #[arg(long)] + pub verbose: bool, } #[derive(Parser)] @@ -35,11 +40,28 @@ pub struct JobInfoOpts { pub selector: IdSelector, } +/// Validates a `--reason` value, accepting at most `CANCEL_REASON_MAX_LEN` characters. +/// The length is counted in `char`s rather than bytes, so multi-byte UTF-8 text is not penalized. +fn check_max_reason_len(s: &str) -> Result<String, String> { + if s.chars().count() <= CANCEL_REASON_MAX_LEN { + Ok(s.to_string()) + } else { + Err(format!( + "Cancel reason must be at most {} characters long, but you entered {}.", + CANCEL_REASON_MAX_LEN, + s.chars().count() + )) + } +} + #[derive(Parser)] pub struct JobCancelOpts { /// Select job(s) to cancel #[arg(value_parser = parse_last_all_range)] pub selector: IdSelector, + /// Reason for the cancelation + #[arg(long, value_parser = check_max_reason_len)] + pub reason: Option, } #[derive(Parser)] @@ -123,6 +145,7 @@ pub async fn output_job_list( session: &mut ClientSession, job_filters: Vec, show_open: bool, + verbose: bool, ) -> anyhow::Result<()> { let message = FromClientMessage::JobInfo( JobInfoRequest { @@ -138,12 +161,12 @@ pub async fn output_job_list( if !job_filters.is_empty() { response .jobs - .retain(|j| (show_open && j.is_open) || job_filters.contains(&job_status(j))); + .retain(|j| (show_open && j.is_open()) || job_filters.contains(&job_status(j))); } response.jobs.sort_unstable_by_key(|j| j.id); gsettings .printer() - .print_job_list(response.jobs, total_count); + .print_job_list(response.jobs, total_count, verbose); Ok(()) } @@ -253,9 +276,11 @@ pub async fn cancel_job( _gsettings: &GlobalSettings, session: &mut ClientSession, selector: IdSelector, + reason: Option, ) -> anyhow::Result<()> { let mut responses = 
rpc_call!(session.connection(), FromClientMessage::Cancel(CancelRequest { selector, + reason, }), ToClientMessage::CancelJobResponse(r) => r) .await?; responses.sort_unstable_by_key(|x| x.0); diff --git a/crates/hyperqueue/src/client/commands/wait.rs b/crates/hyperqueue/src/client/commands/wait.rs index e2671d9c2..d75fc26bb 100644 --- a/crates/hyperqueue/src/client/commands/wait.rs +++ b/crates/hyperqueue/src/client/commands/wait.rs @@ -52,7 +52,7 @@ pub async fn wait_for_jobs( ) -> anyhow::Result<()> { let mut unfinished_jobs = Set::new(); for job in jobs { - if !is_terminated(job) || (wait_for_close && job.is_open) { + if !is_terminated(job) || (wait_for_close && job.is_open()) { unfinished_jobs.insert(job.id); } } diff --git a/crates/hyperqueue/src/client/output/cli.rs b/crates/hyperqueue/src/client/output/cli.rs index 5dc0c92ba..725ffda68 100644 --- a/crates/hyperqueue/src/client/output/cli.rs +++ b/crates/hyperqueue/src/client/output/cli.rs @@ -59,6 +59,7 @@ pub const TASK_COLOR_INVALID: Colorization = Colorization::BrightRed; const TERMINAL_WIDTH: usize = 80; const ERROR_TRUNCATE_LENGTH_LIST: usize = 16; const ERROR_TRUNCATE_LENGTH_INFO: usize = 237; +pub const CANCEL_REASON_MAX_LEN: usize = 200; pub struct CliOutput { color_policy: ColorChoice, @@ -475,15 +476,15 @@ impl Output for CliOutput { } } - fn print_job_list(&self, jobs: Vec, total_jobs: usize) { + fn print_job_list(&self, jobs: Vec, total_jobs: usize, verbose: bool) { let job_count = jobs.len(); let mut has_opened = false; let rows: Vec<_> = jobs .into_iter() .map(|t| { let status = status_to_cell(&job_status(&t)); - vec![ - if t.is_open { + let mut row = vec![ + if t.is_open() { has_opened = true; format!("*{}", t.id).cell() } else { @@ -493,16 +494,25 @@ impl Output for CliOutput { truncate_middle(&t.name, 50).cell(), status, t.n_tasks.cell(), - ] + ]; + if verbose { + row.push(t.cancel_reason.unwrap_or_default().cell()) + } + row }) .collect(); - let header = vec![ + let mut header = vec![ 
"ID".cell().bold(true), "Name".cell().bold(true), "State".cell().bold(true), "Tasks".cell().bold(true), ]; + + if verbose { + header.push("Cancel Reason".cell().bold(true)); + } + self.print_horizontal_table(rows, header); if has_opened { @@ -560,8 +570,12 @@ impl Output for CliOutput { let state_label = "State".cell().bold(true); rows.push(vec![state_label, status]); + let cancel_reason_label = "Cancel Reason".cell().bold(true); + let cancel_reason = info.cancel_reason.clone().unwrap_or_default().cell(); + rows.push(vec![cancel_reason_label, cancel_reason]); + let state_label = "Session".cell().bold(true); - rows.push(vec![state_label, session_to_cell(info.is_open)]); + rows.push(vec![state_label, session_to_cell(info.is_open())]); let mut n_tasks = info.n_tasks.to_string(); diff --git a/crates/hyperqueue/src/client/output/json.rs b/crates/hyperqueue/src/client/output/json.rs index 49ef291c7..10bb8762c 100644 --- a/crates/hyperqueue/src/client/output/json.rs +++ b/crates/hyperqueue/src/client/output/json.rs @@ -96,7 +96,8 @@ impl Output for JsonOutput { })) } - fn print_job_list(&self, jobs: Vec, _total_jobs: usize) { + fn print_job_list(&self, jobs: Vec, _total_jobs: usize, _verbose: bool) { + // TODO! 
I think JSON should have all the information, so verbose shouldn't be used here self.print( jobs.into_iter() .map(|info| format_job_info(&info)) @@ -391,6 +392,7 @@ fn format_job_info(info: &JobInfo) -> Value { n_tasks, counters, is_open, + cancel_reason, running_tasks: _, } = info; @@ -405,7 +407,8 @@ fn format_job_info(info: &JobInfo) -> Value { "failed": counters.n_failed_tasks, "canceled": counters.n_canceled_tasks, "waiting": counters.n_waiting_tasks(*n_tasks) - }) + }), + "cancel_reason": cancel_reason, }) } diff --git a/crates/hyperqueue/src/client/output/outputs.rs b/crates/hyperqueue/src/client/output/outputs.rs index e071cdc88..cf7faa59a 100644 --- a/crates/hyperqueue/src/client/output/outputs.rs +++ b/crates/hyperqueue/src/client/output/outputs.rs @@ -45,7 +45,7 @@ pub trait Output { fn print_job_submitted(&self, job: JobDetail); fn print_job_open(&self, job_id: JobId); - fn print_job_list(&self, jobs: Vec, total_jobs: usize); + fn print_job_list(&self, jobs: Vec, total_jobs: usize, verbose: bool); fn print_job_summary(&self, jobs: Vec); fn print_job_detail(&self, jobs: Vec, worker_map: &WorkerMap, server_uid: &str); fn print_job_wait( diff --git a/crates/hyperqueue/src/client/output/quiet.rs b/crates/hyperqueue/src/client/output/quiet.rs index 9bd8d81eb..4e29a3a4a 100644 --- a/crates/hyperqueue/src/client/output/quiet.rs +++ b/crates/hyperqueue/src/client/output/quiet.rs @@ -75,7 +75,8 @@ impl Output for Quiet { println!("{job_id}"); } - fn print_job_list(&self, jobs: Vec, _total_jobs: usize) { + fn print_job_list(&self, jobs: Vec, _total_jobs: usize, _verbose: bool) { + // TODO! How to deal with verbose here -> Is it wanted or not. 
It probably shouldn't be used here. for task in jobs { let status = job_status(&task); println!("{} {}", task.id, format_status(&status)) diff --git a/crates/hyperqueue/src/client/status.rs b/crates/hyperqueue/src/client/status.rs index 2419b61b8..94782e0c4 100644 --- a/crates/hyperqueue/src/client/status.rs +++ b/crates/hyperqueue/src/client/status.rs @@ -27,7 +27,7 @@ pub fn job_status(info: &JobInfo) -> Status { Status::Canceled } else { assert_eq!(info.counters.n_finished_tasks, info.n_tasks); - if info.is_open { + if info.is_open() { Status::Opened } else { Status::Finished diff --git a/crates/hyperqueue/src/server/client/mod.rs b/crates/hyperqueue/src/server/client/mod.rs index 7b9745bca..c41142d1d 100644 --- a/crates/hyperqueue/src/server/client/mod.rs +++ b/crates/hyperqueue/src/server/client/mod.rs @@ -271,7 +271,9 @@ pub async fn client_rpc_loop< handle_worker_stop(&state_ref, senders, msg.selector) } FromClientMessage::Cancel(msg) => { - let response = handle_job_cancel(&state_ref, senders, &msg.selector).await; + let response = + handle_job_cancel(&state_ref, senders, &msg.selector, &msg.reason) + .await; if !response.is_error() { senders.events.flush_journal().await; }; @@ -664,6 +666,7 @@ async fn handle_job_cancel( state_ref: &StateRef, senders: &Senders, selector: &IdSelector, + reason: &Option, ) -> ToClientMessage { let job_ids: Vec = match selector { IdSelector::All => state_ref @@ -679,7 +682,7 @@ async fn handle_job_cancel( let mut responses: Vec<(JobId, CancelJobResponse)> = Vec::new(); for job_id in job_ids { - let response = cancel_job(state_ref, senders, job_id).await; + let response = cancel_job(state_ref, senders, job_id, reason).await; responses.push((job_id, response)); } ToClientMessage::CancelJobResponse(responses) @@ -723,8 +726,13 @@ async fn handle_job_close( ToClientMessage::CloseJobResponse(responses) } -async fn cancel_job(state_ref: &StateRef, senders: &Senders, job_id: JobId) -> CancelJobResponse { - let task_ids = match 
state_ref.get().get_job(job_id) { +async fn cancel_job( + state_ref: &StateRef, + senders: &Senders, + job_id: JobId, + reason: &Option, +) -> CancelJobResponse { + let task_ids = match state_ref.get_mut().get_job_mut(job_id) { None => { return CancelJobResponse::InvalidJob; } @@ -747,6 +755,7 @@ async fn cancel_job(state_ref: &StateRef, senders: &Senders, job_id: JobId) -> C .map(|task_id| task_id.job_task_id()) .collect(); job.set_cancel_state(task_ids, senders); + job.cancel(reason.clone()); CancelJobResponse::Canceled(job_task_ids, already_finished) } else { CancelJobResponse::Canceled(vec![], 0) diff --git a/crates/hyperqueue/src/server/job.rs b/crates/hyperqueue/src/server/job.rs index d85ae8c51..a93d6731c 100644 --- a/crates/hyperqueue/src/server/job.rs +++ b/crates/hyperqueue/src/server/job.rs @@ -143,7 +143,8 @@ pub struct Job { // If true, new tasks may be submitted into this job // If true and all tasks in the job are terminated then the job // is in state OPEN not FINISHED. - is_open: bool, + pub is_open: bool, + pub cancel_reason: Option, pub submission_date: DateTime, pub completion_date: Option>, @@ -156,10 +157,11 @@ impl Job { counters: Default::default(), tasks: Default::default(), job_desc, - submit_descs: Default::default(), is_open, + submit_descs: Default::default(), submission_date: Utc::now(), completion_date: None, + cancel_reason: None, } } @@ -173,6 +175,10 @@ impl Job { senders.events.on_job_closed(self.job_id); } + pub fn cancel(&mut self, reason: Option) { + self.cancel_reason = reason; + } + pub fn max_id(&self) -> Option { self.tasks.keys().max().copied() } @@ -244,6 +250,7 @@ impl Job { n_tasks: self.n_tasks(), counters: self.counters, is_open: self.is_open, + cancel_reason: self.cancel_reason.clone(), running_tasks: if include_running_tasks { self.tasks .iter() @@ -271,7 +278,7 @@ impl Job { } pub fn is_terminated(&self) -> bool { - !self.is_open && self.has_no_active_tasks() + !self.is_open() && self.has_no_active_tasks() } pub fn 
iter_task_states(&self) -> impl Iterator + '_ { @@ -321,7 +328,7 @@ impl Job { pub fn check_termination(&mut self, senders: &Senders, now: DateTime) { if self.has_no_active_tasks() { - if self.is_open { + if self.is_open() { senders.events.on_job_idle(self.job_id, now); } else { self.completion_date = Some(now); diff --git a/crates/hyperqueue/src/transfer/messages.rs b/crates/hyperqueue/src/transfer/messages.rs index 224060282..89af3f0ff 100644 --- a/crates/hyperqueue/src/transfer/messages.rs +++ b/crates/hyperqueue/src/transfer/messages.rs @@ -290,6 +290,7 @@ pub struct TaskSelector { #[derive(Serialize, Deserialize, Debug)] pub struct CancelRequest { pub selector: IdSelector, + pub reason: Option, } #[derive(Serialize, Deserialize, Debug)] @@ -478,9 +479,16 @@ pub struct JobInfo { pub n_tasks: JobTaskCount, pub counters: JobTaskCounters, pub is_open: bool, + pub cancel_reason: Option, pub running_tasks: Vec, } +impl JobInfo { + pub fn is_open(&self) -> bool { + self.is_open + } +} + #[derive(Serialize, Deserialize, Debug, Clone)] pub struct WorkerExitInfo { pub ended_at: DateTime, diff --git a/docs/jobs/jobs.md b/docs/jobs/jobs.md index bb0417c7e..52b0eba97 100644 --- a/docs/jobs/jobs.md +++ b/docs/jobs/jobs.md @@ -274,11 +274,22 @@ You can prematurely terminate a submitted job that haven't been completed yet by the [`hq job cancel`](cli:hq.job.cancel) command[^1]: ```bash -$ hq job cancel +$ hq job cancel [--reason ] ``` Cancelling a job will cancel all of its tasks that are not yet completed. +When cancelling a job, you can use the `--reason` flag to attach a brief explanation for why the job is being terminated. This is useful for future reference and auditing. 
+ +You can view a job's cancellation reason using either of the following commands: +- `hq job info <job-selector>` +- `hq job list --verbose` + +Note that: +- The `<reason>` is a plain string that HQ does not interpret; it cannot exceed 200 characters. +- If a job is cancelled multiple times, only the reason from the most recent `hq job cancel` command is saved. +- If a job has no tasks left to cancel when the command is run, the cancellation reason will not be saved. + ## Forgetting jobs If you want to completely forget a job, and thus free up its associated memory, you can do that using @@ -430,7 +441,7 @@ You can display basic job information using [`hq job list`](cli:hq.job.list). ``` === "List all jobs" ```bash - $ hq job list --all + $ hq job list --all [--verbose] ``` === "List jobs by status" You can display only jobs having the selected [states](#job-state) by using the `--filter` flag: diff --git a/tests/README.md b/tests/README.md index 7bec07cea..e38b28bcd 100644 --- a/tests/README.md +++ b/tests/README.md @@ -6,9 +6,17 @@ The following commands are supposed to be executed from the root HyperQueue dire 1) Install `pytest` and other dependencies ```bash - $ python -m pip install tests/requirements.txt + $ python -m venv .env + $ source .env/bin/activate + $ python -m pip install -r tests/requirements.txt ``` -2) Run tests +2) Build and install rust bindings + ```bash + $ cd crates/pyhq + $ maturin develop + $ cd ../.. 
+ ``` +3) Run tests ```bash $ python -m pytest tests ``` diff --git a/tests/output/test_json.py b/tests/output/test_json.py index 25e2c1c27..240e3c99e 100644 --- a/tests/output/test_json.py +++ b/tests/output/test_json.py @@ -146,6 +146,7 @@ def test_print_job_list(hq_env: HqEnv): "task_count": 1, "task_stats": dict, "is_open": bool, + "cancel_reason": None, }, "submits": [ { diff --git a/tests/test_job.py b/tests/test_job.py index 1ef64a8f6..ade9c4ca3 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -436,6 +436,47 @@ def test_cancel_all(hq_env: HqEnv): assert "Job 3 canceled" in r[0] +def test_cancel_with_reason(hq_env: HqEnv): + # First 2 shouldn't be canceled so it shouldn't have any cancelation reason, + # third one is cancelled, so it should have a reason + hq_env.start_server() + hq_env.start_worker(cpus=1) + hq_env.command(["submit", "uname"]) + hq_env.command(["submit", "/invalid"]) + + wait_for_job_state(hq_env, [1, 2], ["FINISHED", "FAILED"]) + + hq_env.command(["submit", "sleep", "100"]) + + hq_env.command(["job", "cancel", "all", "--reason", "Wrong task"]) + + table = list_jobs(hq_env, all=True, verbose=True) + for i, cancel_reason in enumerate(["", "", "Wrong task"]): + table.check_column_value("Cancel Reason", i, cancel_reason) + + +def test_cancel_reason_overwrite(hq_env: HqEnv): + # First job is canceled 2 times, so it should take the cancellation reason from the 2 cancel + hq_env.start_server() + hq_env.start_worker(cpus=1) + + hq_env.command(["job", "open"]) + + hq_env.command(["submit", "--job", "1", "sleep", "100"]) + + hq_env.command(["job", "cancel", "1", "--reason", "Wrong task"]) + + hq_env.command(["submit", "--job", "1", "sleep", "100"]) + + reason_n2 = "Updated task reason" + + hq_env.command(["job", "cancel", "1", "--reason", reason_n2]) + + table = list_jobs(hq_env, all=True, verbose=True) + for i, cancel_reason in enumerate(["Updated task reason"]): + table.check_column_value("Cancel Reason", i, cancel_reason) + + def 
test_reporting_state_after_worker_lost(hq_env: HqEnv): hq_env.start_server() hq_env.start_workers(2, cpus=1) diff --git a/tests/utils/job.py b/tests/utils/job.py index 58503195c..736d720ec 100644 --- a/tests/utils/job.py +++ b/tests/utils/job.py @@ -10,7 +10,7 @@ def default_task_output(job_id=1, task_id=0, type="stdout", working_dir: Optiona return f"{working_dir}/job-{job_id}/{task_id}.{type}" -def list_jobs(hq_env: HqEnv, all=True, filters: List[str] = None) -> Table: +def list_jobs(hq_env: HqEnv, all=True, filters: List[str] = None, verbose=False) -> Table: args = ["job", "list"] if all: assert filters is None @@ -18,4 +18,7 @@ def list_jobs(hq_env: HqEnv, all=True, filters: List[str] = None) -> Table: elif filters: args.extend(["--filter", ",".join(filters)]) + if verbose: + args.append("--verbose") + return hq_env.command(args, as_table=True)