From a31d0b78b198372fdef28db83f8cdb78ed996f5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20C=CC=8Cech?= Date: Fri, 16 Aug 2019 21:24:18 +0200 Subject: [PATCH 1/2] Add --detach option (issue #11) --- Cargo.lock | 9 ++ src/evaluator.rs | 89 ++++++++++++++++++ src/main.rs | 228 +++++++++++++++++++++++----------------------- src/supervisor.rs | 78 ++++++++++++++++ 4 files changed, 292 insertions(+), 112 deletions(-) create mode 100644 src/evaluator.rs create mode 100644 src/supervisor.rs diff --git a/Cargo.lock b/Cargo.lock index 311cb0d..76efffd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -39,6 +39,15 @@ dependencies = [ "ppv-lite86 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "c2-chacha" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "ppv-lite86 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "cfg-if" version = "0.1.9" diff --git a/src/evaluator.rs b/src/evaluator.rs new file mode 100644 index 0000000..fd31709 --- /dev/null +++ b/src/evaluator.rs @@ -0,0 +1,89 @@ +use subprocess::{Exec, ExitStatus, CaptureData}; +use crate::ErrorCode; +use crate::Opt; + +pub struct Response{ + pub options: Opt, + pub result: CaptureData, + pub last_result: Option, +} + +impl Response{ + pub fn should_stop(&self) -> bool { + let result_checks = [ + stop_if_fail, + stop_if_success, + stop_if_contains, + stop_if_match, + stop_if_error + ]; + + result_checks.iter().any(|&fun| fun(&self.result, &self.options)) + || should_stop_on_comparison(&self.options, &self.last_result.as_ref(), &self.result) + } +} + +fn stop_if_contains(result: &CaptureData, options: &Opt) -> bool{ + // --until-contains + let result_str = result.stdout_str(); + if let Some(string) = &options.until_contains { + result_str.contains(string) + } else { + false + } +} +fn stop_if_match(result: &CaptureData, options: &Opt) -> bool{ + // --until-match + let result_str = result.stdout_str(); + if let Some(regex) = &options.until_match { + regex.captures(&result_str).is_some() + } else { + false + } +} +fn stop_if_error(result: &CaptureData, options: &Opt) -> bool{ + if let Some(error_code) = &options.until_error { + match error_code { + ErrorCode::Any => !result.exit_status.success(), + ErrorCode::Code(code) => result.exit_status == ExitStatus::Exited(*code) + } + } else { + false + } +} + +fn stop_if_success(result: &CaptureData, options: &Opt) -> bool{ + options.until_success && result.exit_status.success() +} +fn stop_if_fail(result: &CaptureData, options: &Opt) -> bool{ + options.until_fail && !(result.exit_status.success()) +} + +fn is_equivalent(previous: &Option<&CaptureData>, current: &CaptureData) -> bool{ + if let Some(prev) = previous{ + prev.stdout == current.stdout + } else { + false + } +} + +fn is_differrent(previous: &Option<&CaptureData>, current: &CaptureData) -> bool{ + if let Some(prev) = previous{ + prev.stdout != current.stdout + } else { + false + } +} + +pub fn should_stop_on_comparison(options: &Opt, previous: &Option<&CaptureData>, current: &CaptureData) -> bool { + + (options.until_same && is_equivalent(&previous.clone(), ¤t.clone())) + || (options.until_changes && is_differrent(&previous.clone(), ¤t.clone())) +} + +pub fn execute(opt: Opt) -> (Opt, CaptureData) { + let result = Exec::shell(opt.input.join("")) + .capture().unwrap(); + (opt, result) +} + diff --git a/src/main.rs b/src/main.rs index fda7771..5bf4c4c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,20 +8,26 @@ extern crate tempfile; use std::env; use std::f64; use std::io::prelude::*; -use std::io::{self, BufRead, SeekFrom}; +use std::io::{self, BufRead}; +use std::io::{stdout, stderr}; use std::process; -use std::thread; +use std::thread::{self, JoinHandle}; +use std::sync::mpsc::{Sender, Receiver, channel}; use std::time::{Duration, Instant, SystemTime}; +use subprocess::{ExitStatus, CaptureData}; use humantime::{parse_duration, parse_rfc3339_weak}; use regex::Regex; -use subprocess::{Exec, ExitStatus, Redirection}; use structopt::StructOpt; -static UNKONWN_EXIT_CODE: u32 = 99; +mod supervisor; +mod evaluator; + +use evaluator::{execute, Response}; // same exit code as use of `timeout` shell command static TIMEOUT_EXIT_CODE: i32 = 124; +static UNKONWN_EXIT_CODE: u32 = 99; fn main() { @@ -39,7 +45,7 @@ fn main() { let program_start = Instant::now(); // Number of iterations - let mut items = if let Some(items) = opt.ffor { items.clone() } else { vec![] }; + let mut items = if let Some(ref items) = opt.ffor { items.clone() } else { vec![] }; // Get any lines from stdin if opt.stdin || atty::isnt(atty::Stream::Stdin) { @@ -63,10 +69,6 @@ fn main() { } else { f64::INFINITY }; - let mut has_matched = false; - let mut tmpfile = tempfile::tempfile().unwrap(); - let mut summary = Summary { successes: 0, failures: Vec::new() }; - let mut previous_stdout = None; let counter = Counter { start: opt.offset - opt.count_by, @@ -74,6 +76,28 @@ fn main() { end: num, step_by: opt.count_by }; + + let mut detach_data = if opt.detach { + let (requests_in, requests_out) = channel(); + let (responses_in, responses_out) = channel(); + let supervisor_thread = thread::spawn(move || { + supervisor::supervisor(requests_out, responses_in); + }); + Some(DetachData{ + supervisor: supervisor_thread, + requests_in: requests_in, + responses_out: responses_out + }) + } else { + None + }; + + let mut app_state = AppState{ + options: &opt, + summary: Summary { successes: 0, failures: Vec::new() }, + previous_result: None, + }; + for (count, actual_count) in counter.enumerate() { // Time Start @@ -89,6 +113,16 @@ fn main() { env::set_var("ITEM", item); } + // Finish if we have result from detached thread + if let Some(ref detach_data) = detach_data{ + if let Ok(response) = detach_data.responses_out.try_recv(){ + app_state = handle_output(&response, app_state); + if response.should_stop(){ + break; + } + } + } + // Finish if we're over our duration if let Some(duration) = opt.for_duration { let since = Instant::now().duration_since(program_start); @@ -110,94 +144,26 @@ fn main() { } // Main executor - tmpfile.seek(SeekFrom::Start(0)).ok(); - tmpfile.set_len(0).ok(); - let result = Exec::shell(joined_input) - .stdout(Redirection::File(tmpfile.try_clone().unwrap())) - .stderr(Redirection::Merge) - .capture().unwrap(); - - // Print the results - let mut stdout = String::new(); - tmpfile.seek(SeekFrom::Start(0)).ok(); - tmpfile.read_to_string(&mut stdout).ok(); - for line in stdout.lines() { - // --only-last - // If we only want output from the last execution, - // defer printing until later - if !opt.only_last { - println!("{}", line); - } - - // --until-contains - // We defer loop breaking until the entire result is printed. - if let Some(string) = &opt.until_contains { - if line.contains(string){ - has_matched = true; - } - } - - // --until-match - if let Some(regex) = &opt.until_match { - if regex.captures(&line).is_some() { - has_matched = true; - } - } - } - - // --until-error - if let Some(error_code) = &opt.until_error { - match error_code { - ErrorCode::Any => if !result.exit_status.success() { - has_matched = true; - }, - ErrorCode::Code(code) => { - if result.exit_status == ExitStatus::Exited(*code) { - has_matched = true; - } - } - } - } - - // --until-success - if opt.until_success && result.exit_status.success() { - has_matched = true; - } - - // --until-fail - if opt.until_fail && !(result.exit_status.success()) { - has_matched = true; - } - - if opt.summary { - match result.exit_status { - ExitStatus::Exited(0) => summary.successes += 1, - ExitStatus::Exited(n) => summary.failures.push(n), - _ => summary.failures.push(UNKONWN_EXIT_CODE), - } - } - - // Finish if we matched - if has_matched { - break; - } - - if let Some(ref previous_stdout) = previous_stdout { - // --until-changes - if opt.until_changes { - if *previous_stdout != stdout { - break; - } - } - - // --until-same - if opt.until_same { - if *previous_stdout == stdout { - break; + if opt.detach{ + let th_opt = opt.clone(); + let dd = detach_data.as_mut().unwrap(); + dd.requests_in.send((count, th_opt)).unwrap(); + } else { + let (opt, result) = execute(opt.clone()); + + // Finish if we matched + let response = Response{ + options: opt, + result: supervisor::clone_data(&result), + last_result: match app_state.previous_result{ + Some(ref lr) => Some(supervisor::clone_data(&lr)), + None => None, } + }; + app_state = handle_output(&response, app_state); + if response.should_stop(){ + break; } - } else { - previous_stdout = Some(stdout); } // Delay until next iteration time @@ -208,24 +174,50 @@ fn main() { } if opt.only_last { - let mut stdout = String::new(); - tmpfile.seek(SeekFrom::Start(0)).ok(); - tmpfile.read_to_string(&mut stdout).ok(); - for line in stdout.lines() { - println!("{}", line); + if let Some(ref previous_result) = &app_state.previous_result { + stdout().write_all(&previous_result.stdout).unwrap(); + stderr().write_all(&previous_result.stderr).unwrap(); } } if opt.summary { - summary.print() + app_state.summary.print() + } + if let Some(detach_data) = detach_data{ + detach_data.supervisor.join().unwrap(); } process::exit(exit_status); } -#[derive(StructOpt, Debug)] +fn handle_output<'a>(response: &Response, mut state: AppState<'a>) -> AppState<'a> { + if !state.options.only_last { + stdout().write_all(&response.result.stdout).unwrap(); + stderr().write_all(&response.result.stderr).unwrap(); + } + if state.options.summary { + match response.result.exit_status { + ExitStatus::Exited(0) => state.summary.successes += 1, + ExitStatus::Exited(n) => state.summary.failures.push(n), + _ => state.summary.failures.push(UNKONWN_EXIT_CODE), + } + } + AppState{ + previous_result: Some(supervisor::clone_data(&response.result)), + ..state + } +} + +struct AppState <'a>{ + options: &'a Opt, + summary: Summary, + previous_result: Option, +} + + +#[derive(StructOpt, Debug, Clone)] #[structopt(name = "loop", author = "Rich Jones ", about = "UNIX's missing `loop` command")] -struct Opt { +pub struct Opt { /// Number of iterations to execute #[structopt(short = "n", long = "num")] num: Option, @@ -299,6 +291,10 @@ struct Opt { #[structopt(long = "summary")] summary: bool, + /// Do not be blocked by the running command + #[structopt(long = "detach")] + detach: bool, + /// The command to be looped #[structopt(raw(multiple="true"))] input: Vec @@ -318,13 +314,13 @@ fn precision_of(s: &str) -> usize { exp - after_point } -#[derive(Debug)] -enum ErrorCode { +#[derive(Debug, Clone)] +pub enum ErrorCode { Any, Code(u32), } -fn get_error_code(input: &str) -> ErrorCode { +pub fn get_error_code(input: &str) -> ErrorCode { if let Ok(code) = input.parse::() { ErrorCode::Code(code) } else { @@ -350,22 +346,30 @@ struct Counter { } #[derive(Debug)] -struct Summary { +struct DetachData { + supervisor: JoinHandle<()>, + requests_in: Sender<(usize, Opt)>, + responses_out: Receiver +} + +#[derive(Debug)] +pub struct Summary { successes: u32, failures: Vec } impl Summary { - fn print(self) { + fn print(&self) { let total = self.successes + self.failures.len() as u32; let errors = if self.failures.is_empty() { String::from("0") } else { - format!("{} ({})", self.failures.len(), self.failures.into_iter() - .map(|f| (-(f as i32)).to_string()) - .collect::>() - .join(", ")) + format!("{} (TODO)", self.failures.len()) + // format!("{} ({})", self.failures.len(), self.failures.into_iter() + // .map(|f| (-(f as i32)).to_string()) + // .collect::>() + // .join(", ")) }; println!("Total runs:\t{}", total); diff --git a/src/supervisor.rs b/src/supervisor.rs new file mode 100644 index 0000000..5915a6d --- /dev/null +++ b/src/supervisor.rs @@ -0,0 +1,78 @@ +use std::sync::mpsc::{self, Sender, Receiver, TryRecvError}; +use std::thread; +use std::collections::HashMap; +use subprocess::CaptureData; +pub use crate::Opt; +pub use crate::evaluator::{execute, Response}; + + +struct Task{ + thread_index: usize, + options: Opt, + result: CaptureData, +} + +pub fn clone_data(data: &CaptureData) -> CaptureData{ + match data{ + CaptureData{exit_status, stdout: std_out, stderr: std_err} =>{ + CaptureData{exit_status: *exit_status, stdout: std_out.to_vec(), stderr: std_err.to_vec()} + } + } +} + +pub fn supervisor(requests: Receiver<(usize, Opt)>, responses: Sender) -> (){ + let mut tasks = HashMap::new(); + let (task_finished_sender, task_finished_receiver) = mpsc::channel(); + let mut last_result: Option = None; + loop { + match requests.try_recv(){ + Ok((index, opt)) => { + let finished = task_finished_sender.clone(); + let handle = thread::spawn(move ||{ + let (opt, result) = execute(opt); + let task = Task{ + thread_index: index, + options: opt, + result: result + }; + finished.send(task).unwrap(); + }); + tasks.insert(index, handle); + }, + Err(TryRecvError::Empty) =>(), + Err(TryRecvError::Disconnected) =>{ + panic!("thread disconected (requests)"); + }, + }; + + match task_finished_receiver.try_recv(){ + Ok(Task{ + thread_index, + options, + result, + }) => { + let response = Response{ + options: options, + result: clone_data(&result), + last_result: last_result, + }; + responses.send(response).unwrap(); + last_result = Some(result); + match tasks.remove(&thread_index) { + Some(handle) => { + handle.join().unwrap(); + () + }, + None => (), + }; + if tasks.is_empty(){ + break; + } + }, + Err(TryRecvError::Empty) =>(), + Err(TryRecvError::Disconnected) =>{ + panic!("thread disconected (finished_tasks)"); + }, + }; + } +} From 62746f37dce3035e2f357222ef1cfad4355b0a85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20C=CC=8Cech?= Date: Wed, 7 Oct 2020 11:57:55 +0200 Subject: [PATCH 2/2] Document the --detach option in README --- Cargo.lock | 9 --------- README.md | 20 ++++++++++++++++++++ 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 76efffd..311cb0d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -39,15 +39,6 @@ dependencies = [ "ppv-lite86 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "c2-chacha" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", - "ppv-lite86 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "cfg-if" version = "0.1.9" diff --git a/README.md b/README.md index 117ddd5..4e29474 100644 --- a/README.md +++ b/README.md @@ -213,6 +213,26 @@ Or until a certain date/time with `--until-time`: Fri May 25 20:49:59 UTC 2018 $ +It is possible to start commands regardles of whether a previous run had already ended by providing `--detach` option: + + $ loop --every 2s --detach -- date;sleep 5; + Wed Oct 7 10:55:45 CEST 2020 + Wed Oct 7 10:55:47 CEST 2020 + Wed Oct 7 10:55:49 CEST 2020 + Wed Oct 7 10:55:51 CEST 2020 + Wed Oct 7 10:55:53 CEST 2020 + Wed Oct 7 10:55:55 CEST 2020 + +Without the `--detach` option it would print: + + $ loop --every 2s -- date;sleep 5; + Wed Oct 7 10:56:09 CEST 2020 + Wed Oct 7 10:56:14 CEST 2020 + Wed Oct 7 10:56:19 CEST 2020 + Wed Oct 7 10:56:24 CEST 2020 + Wed Oct 7 10:56:29 CEST 2020 + Wed Oct 7 10:56:34 CEST 2020 + ### Until Conditions `loop` can iterate until output contains a string with `--until-contains`: