Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 133 additions & 69 deletions src/command_buffers/buffer.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
use crate::block::Signal;
use crate::sync::{Fence, Semaphore, SemaphoreUsage};
use crate::task::{Task, TaskController, TaskHandle, TaskState};
use crate::MVSynced;
use mvutils::utils::next_id;
use mvutils::{id_eq, sealable};
use std::alloc::{alloc_zeroed, dealloc, Layout};
use std::error::Error;
use std::fmt::{Debug, Display, Formatter};
use std::future::Future;
use std::sync::{Arc, RwLock};
use mvutils::{id_eq, sealable};
use mvutils::utils::next_id;
use crate::block::Signal;
use crate::MVSynced;
use crate::sync::{Fence, Semaphore, SemaphoreUsage};
use crate::task::{Task, TaskController, TaskHandle, TaskState};

sealable!();

struct TaskChain {
id: u64,
tasks: Vec<Task>,
controllers: Vec<TaskController>
controllers: Vec<TaskController>,
}

id_eq!(TaskChain);
Expand All @@ -25,7 +25,7 @@ impl TaskChain {
TaskChain {
id: next_id("MVSync"),
tasks: Vec::new(),
controllers: Vec::new()
controllers: Vec::new(),
}
}

Expand Down Expand Up @@ -63,11 +63,11 @@ impl TaskChain {
self.tasks[i - 1].bind_semaphore(semaphore.clone(), SemaphoreUsage::Signal);
self.tasks[i].bind_semaphore(semaphore.clone(), SemaphoreUsage::Wait);
}
self.tasks.drain(..).collect()
std::mem::take(&mut self.tasks)
}

fn get_controllers(&mut self) -> Vec<TaskController> {
self.controllers.drain(..).collect()
std::mem::take(&mut self.controllers)
}
}

Expand All @@ -76,11 +76,14 @@ struct RawCommandBuffer {
chain_links: Vec<(usize, usize)>,
fences: Vec<(usize, Arc<Fence>)>,
group: usize,
signal: Arc<Signal>
signal: Arc<Signal>,
}

