Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
the operation is immediately a part of the written journal.
* `hq worker info` can now be used with a selector, to display information about multiple workers at once.
* Note that this is a breaking change for the JSON format, as it now outputs the worker infos as an array of objects. Before it was a single object.
* `hq job cancel` now accepts an optional reason for the cancellation via `--reason <cancel-reason>`.
* The reason is displayed by `hq job list --verbose` and `hq job info`.

### Changes

Expand Down
4 changes: 2 additions & 2 deletions crates/hyperqueue/src/bin/hq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ async fn command_job_list(gsettings: &GlobalSettings, opts: JobListOpts) -> anyh
(opts.filter, false)
};

output_job_list(gsettings, &mut connection, filter, show_open).await
output_job_list(gsettings, &mut connection, filter, show_open, opts.verbose).await
}

async fn command_job_summary(gsettings: &GlobalSettings) -> anyhow::Result<()> {
Expand Down Expand Up @@ -128,7 +128,7 @@ async fn command_job_cat(gsettings: &GlobalSettings, opts: JobCatOpts) -> anyhow

async fn command_job_cancel(gsettings: &GlobalSettings, opts: JobCancelOpts) -> anyhow::Result<()> {
let mut connection = get_client_session(gsettings.server_directory()).await?;
cancel_job(gsettings, &mut connection, opts.selector).await
cancel_job(gsettings, &mut connection, opts.selector, opts.reason).await
}

async fn command_job_close(gsettings: &GlobalSettings, opts: JobCloseOpts) -> anyhow::Result<()> {
Expand Down
27 changes: 25 additions & 2 deletions crates/hyperqueue/src/client/commands/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use clap::Parser;

use crate::client::globalsettings::GlobalSettings;
use crate::client::job::get_worker_map;
use crate::client::output::cli::CANCEL_REASON_MAX_LEN;
use crate::client::output::outputs::OutputStream;
use crate::client::output::resolve_task_paths;
use crate::client::status::{Status, job_status};
Expand All @@ -26,6 +27,10 @@ pub struct JobListOpts {
/// You can use multiple states separated by a comma.
#[arg(long, value_delimiter(','), value_enum)]
pub filter: Vec<Status>,

/// Display additional information <Cancel Reason>
#[arg(long)]
pub verbose: bool,
}

#[derive(Parser)]
Expand All @@ -35,11 +40,26 @@ pub struct JobInfoOpts {
pub selector: IdSelector,
}

/// Validates a cancel reason supplied on the command line.
///
/// Used as a clap `value_parser`: returns the reason unchanged (as an owned `String`)
/// when it is at most `CANCEL_REASON_MAX_LEN` characters long, otherwise returns an
/// error message describing the limit and the length that was entered.
///
/// The length is measured in `char`s rather than bytes, so that non-ASCII reasons are
/// not rejected early and the number reported back matches what the user typed.
fn check_max_reason_len(s: &str) -> Result<String, String> {
    let length = s.chars().count();
    if length <= CANCEL_REASON_MAX_LEN {
        Ok(s.to_string())
    } else {
        // A reason of exactly the maximum length is accepted, hence "at most"
        // instead of "shorter than".
        Err(format!(
            "Cancel reason must be at most {CANCEL_REASON_MAX_LEN} characters long, but you entered {length}."
        ))
    }
}

// Command-line options for cancelling jobs (consumed by `command_job_cancel`).
// NOTE: `///` comments on a clap `Parser` struct and its fields are turned into help
// text, so explanatory notes in this struct are written as plain `//` comments.
#[derive(Parser)]
pub struct JobCancelOpts {
/// Select job(s) to cancel
#[arg(value_parser = parse_last_all_range)]
pub selector: IdSelector,
/// Reason for the cancelation
// Optional; validated by `check_max_reason_len`, which rejects reasons longer than
// `CANCEL_REASON_MAX_LEN`.
#[arg(long, value_parser = check_max_reason_len)]
pub reason: Option<String>,
}

#[derive(Parser)]
Expand Down Expand Up @@ -123,6 +143,7 @@ pub async fn output_job_list(
session: &mut ClientSession,
job_filters: Vec<Status>,
show_open: bool,
verbose: bool,
) -> anyhow::Result<()> {
let message = FromClientMessage::JobInfo(
JobInfoRequest {
Expand All @@ -138,12 +159,12 @@ pub async fn output_job_list(
if !job_filters.is_empty() {
response
.jobs
.retain(|j| (show_open && j.is_open) || job_filters.contains(&job_status(j)));
.retain(|j| (show_open && j.is_open()) || job_filters.contains(&job_status(j)));
}
response.jobs.sort_unstable_by_key(|j| j.id);
gsettings
.printer()
.print_job_list(response.jobs, total_count);
.print_job_list(response.jobs, total_count, verbose);
Ok(())
}

Expand Down Expand Up @@ -253,9 +274,11 @@ pub async fn cancel_job(
_gsettings: &GlobalSettings,
session: &mut ClientSession,
selector: IdSelector,
reason: Option<String>,
) -> anyhow::Result<()> {
let mut responses = rpc_call!(session.connection(), FromClientMessage::Cancel(CancelRequest {
selector,
reason,
}), ToClientMessage::CancelJobResponse(r) => r)
.await?;
responses.sort_unstable_by_key(|x| x.0);
Expand Down
2 changes: 1 addition & 1 deletion crates/hyperqueue/src/client/commands/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub async fn wait_for_jobs(
) -> anyhow::Result<()> {
let mut unfinished_jobs = Set::new();
for job in jobs {
if !is_terminated(job) || (wait_for_close && job.is_open) {
if !is_terminated(job) || (wait_for_close && job.is_open()) {
unfinished_jobs.insert(job.id);
}
}
Expand Down
26 changes: 20 additions & 6 deletions crates/hyperqueue/src/client/output/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub const TASK_COLOR_INVALID: Colorization = Colorization::BrightRed;
const TERMINAL_WIDTH: usize = 80;
const ERROR_TRUNCATE_LENGTH_LIST: usize = 16;
const ERROR_TRUNCATE_LENGTH_INFO: usize = 237;
pub const CANCEL_REASON_MAX_LEN: usize = 200;

pub struct CliOutput {
color_policy: ColorChoice,
Expand Down Expand Up @@ -475,15 +476,15 @@ impl Output for CliOutput {
}
}

fn print_job_list(&self, jobs: Vec<JobInfo>, total_jobs: usize) {
fn print_job_list(&self, jobs: Vec<JobInfo>, total_jobs: usize, verbose: bool) {
let job_count = jobs.len();
let mut has_opened = false;
let rows: Vec<_> = jobs
.into_iter()
.map(|t| {
let status = status_to_cell(&job_status(&t));
vec![
if t.is_open {
let mut row = vec![
if t.is_open() {
has_opened = true;
format!("*{}", t.id).cell()
} else {
Expand All @@ -493,16 +494,25 @@ impl Output for CliOutput {
truncate_middle(&t.name, 50).cell(),
status,
t.n_tasks.cell(),
]
];
if verbose {
row.push(t.cancel_reason.unwrap_or_default().cell())
}
row
})
.collect();

let header = vec![
let mut header = vec![
"ID".cell().bold(true),
"Name".cell().bold(true),
"State".cell().bold(true),
"Tasks".cell().bold(true),
];

if verbose {
header.push("Cancel Reason".cell().bold(true));
}

self.print_horizontal_table(rows, header);

if has_opened {
Expand Down Expand Up @@ -560,8 +570,12 @@ impl Output for CliOutput {
let state_label = "State".cell().bold(true);
rows.push(vec![state_label, status]);

let cancel_reason_label = "Cancel Reason".cell().bold(true);
let cancel_reason = info.cancel_reason.clone().unwrap_or_default().cell();
rows.push(vec![cancel_reason_label, cancel_reason]);

let state_label = "Session".cell().bold(true);
rows.push(vec![state_label, session_to_cell(info.is_open)]);
rows.push(vec![state_label, session_to_cell(info.is_open())]);

let mut n_tasks = info.n_tasks.to_string();

Expand Down
7 changes: 5 additions & 2 deletions crates/hyperqueue/src/client/output/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ impl Output for JsonOutput {
}))
}

fn print_job_list(&self, jobs: Vec<JobInfo>, _total_jobs: usize) {
fn print_job_list(&self, jobs: Vec<JobInfo>, _total_jobs: usize, _verbose: bool) {
// TODO! I think JSON should have all the information, so verbose shouldn't be used here
self.print(
jobs.into_iter()
.map(|info| format_job_info(&info))
Expand Down Expand Up @@ -391,6 +392,7 @@ fn format_job_info(info: &JobInfo) -> Value {
n_tasks,
counters,
is_open,
cancel_reason,
running_tasks: _,
} = info;

Expand All @@ -405,7 +407,8 @@ fn format_job_info(info: &JobInfo) -> Value {
"failed": counters.n_failed_tasks,
"canceled": counters.n_canceled_tasks,
"waiting": counters.n_waiting_tasks(*n_tasks)
})
}),
"cancel_reason": cancel_reason,
})
}

Expand Down
2 changes: 1 addition & 1 deletion crates/hyperqueue/src/client/output/outputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub trait Output {
fn print_job_submitted(&self, job: JobDetail);

fn print_job_open(&self, job_id: JobId);
fn print_job_list(&self, jobs: Vec<JobInfo>, total_jobs: usize);
fn print_job_list(&self, jobs: Vec<JobInfo>, total_jobs: usize, verbose: bool);
fn print_job_summary(&self, jobs: Vec<JobInfo>);
fn print_job_detail(&self, jobs: Vec<JobDetail>, worker_map: &WorkerMap, server_uid: &str);
fn print_job_wait(
Expand Down
3 changes: 2 additions & 1 deletion crates/hyperqueue/src/client/output/quiet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ impl Output for Quiet {
println!("{job_id}");
}

fn print_job_list(&self, jobs: Vec<JobInfo>, _total_jobs: usize) {
fn print_job_list(&self, jobs: Vec<JobInfo>, _total_jobs: usize, _verbose: bool) {
// TODO: Decide how to handle `verbose` here. The quiet output probably shouldn't use it.
for task in jobs {
let status = job_status(&task);
println!("{} {}", task.id, format_status(&status))
Expand Down
2 changes: 1 addition & 1 deletion crates/hyperqueue/src/client/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub fn job_status(info: &JobInfo) -> Status {
Status::Canceled
} else {
assert_eq!(info.counters.n_finished_tasks, info.n_tasks);
if info.is_open {
if info.is_open() {
Status::Opened
} else {
Status::Finished
Expand Down
17 changes: 13 additions & 4 deletions crates/hyperqueue/src/server/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,9 @@ pub async fn client_rpc_loop<
handle_worker_stop(&state_ref, senders, msg.selector)
}
FromClientMessage::Cancel(msg) => {
let response = handle_job_cancel(&state_ref, senders, &msg.selector).await;
let response =
handle_job_cancel(&state_ref, senders, &msg.selector, &msg.reason)
.await;
if !response.is_error() {
senders.events.flush_journal().await;
};
Expand Down Expand Up @@ -664,6 +666,7 @@ async fn handle_job_cancel(
state_ref: &StateRef,
senders: &Senders,
selector: &IdSelector,
reason: &Option<String>,
) -> ToClientMessage {
let job_ids: Vec<JobId> = match selector {
IdSelector::All => state_ref
Expand All @@ -679,7 +682,7 @@ async fn handle_job_cancel(

let mut responses: Vec<(JobId, CancelJobResponse)> = Vec::new();
for job_id in job_ids {
let response = cancel_job(state_ref, senders, job_id).await;
let response = cancel_job(state_ref, senders, job_id, reason).await;
responses.push((job_id, response));
}
ToClientMessage::CancelJobResponse(responses)
Expand Down Expand Up @@ -723,8 +726,13 @@ async fn handle_job_close(
ToClientMessage::CloseJobResponse(responses)
}

async fn cancel_job(state_ref: &StateRef, senders: &Senders, job_id: JobId) -> CancelJobResponse {
let task_ids = match state_ref.get().get_job(job_id) {
async fn cancel_job(
state_ref: &StateRef,
senders: &Senders,
job_id: JobId,
reason: &Option<String>,
) -> CancelJobResponse {
let task_ids = match state_ref.get_mut().get_job_mut(job_id) {
None => {
return CancelJobResponse::InvalidJob;
}
Expand All @@ -747,6 +755,7 @@ async fn cancel_job(state_ref: &StateRef, senders: &Senders, job_id: JobId) -> C
.map(|task_id| task_id.job_task_id())
.collect();
job.set_cancel_state(task_ids, senders);
job.cancel(reason.clone());
CancelJobResponse::Canceled(job_task_ids, already_finished)
} else {
CancelJobResponse::Canceled(vec![], 0)
Expand Down
15 changes: 11 additions & 4 deletions crates/hyperqueue/src/server/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ pub struct Job {
// If true, new tasks may be submitted into this job
// If true and all tasks in the job are terminated then the job
// is in state OPEN not FINISHED.
is_open: bool,
pub is_open: bool,
pub cancel_reason: Option<String>,

pub submission_date: DateTime<Utc>,
pub completion_date: Option<DateTime<Utc>>,
Expand All @@ -156,10 +157,11 @@ impl Job {
counters: Default::default(),
tasks: Default::default(),
job_desc,
submit_descs: Default::default(),
is_open,
submit_descs: Default::default(),
submission_date: Utc::now(),
completion_date: None,
cancel_reason: None,
}
}

Expand All @@ -173,6 +175,10 @@ impl Job {
senders.events.on_job_closed(self.job_id);
}

/// Stores `reason` as this job's cancel reason, replacing any previously stored value
/// (passing `None` clears it).
// NOTE(review): this only records the reason; cancelling the tasks themselves appears
// to be done by the caller — confirm.
pub fn cancel(&mut self, reason: Option<String>) {
self.cancel_reason = reason;
}

/// Returns the largest key of `self.tasks`, or `None` if there are no tasks.
pub fn max_id(&self) -> Option<JobTaskId> {
    let task_ids = self.tasks.keys().copied();
    task_ids.max()
}
Expand Down Expand Up @@ -244,6 +250,7 @@ impl Job {
n_tasks: self.n_tasks(),
counters: self.counters,
is_open: self.is_open,
cancel_reason: self.cancel_reason.clone(),
running_tasks: if include_running_tasks {
self.tasks
.iter()
Expand Down Expand Up @@ -271,7 +278,7 @@ impl Job {
}

pub fn is_terminated(&self) -> bool {
!self.is_open && self.has_no_active_tasks()
!self.is_open() && self.has_no_active_tasks()
}

pub fn iter_task_states(&self) -> impl Iterator<Item = (JobTaskId, &JobTaskState)> + '_ {
Expand Down Expand Up @@ -321,7 +328,7 @@ impl Job {

pub fn check_termination(&mut self, senders: &Senders, now: DateTime<Utc>) {
if self.has_no_active_tasks() {
if self.is_open {
if self.is_open() {
senders.events.on_job_idle(self.job_id, now);
} else {
self.completion_date = Some(now);
Expand Down
8 changes: 8 additions & 0 deletions crates/hyperqueue/src/transfer/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ pub struct TaskSelector {
/// Request to cancel the job(s) matched by `selector`.
#[derive(Serialize, Deserialize, Debug)]
pub struct CancelRequest {
/// Selects which job(s) should be canceled.
pub selector: IdSelector,
/// Optional reason for the cancellation; `None` when no reason was given.
pub reason: Option<String>,
}

#[derive(Serialize, Deserialize, Debug)]
Expand Down Expand Up @@ -478,9 +479,16 @@ pub struct JobInfo {
pub n_tasks: JobTaskCount,
pub counters: JobTaskCounters,
pub is_open: bool,
pub cancel_reason: Option<String>,
pub running_tasks: Vec<JobTaskId>,
}

impl JobInfo {
/// Returns the value of the `is_open` field.
pub fn is_open(&self) -> bool {
self.is_open
}
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct WorkerExitInfo {
pub ended_at: DateTime<Utc>,
Expand Down
Loading
Loading