diff --git a/Cargo.lock b/Cargo.lock index 277c019576..a67172ec86 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -864,6 +864,7 @@ dependencies = [ "raw-cpuid", "riscv", "sbi-rt", + "seahash", "semihosting", "shell-words", "simple-shell", @@ -1693,6 +1694,12 @@ dependencies = [ "syn", ] +[[package]] +name = "seahash" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" + [[package]] name = "semihosting" version = "0.1.25" diff --git a/Cargo.toml b/Cargo.toml index 5cf5f8b083..cb814c3896 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -348,6 +348,7 @@ thiserror = { version = "2", default-features = false } time = { version = "0.3", default-features = false } volatile = "0.6" uhyve-interface = { version = "0.2", optional = true } +seahash = "4.1.0" [dependencies.smoltcp] version = "0.13" diff --git a/src/arch/x86_64/kernel/apic.rs b/src/arch/x86_64/kernel/apic.rs index a5fefe4fbd..b8d1b0bf4a 100644 --- a/src/arch/x86_64/kernel/apic.rs +++ b/src/arch/x86_64/kernel/apic.rs @@ -886,10 +886,7 @@ pub fn ipi_tlb_flush() { #[allow(unused_variables)] pub fn wakeup_core(core_id_to_wakeup: CoreId) { #[cfg(all(feature = "smp", not(feature = "idle-poll")))] - if core_id_to_wakeup != core_id() - && !processor::supports_mwait() - && scheduler::take_core_hlt_state(core_id_to_wakeup) - { + if core_id_to_wakeup != core_id() && !processor::supports_mwait() { without_interrupts(|| { let apic_ids = CPU_LOCAL_APIC_IDS.lock(); let local_apic_id = apic_ids[core_id_to_wakeup as usize]; diff --git a/src/arch/x86_64/kernel/core_local.rs b/src/arch/x86_64/kernel/core_local.rs index 9b89a7659b..000b07ca26 100644 --- a/src/arch/x86_64/kernel/core_local.rs +++ b/src/arch/x86_64/kernel/core_local.rs @@ -1,8 +1,6 @@ use alloc::boxed::Box; use core::arch::asm; use core::cell::Cell; -#[cfg(feature = "smp")] -use core::sync::atomic::AtomicBool; use core::sync::atomic::Ordering; use core::{mem, ptr}; 
@@ -34,8 +32,6 @@ pub(crate) struct CoreLocal { irq_statistics: &'static IrqStatistics, /// The core-local async executor. ex: StaticLocalExecutor, - #[cfg(feature = "smp")] - pub hlt: AtomicBool, /// Queues to handle incoming requests from the other cores #[cfg(feature = "smp")] pub scheduler_input: InterruptTicketMutex, @@ -63,8 +59,6 @@ impl CoreLocal { irq_statistics, ex: StaticLocalExecutor::new(), #[cfg(feature = "smp")] - hlt: AtomicBool::new(false), - #[cfg(feature = "smp")] scheduler_input: InterruptTicketMutex::new(SchedulerInput::new()), }; let this = if core_id == 0 { diff --git a/src/arch/x86_64/kernel/interrupts.rs b/src/arch/x86_64/kernel/interrupts.rs index 6d39f076ef..02c8e3cfec 100644 --- a/src/arch/x86_64/kernel/interrupts.rs +++ b/src/arch/x86_64/kernel/interrupts.rs @@ -82,8 +82,6 @@ pub(crate) fn enable_and_wait() { ); } } else { - #[cfg(feature = "smp")] - crate::CoreLocal::get().hlt.store(true, Ordering::Relaxed); enable_and_hlt(); } } diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index 8731482ace..e665e82cab 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -8,8 +8,6 @@ use alloc::sync::Arc; use alloc::vec::Vec; use core::cell::RefCell; use core::ptr; -#[cfg(all(target_arch = "x86_64", feature = "smp"))] -use core::sync::atomic::AtomicBool; use core::sync::atomic::{AtomicI32, AtomicU32, Ordering}; use ahash::RandomState; @@ -40,8 +38,6 @@ static NO_TASKS: AtomicU32 = AtomicU32::new(0); #[cfg(feature = "smp")] static SCHEDULER_INPUTS: SpinMutex>> = SpinMutex::new(Vec::new()); -#[cfg(all(target_arch = "x86_64", feature = "smp"))] -static CORE_HLT_STATE: SpinMutex> = SpinMutex::new(Vec::new()); /// Map between Task ID and Queue of waiting tasks static WAITING_TASKS: InterruptTicketMutex>> = InterruptTicketMutex::new(BTreeMap::new()); @@ -891,19 +887,9 @@ pub(crate) fn add_current_core() { core_id.try_into().unwrap(), &CoreLocal::get().scheduler_input, ); - #[cfg(target_arch = "x86_64")] - CORE_HLT_STATE - .lock() - 
.insert(core_id.try_into().unwrap(), &CoreLocal::get().hlt); } } -#[inline] -#[cfg(all(target_arch = "x86_64", feature = "smp", not(feature = "idle-poll")))] -pub(crate) fn take_core_hlt_state(core_id: CoreId) -> bool { - CORE_HLT_STATE.lock()[usize::try_from(core_id).unwrap()].swap(false, Ordering::Acquire) -} - #[inline] #[cfg(feature = "smp")] fn get_scheduler_input(core_id: CoreId) -> &'static InterruptTicketMutex { diff --git a/src/synch/futex.rs b/src/synch/futex.rs index 5cdfb8bd44..51207febba 100644 --- a/src/synch/futex.rs +++ b/src/synch/futex.rs @@ -1,20 +1,123 @@ +use alloc::collections::LinkedList; +use alloc::collections::linked_list::CursorMut; use core::sync::atomic::AtomicU32; use core::sync::atomic::Ordering::SeqCst; -use ahash::RandomState; -use hashbrown::HashMap; -use hashbrown::hash_map::Entry; -use hermit_sync::InterruptTicketMutex; +use hermit_sync::{InterruptSpinMutex, InterruptSpinMutexGuard}; use crate::arch::kernel::core_local::core_scheduler; use crate::arch::kernel::processor::get_timer_ticks; use crate::errno::Errno; use crate::scheduler::PerCoreSchedulerExt; -use crate::scheduler::task::TaskHandlePriorityQueue; +use crate::scheduler::task::{TaskHandle, TaskHandlePriorityQueue}; -// TODO: Replace with a concurrent hashmap. 
-static PARKING_LOT: InterruptTicketMutex<HashMap<usize, TaskHandlePriorityQueue, RandomState>> =
-    InterruptTicketMutex::new(HashMap::with_hasher(RandomState::with_seeds(0, 0, 0, 0)));
+struct BucketElem(usize, TaskHandlePriorityQueue); // (futex address, tasks parked on it)
+
+type Bucket = InterruptSpinMutex<TaskListBucket>;
+
+#[repr(transparent)]
+struct TaskListBucket(LinkedList<BucketElem>);
+
+struct WrappedCursor<'a>(CursorMut<'a, BucketElem>);
+
+impl WrappedCursor<'_> {
+    /// Pops the next task waiting on this cursor's address; `None` once it is drained.
+    pub fn pop(&mut self) -> Option<TaskHandle> {
+        let task_list = self.0.current()?;
+        let task = task_list.1.pop();
+        if task_list.1.is_empty() {
+            // `remove_current` advances to the next entry, i.e. another address' queue.
+            // Park the cursor on the ghost element so that later calls return `None`.
+            self.0.remove_current();
+            while self.0.current().is_some() {
+                self.0.move_next();
+            }
+        }
+        task
+    }
+}
+
+impl TaskListBucket {
+    pub fn insert_task(&mut self, address: usize, handle: TaskHandle) {
+        for elem in self.0.iter_mut() {
+            if elem.0 == address {
+                elem.1.push(handle);
+                return;
+            }
+        }
+
+        let mut task_list = TaskHandlePriorityQueue::new();
+        task_list.push(handle);
+        self.0.push_front(BucketElem(address, task_list));
+    }
+
+    pub fn contains_task(&self, address: usize, handle: TaskHandle) -> bool {
+        for elem in self.0.iter() {
+            if elem.0 == address {
+                return elem.1.contains(handle);
+            }
+        }
+        false
+    }
+
+    /// Removes a task from this bucket, and returns a boolean indicating if it was present.
+ pub fn remove_task(&mut self, address: usize, task: TaskHandle) -> bool { + let mut cursor = self.0.cursor_front_mut(); + while let Some(elem) = cursor.current() { + if elem.0 == address { + let was_present = elem.1.remove(task); + + if elem.1.is_empty() { + cursor.remove_current(); + } + + return was_present; + } + cursor.move_next(); + } + + false + } + + fn get_pop_list(&mut self, address: usize) -> Option> { + let mut cursor = self.0.cursor_front_mut(); + while let Some(elem) = cursor.current() { + if elem.0 == address { + return Some(WrappedCursor(cursor)); + } + cursor.move_next(); + } + None + } +} + +struct BucketList([Bucket; N]); + +impl BucketList { + pub const fn new() -> Self { + Self([const { InterruptSpinMutex::new(TaskListBucket(LinkedList::new())) }; N]) + } + + fn hash_key(v: usize) -> usize { + let v = (v >> 3).to_be_bytes(); + let hashed = seahash::hash(&v) as usize; + hashed % N + } + + pub fn lock_bucket(&self, address: usize) -> InterruptSpinMutexGuard<'_, TaskListBucket> { + if N == 1 { + return self.0[0].lock(); + } + let bucket = Self::hash_key(address); + self.0[bucket].lock() + } +} + +#[cfg(feature = "smp")] +static PARKING_LOT: BucketList<64> = BucketList::new(); + +#[cfg(not(feature = "smp"))] +static PARKING_LOT: BucketList<1> = BucketList::new(); bitflags! { pub struct Flags: u32 { @@ -23,6 +126,7 @@ bitflags! { } } +#[inline(always)] fn addr(addr: &AtomicU32) -> usize { let ptr: *const _ = addr; ptr.addr() @@ -40,7 +144,8 @@ pub(crate) fn futex_wait( timeout: Option, flags: Flags, ) -> i32 { - let mut parking_lot = PARKING_LOT.lock(); + let address_usize = addr(address); + let mut parking_lot = PARKING_LOT.lock_bucket(address_usize); // Check the futex value after locking the parking lot so that all changes are observed. 
if address.load(SeqCst) != expected { return -i32::from(Errno::Again); @@ -55,40 +160,34 @@ pub(crate) fn futex_wait( let scheduler = core_scheduler(); scheduler.block_current_task(wakeup_time); let handle = scheduler.get_current_task_handle(); - parking_lot.entry(addr(address)).or_default().push(handle); + parking_lot.insert_task(address_usize, handle); drop(parking_lot); loop { scheduler.reschedule(); + // Assume this will return immediately (no other task on core!) - let mut parking_lot = PARKING_LOT.lock(); + let mut parking_lot = PARKING_LOT.lock_bucket(address_usize); if matches!(wakeup_time, Some(t) if t <= get_timer_ticks()) { - let mut wakeup = true; // Timeout occurred, try to remove ourselves from the waiting queue. - if let Entry::Occupied(mut queue) = parking_lot.entry(addr(address)) { - // If we are not in the waking queue, this must have been a wakeup. - wakeup = !queue.get_mut().remove(handle); - if queue.get().is_empty() { - queue.remove(); - } - } + let was_present = parking_lot.remove_task(address_usize, handle); - if wakeup { - return 0; + return if was_present { + -i32::from(Errno::Timedout) } else { - return -i32::from(Errno::Timedout); - } + // If we are not in the waking queue, this must have been a wakeup. + 0 + }; } else { - // If we are not in the waking queue, this must have been a wakeup. - let wakeup = !matches!(parking_lot - .get(&addr(address)), Some(queue) if queue.contains(handle)); + let is_in_queue = parking_lot.contains_task(address_usize, handle); - if wakeup { - return 0; - } else { + if is_in_queue { // A spurious wakeup occurred, sleep again. // Tasks do not change core, so the handle in the parking lot is still current. scheduler.block_current_task(wakeup_time); + } else { + // If we are not in the waking queue, this must have been a wakeup. 
+ return 0; } } drop(parking_lot); @@ -109,7 +208,8 @@ pub(crate) fn futex_wait_and_set( flags: Flags, new_value: u32, ) -> i32 { - let mut parking_lot = PARKING_LOT.lock(); + let address_usize = addr(address); + let mut parking_lot = PARKING_LOT.lock_bucket(address_usize); // Check the futex value after locking the parking lot so that all changes are observed. if address.swap(new_value, SeqCst) != expected { return -i32::from(Errno::Again); @@ -124,40 +224,33 @@ pub(crate) fn futex_wait_and_set( let scheduler = core_scheduler(); scheduler.block_current_task(wakeup_time); let handle = scheduler.get_current_task_handle(); - parking_lot.entry(addr(address)).or_default().push(handle); + parking_lot.insert_task(address_usize, handle); drop(parking_lot); loop { scheduler.reschedule(); - let mut parking_lot = PARKING_LOT.lock(); + let mut parking_lot = PARKING_LOT.lock_bucket(address_usize); if matches!(wakeup_time, Some(t) if t <= get_timer_ticks()) { - let mut wakeup = true; // Timeout occurred, try to remove ourselves from the waiting queue. - if let Entry::Occupied(mut queue) = parking_lot.entry(addr(address)) { - // If we are not in the waking queue, this must have been a wakeup. - wakeup = !queue.get_mut().remove(handle); - if queue.get().is_empty() { - queue.remove(); - } - } + let was_present = parking_lot.remove_task(address_usize, handle); - if wakeup { - return 0; + return if was_present { + -i32::from(Errno::Timedout) } else { - return -i32::from(Errno::Timedout); - } + // If we are not in the waking queue, this must have been a wakeup. + 0 + }; } else { - // If we are not in the waking queue, this must have been a wakeup. - let wakeup = !matches!(parking_lot - .get(&addr(address)), Some(queue) if queue.contains(handle)); + let is_in_queue = parking_lot.contains_task(address_usize, handle); - if wakeup { - return 0; - } else { + if is_in_queue { // A spurious wakeup occurred, sleep again. 
// Tasks do not change core, so the handle in the parking lot is still current. scheduler.block_current_task(wakeup_time); + } else { + // If we are not in the waking queue, this must have been a wakeup. + return 0; } } drop(parking_lot); @@ -174,26 +267,22 @@ pub(crate) fn futex_wake(address: *const AtomicU32, count: i32) -> i32 { return -i32::from(Errno::Inval); } - let mut parking_lot = PARKING_LOT.lock(); - let mut queue = match parking_lot.entry(address.addr()) { - Entry::Occupied(entry) => entry, - Entry::Vacant(_) => return 0, + let address_usize = address.addr(); + let mut parking_lot = PARKING_LOT.lock_bucket(address_usize); + let Some(mut queue) = parking_lot.get_pop_list(address_usize) else { + return 0; }; let scheduler = core_scheduler(); let mut woken = 0; while woken != count || count == i32::MAX { - match queue.get_mut().pop() { + match queue.pop() { Some(handle) => scheduler.custom_wakeup(handle), None => break, } woken = woken.saturating_add(1); } - if queue.get().is_empty() { - queue.remove(); - } - woken } @@ -206,29 +295,23 @@ pub(crate) fn futex_wake_or_set(address: &AtomicU32, count: i32, new_value: u32) return -i32::from(Errno::Inval); } - let mut parking_lot = PARKING_LOT.lock(); - let mut queue = match parking_lot.entry(addr(address)) { - Entry::Occupied(entry) => entry, - Entry::Vacant(_) => { - address.store(new_value, SeqCst); - return 0; - } + let address_usize = addr(address); + let mut parking_lot = PARKING_LOT.lock_bucket(address_usize); + let Some(mut queue) = parking_lot.get_pop_list(address_usize) else { + address.store(new_value, SeqCst); + return 0; }; let scheduler = core_scheduler(); let mut woken = 0; while woken != count || count == i32::MAX { - match queue.get_mut().pop() { + match queue.pop() { Some(handle) => scheduler.custom_wakeup(handle), None => break, } woken = woken.saturating_add(1); } - if queue.get().is_empty() { - queue.remove(); - } - if woken == 0 { address.store(new_value, SeqCst); }