impl RawCommandBuffer {
fn add_sync_task<T: MVSynced>(&mut self, function: impl FnOnce() -> T + Send + 'static) -> TaskHandle<T> {
fn add_sync_task<T: MVSynced>(
&mut self,
function: impl FnOnce() -> T + Send + 'static,
) -> TaskHandle<T> {
if !self.tasks[self.group].tasks.is_empty() {
self.group += 1;
self.tasks.push(TaskChain::new());
Expand All @@ -94,7 +97,10 @@ impl RawCommandBuffer {
result
}

fn add_task<T: MVSynced, F: Future<Output = T> + Send>(&mut self, function: impl FnOnce() -> F + Send + 'static) -> TaskHandle<T> {
fn add_task<T: MVSynced, F: Future<Output = T> + Send>(
&mut self,
function: impl FnOnce() -> F + Send + 'static,
) -> TaskHandle<T> {
if !self.tasks[self.group].tasks.is_empty() {
self.group += 1;
self.tasks.push(TaskChain::new());
Expand All @@ -108,22 +114,42 @@ impl RawCommandBuffer {
result
}

fn add_chained_task<T: MVSynced, R: MVSynced, F: Future<Output = R> + Send>(&mut self, function: impl FnOnce(T) -> F + Send + 'static, predecessor: TaskHandle<T>) -> TaskHandle<R> {
fn add_chained_task<T: MVSynced, R: MVSynced, F: Future<Output = R> + Send>(
&mut self,
function: impl FnOnce(T) -> F + Send + 'static,
predecessor: TaskHandle<T>,
) -> TaskHandle<R> {
let buffer = Arc::new(RwLock::new(None));
let state = Arc::new(RwLock::new(TaskState::Pending));
let signal = Arc::new(Signal::new());
let result = TaskHandle::new(buffer.clone(), state.clone(), signal.clone());
let task = Task::from_async_continuation(function, buffer, state, [signal, self.signal.clone()], predecessor);
let task = Task::from_async_continuation(
function,
buffer,
state,
[signal, self.signal.clone()],
predecessor,
);
self.tasks[self.group].add_task(task, result.make_controller());
result
}

fn add_sync_chained_task<T: MVSynced, R: MVSynced>(&mut self, function: impl FnOnce(T) -> R + Send + 'static, predecessor: TaskHandle<T>) -> TaskHandle<R> {
fn add_sync_chained_task<T: MVSynced, R: MVSynced>(
&mut self,
function: impl FnOnce(T) -> R + Send + 'static,
predecessor: TaskHandle<T>,
) -> TaskHandle<R> {
let buffer = Arc::new(RwLock::new(None));
let state = Arc::new(RwLock::new(TaskState::Pending));
let signal = Arc::new(Signal::new());
let result = TaskHandle::new(buffer.clone(), state.clone(), signal.clone());
let task = Task::from_continuation(function, buffer, state, [signal, self.signal.clone()], predecessor);
let task = Task::from_continuation(
function,
buffer,
state,
[signal, self.signal.clone()],
predecessor,
);
self.tasks[self.group].add_task(task, result.make_controller());
result
}
Expand All @@ -132,29 +158,30 @@ impl RawCommandBuffer {
self.tasks[self.group].get_controllers()
}

fn finish(&self) -> Vec<Task> {
unsafe {
let this = (self as *const RawCommandBuffer).cast_mut().as_mut().unwrap();
for (i, fence) in this.fences.drain(..) {
if i < this.tasks.len() {
this.tasks[i].ending_fence(fence);
}
else {
panic!("Fence index '{}' out of bounds", i);
}
fn finish(&mut self) -> Vec<Task> {
for (i, fence) in self.fences.drain(..) {
if i < self.tasks.len() {
self.tasks[i].ending_fence(fence);
} else {
panic!("Fence index '{}' out of bounds", i);
}
for (e, s) in this.chain_links.drain(..) {
if e < this.tasks.len() && s < this.tasks.len() {
let semaphore = Arc::new(Semaphore::new());
this.tasks[e].ending_semaphore(semaphore.clone());
this.tasks[s].starting_semaphore(semaphore);
}
else {
panic!("Semaphore index '{}' or '{}' out of bounds", s, e);
}
}

for (e, s) in self.chain_links.drain(..) {
if e < self.tasks.len() && s < self.tasks.len() {
let semaphore = Arc::new(Semaphore::new());
self.tasks[e].ending_semaphore(semaphore.clone());
self.tasks[s].starting_semaphore(semaphore);
} else {
panic!("Semaphore index '{}' or '{}' out of bounds", s, e);
}
this.tasks.drain(..).flat_map(|mut chain| chain.chain_semaphores()).collect()
}

let mut tasks = Vec::with_capacity(self.tasks.iter().map(|chain| chain.tasks.len()).sum());
for mut chain in self.tasks.drain(..) {
tasks.extend(chain.chain_semaphores());
}
tasks
}
}

Expand Down Expand Up @@ -197,43 +224,55 @@ impl CommandBuffer {
chain_links: Vec::new(),
fences: Vec::new(),
group: 0,
signal
signal,
});
refs.write(1);

Ok(
CommandBuffer {
id: next_id("MVSync"),
ptr,
refs,
baked: None
}
)
Ok(CommandBuffer {
id: next_id("MVSync"),
ptr,
refs,
baked: None,
})
}
}

fn chain_task<T: MVSynced, R: MVSynced, F: Future<Output=R> + Send>(&self, function: impl FnOnce(T) -> F + Send + 'static, predecessor: TaskHandle<T>) -> TaskHandle<R> {
fn chain_task<T: MVSynced, R: MVSynced, F: Future<Output = R> + Send>(
&self,
function: impl FnOnce(T) -> F + Send + 'static,
predecessor: TaskHandle<T>,
) -> TaskHandle<R> {
unsafe {
self.ptr.as_mut().unwrap().add_chained_task(function, predecessor)
self.ptr
.as_mut()
.unwrap()
.add_chained_task(function, predecessor)
}
}

fn chain_sync_task<T: MVSynced, R: MVSynced>(&self, function: impl FnOnce(T) -> R + Send + 'static, predecessor: TaskHandle<T>) -> TaskHandle<R> {
fn chain_sync_task<T: MVSynced, R: MVSynced>(
&self,
function: impl FnOnce(T) -> R + Send + 'static,
predecessor: TaskHandle<T>,
) -> TaskHandle<R> {
unsafe {
self.ptr.as_mut().unwrap().add_sync_chained_task(function, predecessor)
self.ptr
.as_mut()
.unwrap()
.add_sync_chained_task(function, predecessor)
}
}

fn get_controllers(&self) -> Vec<TaskController> {
unsafe {
self.ptr.as_mut().unwrap().get_controllers()
}
unsafe { self.ptr.as_mut().unwrap().get_controllers() }
}

pub(crate) fn tasks(self) -> Vec<Task> {
unsafe {
let this = (&self as *const CommandBuffer).cast_mut().as_mut().unwrap();
this.baked.take().expect("Command buffer has now been baked!")
this.baked
.take()
.expect("Command buffer has now been baked!")
}
}

Expand Down Expand Up @@ -275,7 +314,11 @@ impl CommandBuffer {
panic!("You cannot modify a baked command buffer!");
}
unsafe {
self.ptr.as_mut().unwrap().chain_links.push((dependency, successor));
self.ptr
.as_mut()
.unwrap()
.chain_links
.push((dependency, successor));
}
}

Expand Down Expand Up @@ -314,7 +357,7 @@ impl Clone for CommandBuffer {
id: self.id,
ptr: self.ptr,
refs: self.refs,
baked: None
baked: None,
}
}
}
Expand All @@ -339,16 +382,25 @@ sealed!(
/// Add a command and continue the command chain, returning the result of the command wrapped in
/// a buffered command. Do not call this function directly unless you are defining you own commands.
/// This is the same as [`add_command`], however it takes in a synchronous function instead.
fn add_sync_command<T: MVSynced>(&self, function: impl FnOnce() -> T + Send + 'static) -> BufferedCommand<T>;
fn add_sync_command<T: MVSynced>(
&self,
function: impl FnOnce() -> T + Send + 'static,
) -> BufferedCommand<T>;

/// Add a command and continue the command chain, returning the result of the command wrapped in
/// a buffered command. Do not call this function directly unless you are defining you own commands.
fn add_command<T: MVSynced, F: Future<Output = T> + Send>(&self, function: impl FnOnce() -> F + Send + 'static) -> BufferedCommand<T>;
fn add_command<T: MVSynced, F: Future<Output = T> + Send>(
&self,
function: impl FnOnce() -> F + Send + 'static,
) -> BufferedCommand<T>;
}
);

impl CommandBufferEntry for CommandBuffer {
fn add_sync_command<T: MVSynced>(&self, function: impl FnOnce() -> T + Send + 'static) -> BufferedCommand<T> {
fn add_sync_command<T: MVSynced>(
&self,
function: impl FnOnce() -> T + Send + 'static,
) -> BufferedCommand<T> {
if self.baked.is_some() {
panic!("You cannot modify a baked command buffer!");
}
Expand All @@ -357,7 +409,10 @@ impl CommandBufferEntry for CommandBuffer {
})
}

fn add_command<T: MVSynced, F: Future<Output = T> + Send>(&self, function: impl FnOnce() -> F + Send + 'static) -> BufferedCommand<T> {
fn add_command<T: MVSynced, F: Future<Output = T> + Send>(
&self,
function: impl FnOnce() -> F + Send + 'static,
) -> BufferedCommand<T> {
if self.baked.is_some() {
panic!("You cannot modify a baked command buffer!");
}
Expand Down Expand Up @@ -433,12 +488,18 @@ pub trait Command<T: MVSynced>: Sized + Sealed {

/// Add a command and continue the command chain, returning the result of the command wrapped in
/// a buffered command. Do not call this function directly unless you are defining you own commands.
fn add_command<R: MVSynced, F: Future<Output = R> + Send>(self, function: impl FnOnce(T) -> F + Send + 'static) -> BufferedCommand<R>;
fn add_command<R: MVSynced, F: Future<Output = R> + Send>(
self,
function: impl FnOnce(T) -> F + Send + 'static,
) -> BufferedCommand<R>;

/// Add a command and continue the command chain, returning the result of the command wrapped in
/// a buffered command. Do not call this function directly unless you are defining you own commands.
/// This is the same as [`add_command`], however it takes in a synchronous function instead.
fn add_sync_command<R: MVSynced>(self, function: impl FnOnce(T) -> R + Send + 'static) -> BufferedCommand<R>;
fn add_sync_command<R: MVSynced>(
self,
function: impl FnOnce(T) -> R + Send + 'static,
) -> BufferedCommand<R>;
}

/// A buffered command. This is returned when you add a command to a command buffer, and can be
Expand All @@ -453,10 +514,7 @@ impl<T: MVSynced> Sealed for BufferedCommand<T> {}

impl<T: MVSynced> BufferedCommand<T> {
fn new(parent: CommandBuffer, response: TaskHandle<T>) -> Self {
BufferedCommand {
parent,
response
}
BufferedCommand { parent, response }
}
}

Expand All @@ -470,15 +528,21 @@ impl<T: MVSynced> Command<T> for BufferedCommand<T> {
(self.response, controllers)
}

fn add_command<R: MVSynced, F: Future<Output = R> + Send>(self, function: impl FnOnce(T) -> F + Send + 'static) -> BufferedCommand<R> {
fn add_command<R: MVSynced, F: Future<Output = R> + Send>(
self,
function: impl FnOnce(T) -> F + Send + 'static,
) -> BufferedCommand<R> {
let parent = self.parent().clone();
let response = parent.chain_task(function, self.response);
BufferedCommand::new(parent, response)
}

fn add_sync_command<R: MVSynced>(self, function: impl FnOnce(T) -> R + Send + 'static) -> BufferedCommand<R> {
fn add_sync_command<R: MVSynced>(
self,
function: impl FnOnce(T) -> R + Send + 'static,
) -> BufferedCommand<R> {
let parent = self.parent().clone();
let response = parent.chain_sync_task(function, self.response);
BufferedCommand::new(parent, response)
}
}
}
Loading