diff --git a/src/command_buffers/buffer.rs b/src/command_buffers/buffer.rs index 15b08a9..f75f902 100644 --- a/src/command_buffers/buffer.rs +++ b/src/command_buffers/buffer.rs @@ -1,21 +1,21 @@ +use crate::block::Signal; +use crate::sync::{Fence, Semaphore, SemaphoreUsage}; +use crate::task::{Task, TaskController, TaskHandle, TaskState}; +use crate::MVSynced; +use mvutils::utils::next_id; +use mvutils::{id_eq, sealable}; use std::alloc::{alloc_zeroed, dealloc, Layout}; use std::error::Error; use std::fmt::{Debug, Display, Formatter}; use std::future::Future; use std::sync::{Arc, RwLock}; -use mvutils::{id_eq, sealable}; -use mvutils::utils::next_id; -use crate::block::Signal; -use crate::MVSynced; -use crate::sync::{Fence, Semaphore, SemaphoreUsage}; -use crate::task::{Task, TaskController, TaskHandle, TaskState}; sealable!(); struct TaskChain { id: u64, tasks: Vec, - controllers: Vec + controllers: Vec, } id_eq!(TaskChain); @@ -25,7 +25,7 @@ impl TaskChain { TaskChain { id: next_id("MVSync"), tasks: Vec::new(), - controllers: Vec::new() + controllers: Vec::new(), } } @@ -63,11 +63,11 @@ impl TaskChain { self.tasks[i - 1].bind_semaphore(semaphore.clone(), SemaphoreUsage::Signal); self.tasks[i].bind_semaphore(semaphore.clone(), SemaphoreUsage::Wait); } - self.tasks.drain(..).collect() + std::mem::take(&mut self.tasks) } fn get_controllers(&mut self) -> Vec { - self.controllers.drain(..).collect() + std::mem::take(&mut self.controllers) } } @@ -76,11 +76,14 @@ struct RawCommandBuffer { chain_links: Vec<(usize, usize)>, fences: Vec<(usize, Arc)>, group: usize, - signal: Arc + signal: Arc, } impl RawCommandBuffer { - fn add_sync_task(&mut self, function: impl FnOnce() -> T + Send + 'static) -> TaskHandle { + fn add_sync_task( + &mut self, + function: impl FnOnce() -> T + Send + 'static, + ) -> TaskHandle { if !self.tasks[self.group].tasks.is_empty() { self.group += 1; self.tasks.push(TaskChain::new()); @@ -94,7 +97,10 @@ impl RawCommandBuffer { result } - fn add_task + Send>(&mut self, function: impl FnOnce() -> F + Send + 'static) -> TaskHandle { + fn add_task + Send>( + &mut self, + function: impl FnOnce() -> F + Send + 'static, + ) -> TaskHandle { if !self.tasks[self.group].tasks.is_empty() { self.group += 1; self.tasks.push(TaskChain::new()); @@ -108,22 +114,42 @@ impl RawCommandBuffer { result } - fn add_chained_task + Send>(&mut self, function: impl FnOnce(T) -> F + Send + 'static, predecessor: TaskHandle) -> TaskHandle { + fn add_chained_task + Send>( + &mut self, + function: impl FnOnce(T) -> F + Send + 'static, + predecessor: TaskHandle, + ) -> TaskHandle { let buffer = Arc::new(RwLock::new(None)); let state = Arc::new(RwLock::new(TaskState::Pending)); let signal = Arc::new(Signal::new()); let result = TaskHandle::new(buffer.clone(), state.clone(), signal.clone()); - let task = Task::from_async_continuation(function, buffer, state, [signal, self.signal.clone()], predecessor); + let task = Task::from_async_continuation( + function, + buffer, + state, + [signal, self.signal.clone()], + predecessor, + ); self.tasks[self.group].add_task(task, result.make_controller()); result } - fn add_sync_chained_task(&mut self, function: impl FnOnce(T) -> R + Send + 'static, predecessor: TaskHandle) -> TaskHandle { + fn add_sync_chained_task( + &mut self, + function: impl FnOnce(T) -> R + Send + 'static, + predecessor: TaskHandle, + ) -> TaskHandle { let buffer = Arc::new(RwLock::new(None)); let state = Arc::new(RwLock::new(TaskState::Pending)); let signal = Arc::new(Signal::new()); let result = TaskHandle::new(buffer.clone(), state.clone(), signal.clone()); - let task = Task::from_continuation(function, buffer, state, [signal, self.signal.clone()], predecessor); + let task = Task::from_continuation( + function, + buffer, + state, + [signal, self.signal.clone()], + predecessor, + ); self.tasks[self.group].add_task(task, result.make_controller()); result } @@ -132,29 +158,30 @@ impl RawCommandBuffer { self.tasks[self.group].get_controllers() } - fn finish(&self) -> Vec { - unsafe { - let this = (self as *const RawCommandBuffer).cast_mut().as_mut().unwrap(); - for (i, fence) in this.fences.drain(..) { - if i < this.tasks.len() { - this.tasks[i].ending_fence(fence); - } - else { - panic!("Fence index '{}' out of bounds", i); - } + fn finish(&mut self) -> Vec { + for (i, fence) in self.fences.drain(..) { + if i < self.tasks.len() { + self.tasks[i].ending_fence(fence); + } else { + panic!("Fence index '{}' out of bounds", i); } - for (e, s) in this.chain_links.drain(..) { - if e < this.tasks.len() && s < this.tasks.len() { - let semaphore = Arc::new(Semaphore::new()); - this.tasks[e].ending_semaphore(semaphore.clone()); - this.tasks[s].starting_semaphore(semaphore); - } - else { - panic!("Semaphore index '{}' or '{}' out of bounds", s, e); - } + } + + for (e, s) in self.chain_links.drain(..) { + if e < self.tasks.len() && s < self.tasks.len() { + let semaphore = Arc::new(Semaphore::new()); + self.tasks[e].ending_semaphore(semaphore.clone()); + self.tasks[s].starting_semaphore(semaphore); + } else { + panic!("Semaphore index '{}' or '{}' out of bounds", s, e); } - this.tasks.drain(..).flat_map(|mut chain| chain.chain_semaphores()).collect() } + + let mut tasks = Vec::with_capacity(self.tasks.iter().map(|chain| chain.tasks.len()).sum()); + for mut chain in self.tasks.drain(..) { + tasks.extend(chain.chain_semaphores()); + } + tasks } } @@ -197,43 +224,55 @@ impl CommandBuffer { chain_links: Vec::new(), fences: Vec::new(), group: 0, - signal + signal, }); refs.write(1); - Ok( - CommandBuffer { - id: next_id("MVSync"), - ptr, - refs, - baked: None - } - ) + Ok(CommandBuffer { + id: next_id("MVSync"), + ptr, + refs, + baked: None, + }) } } - fn chain_task + Send>(&self, function: impl FnOnce(T) -> F + Send + 'static, predecessor: TaskHandle) -> TaskHandle { + fn chain_task + Send>( + &self, + function: impl FnOnce(T) -> F + Send + 'static, + predecessor: TaskHandle, + ) -> TaskHandle { unsafe { - self.ptr.as_mut().unwrap().add_chained_task(function, predecessor) + self.ptr + .as_mut() + .unwrap() + .add_chained_task(function, predecessor) } } - fn chain_sync_task(&self, function: impl FnOnce(T) -> R + Send + 'static, predecessor: TaskHandle) -> TaskHandle { + fn chain_sync_task( + &self, + function: impl FnOnce(T) -> R + Send + 'static, + predecessor: TaskHandle, + ) -> TaskHandle { unsafe { - self.ptr.as_mut().unwrap().add_sync_chained_task(function, predecessor) + self.ptr + .as_mut() + .unwrap() + .add_sync_chained_task(function, predecessor) } } fn get_controllers(&self) -> Vec { - unsafe { - self.ptr.as_mut().unwrap().get_controllers() - } + unsafe { self.ptr.as_mut().unwrap().get_controllers() } } pub(crate) fn tasks(self) -> Vec { unsafe { let this = (&self as *const CommandBuffer).cast_mut().as_mut().unwrap(); - this.baked.take().expect("Command buffer has now been baked!") + this.baked + .take() + .expect("Command buffer has now been baked!") } } @@ -275,7 +314,11 @@ impl CommandBuffer { panic!("You cannot modify a baked command buffer!"); } unsafe { - self.ptr.as_mut().unwrap().chain_links.push((dependency, successor)); + self.ptr + .as_mut() + .unwrap() + .chain_links + .push((dependency, successor)); } } @@ -314,7 +357,7 @@ impl Clone for CommandBuffer { id: self.id, ptr: self.ptr, refs: self.refs, - baked: None + baked: None, } } } @@ -339,16 +382,25 @@ sealed!( /// Add a command and continue the command chain, returning the result of the command wrapped in /// a buffered command. Do not call this function directly unless you are defining you own commands. /// This is the same as [`add_command`], however it takes in a synchronous function instead. - fn add_sync_command(&self, function: impl FnOnce() -> T + Send + 'static) -> BufferedCommand; + fn add_sync_command( + &self, + function: impl FnOnce() -> T + Send + 'static, + ) -> BufferedCommand; /// Add a command and continue the command chain, returning the result of the command wrapped in /// a buffered command. Do not call this function directly unless you are defining you own commands. - fn add_command + Send>(&self, function: impl FnOnce() -> F + Send + 'static) -> BufferedCommand; + fn add_command + Send>( + &self, + function: impl FnOnce() -> F + Send + 'static, + ) -> BufferedCommand; } ); impl CommandBufferEntry for CommandBuffer { - fn add_sync_command(&self, function: impl FnOnce() -> T + Send + 'static) -> BufferedCommand { + fn add_sync_command( + &self, + function: impl FnOnce() -> T + Send + 'static, + ) -> BufferedCommand { if self.baked.is_some() { panic!("You cannot modify a baked command buffer!"); } @@ -357,7 +409,10 @@ impl CommandBufferEntry for CommandBuffer { }) } - fn add_command + Send>(&self, function: impl FnOnce() -> F + Send + 'static) -> BufferedCommand { + fn add_command + Send>( + &self, + function: impl FnOnce() -> F + Send + 'static, + ) -> BufferedCommand { if self.baked.is_some() { panic!("You cannot modify a baked command buffer!"); } @@ -433,12 +488,18 @@ pub trait Command: Sized + Sealed { /// Add a command and continue the command chain, returning the result of the command wrapped in /// a buffered command. Do not call this function directly unless you are defining you own commands. - fn add_command + Send>(self, function: impl FnOnce(T) -> F + Send + 'static) -> BufferedCommand; + fn add_command + Send>( + self, + function: impl FnOnce(T) -> F + Send + 'static, + ) -> BufferedCommand; /// Add a command and continue the command chain, returning the result of the command wrapped in /// a buffered command. Do not call this function directly unless you are defining you own commands. /// This is the same as [`add_command`], however it takes in a synchronous function instead. - fn add_sync_command(self, function: impl FnOnce(T) -> R + Send + 'static) -> BufferedCommand; + fn add_sync_command( + self, + function: impl FnOnce(T) -> R + Send + 'static, + ) -> BufferedCommand; } /// A buffered command. This is returned when you add a command to a command buffer, and can be @@ -453,10 +514,7 @@ impl Sealed for BufferedCommand {} impl BufferedCommand { fn new(parent: CommandBuffer, response: TaskHandle) -> Self { - BufferedCommand { - parent, - response - } + BufferedCommand { parent, response } } } @@ -470,15 +528,21 @@ impl Command for BufferedCommand { (self.response, controllers) } - fn add_command + Send>(self, function: impl FnOnce(T) -> F + Send + 'static) -> BufferedCommand { + fn add_command + Send>( + self, + function: impl FnOnce(T) -> F + Send + 'static, + ) -> BufferedCommand { let parent = self.parent().clone(); let response = parent.chain_task(function, self.response); BufferedCommand::new(parent, response) } - fn add_sync_command(self, function: impl FnOnce(T) -> R + Send + 'static) -> BufferedCommand { + fn add_sync_command( + self, + function: impl FnOnce(T) -> R + Send + 'static, + ) -> BufferedCommand { let parent = self.parent().clone(); let response = parent.chain_sync_task(function, self.response); BufferedCommand::new(parent, response) } -} \ No newline at end of file +} diff --git a/src/command_buffers/commands.rs b/src/command_buffers/commands.rs index 28c9aa8..0a375e8 100644 --- a/src/command_buffers/commands.rs +++ b/src/command_buffers/commands.rs @@ -1,6 +1,6 @@ -use std::fmt::{Debug, Display}; use crate::command_buffers::buffer::{BufferedCommand, Command}; use crate::MVSynced; +use std::fmt::{Debug, Display}; pub trait End: Command { /// Ends the command chain by consuming the last return value and doing nothing with it. This @@ -57,6 +57,20 @@ pub trait Dbg: Command { impl> Dbg for C {} +pub trait Inspect: Command { + /// Runs a side-effect function with a shared reference to the current value and returns the + /// original value unchanged. This is useful for logging, metrics, or validation in the middle + /// of a command chain without breaking the chain. + fn inspect(self, inspect: impl FnOnce(&T) + Send + 'static) -> BufferedCommand { + self.add_sync_command(|t| { + inspect(&t); + t + }) + } +} + +impl> Inspect for C {} + pub trait Unwrap: Command> { /// Unwraps the [`Option`] value returned by the previous command, and returns the result if it /// was [`Some(T)`]. If the value was [`None`], it halts the entire command chain. This will lead to the @@ -64,25 +78,19 @@ pub trait Unwrap: Command> { /// /// [`TaskHandle`]: crate::task::TaskHandle fn unwrap(self) -> BufferedCommand { - self.add_sync_command(|t| { - t.unwrap() - }) + self.add_sync_command(|t| t.unwrap()) } /// Unwraps the [`Option`] value returned by the previous command, and returns the result if it /// was [`Some(T)`]. If the value was [`None`], the default value provided is returned instead. fn unwrap_or(self, default: T) -> BufferedCommand { - self.add_sync_command(|t| { - t.unwrap_or(default) - }) + self.add_sync_command(|t| t.unwrap_or(default)) } /// Unwraps the [`Option`] value returned by the previous command, and returns the result if it /// was [`Some(T)`]. If the value was [`None`], the default value provided is returned instead. fn unwrap_or_else(self, default: impl FnOnce() -> T + Send + 'static) -> BufferedCommand { - self.add_sync_command(|t| { - t.unwrap_or_else(default) - }) + self.add_sync_command(|t| t.unwrap_or_else(default)) } /// Unwraps the [`Option`] value returned by the previous command, and returns the result if it @@ -92,9 +100,7 @@ pub trait Unwrap: Command> { /// [`TaskHandle`]: crate::task::TaskHandle fn expect(self, msg: &str) -> BufferedCommand { let msg = msg.to_string(); - self.add_sync_command(move |t| { - t.expect(&msg) - }) + self.add_sync_command(move |t| t.expect(&msg)) } } @@ -107,25 +113,19 @@ pub trait UnwrapOk: Command> { /// /// [`TaskHandle`]: crate::task::TaskHandle fn unwrap(self) -> BufferedCommand { - self.add_sync_command(|t| { - t.unwrap() - }) + self.add_sync_command(|t| t.unwrap()) } /// Unwraps the [`Result`] value returned by the previous command, and returns the result if it /// was [`Ok(T)`]. If the value was [`Err(E)`], the default value provided is returned instead. fn unwrap_or(self, default: T) -> BufferedCommand { - self.add_sync_command(|t| { - t.unwrap_or(default) - }) + self.add_sync_command(|t| t.unwrap_or(default)) } /// Unwraps the [`Result`] value returned by the previous command, and returns the result if it /// was [`Ok(T)`]. If the value was [`Err(E)`], the default value provided is returned instead. fn unwrap_or_else(self, default: impl FnOnce(E) -> T + Send + 'static) -> BufferedCommand { - self.add_sync_command(|t| { - t.unwrap_or_else(default) - }) + self.add_sync_command(|t| t.unwrap_or_else(default)) } /// Unwraps the [`Result`] value returned by the previous command, and returns the result if it @@ -135,9 +135,7 @@ pub trait UnwrapOk: Command> { /// [`TaskHandle`]: crate::task::TaskHandle fn expect(self, msg: &str) -> BufferedCommand { let msg = msg.to_string(); - self.add_sync_command(move |t| { - t.expect(&msg) - }) + self.add_sync_command(move |t| t.expect(&msg)) } } @@ -150,9 +148,7 @@ pub trait UnwrapErr: Command> { /// /// [`TaskHandle`]: crate::task::TaskHandle fn unwrap_err(self) -> BufferedCommand { - self.add_sync_command(|t| { - t.unwrap_err() - }) + self.add_sync_command(|t| t.unwrap_err()) } /// Unwraps the [`Result`] error value returned by the previous command, and returns the result if it @@ -162,9 +158,7 @@ pub trait UnwrapErr: Command> { /// [`TaskHandle`]: crate::task::TaskHandle fn expect_err(self, msg: &str) -> BufferedCommand { let msg = msg.to_string(); - self.add_sync_command(move |t| { - t.expect_err(&msg) - }) + self.add_sync_command(move |t| t.expect_err(&msg)) } }