diff --git a/CHANGELOG.md b/CHANGELOG.md index 29b81ae4e4..2fac7df2fa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 longer lines return `{error, {parser, {line_too_long, Prefix}}}` with the first 128 bytes of the offending line. Callers whose upstream servers emit unusually large headers must account for this limit +- Improved performance of SMP scheduler. As a result, resources selected with `enif_select` and + stopped with the `ERL_NIF_SELECT_STOP_SCHEDULED` result are now released asynchronously by the + scheduler polling events, staying within the boundaries of the BEAM `enif_select` specification ### Removed - Removed `ahttp_client` support for obsolete line folding (RFC 9112 ยง5.2); folded header and diff --git a/src/libAtomVM/CMakeLists.txt b/src/libAtomVM/CMakeLists.txt index 37c2e02bc4..eba7b1fba4 100644 --- a/src/libAtomVM/CMakeLists.txt +++ b/src/libAtomVM/CMakeLists.txt @@ -261,6 +261,7 @@ define_if_function_exists(libAtomVM execve "unistd.h" PRIVATE HAVE_EXECVE) define_if_function_exists(libAtomVM tcgetattr "termios.h" PRIVATE HAVE_TCGETATTR) define_if_function_exists(libAtomVM closefrom "unistd.h" PRIVATE HAVE_CLOSEFROM) define_if_function_exists(libAtomVM getcwd "unistd.h" PUBLIC HAVE_GETCWD) +define_if_function_exists(libAtomVM sched_yield "sched.h" PUBLIC HAVE_SCHED_YIELD) define_if_symbol_exists(libAtomVM POSIX_SPAWN_CLOEXEC_DEFAULT "spawn.h" PRIVATE HAVE_POSIX_SPAWN_CLOEXEC_DEFAULT) define_if_symbol_exists(libAtomVM O_CLOEXEC "fcntl.h" PRIVATE HAVE_O_CLOEXEC) define_if_symbol_exists(libAtomVM O_DIRECTORY "fcntl.h" PRIVATE HAVE_O_DIRECTORY) diff --git a/src/libAtomVM/scheduler.c b/src/libAtomVM/scheduler.c index a856d60012..da3dbb3b1c 100644 --- a/src/libAtomVM/scheduler.c +++ b/src/libAtomVM/scheduler.c @@ -152,6 +152,32 @@ static void scheduler_process_native_signal_messages(Context *ctx) } } +static Context *scheduler_first_runnable_ready(GlobalContext *global) +{ + Context *result = NULL; + SMP_SPINLOCK_LOCK(&global->processes_spinlock); + // Pick first ready which is not running. + struct ListHead *next_ready = list_first(&global->ready_processes); + while (next_ready != &global->ready_processes) { + result = GET_LIST_ENTRY(next_ready, Context, processes_list_head); + if (!(result->flags & Running)) { + list_remove(next_ready); + context_update_flags(result, ~Ready, Running); + if (result->native_handler) { + // Native handlers are marked as waiting + list_append(&global->waiting_processes, next_ready); + } else { + list_append(&global->running_processes, next_ready); + } + break; + } + next_ready = next_ready->next; + result = NULL; + } + SMP_SPINLOCK_UNLOCK(&global->processes_spinlock); + return result; +} + static Context *scheduler_run0(GlobalContext *global) { // This function should return a new process to run. @@ -209,6 +235,13 @@ static Context *scheduler_run0(GlobalContext *global) return NULL; } if (!is_waiting) { + // If a process is ready, process it instead of waking up + // the poller scheduler + result = scheduler_first_runnable_ready(global); + if (result != NULL) { + break; + } + // Before entering the condition variable, signal the poll events // so the thread polling on events can check the ready queue. sys_signal(global); @@ -232,32 +265,23 @@ static Context *scheduler_run0(GlobalContext *global) int32_t wait_timeout = update_timer_list(global); SMP_SPINLOCK_UNLOCK(&global->timer_spinlock); - SMP_SPINLOCK_LOCK(&global->processes_spinlock); - // Pick first ready which is not running. - struct ListHead *next_ready = list_first(&global->ready_processes); - while (next_ready != &global->ready_processes) { - result = GET_LIST_ENTRY(next_ready, Context, processes_list_head); - if (!(result->flags & Running)) { - list_remove(next_ready); - context_update_flags(result, ~Ready, Running); - if (result->native_handler) { - // Native handlers are marked as waiting - list_append(&global->waiting_processes, next_ready); - } else { - list_append(&global->running_processes, next_ready); - } - break; - } - next_ready = next_ready->next; - result = NULL; + if (result == NULL) { + result = scheduler_first_runnable_ready(global); } - SMP_SPINLOCK_UNLOCK(&global->processes_spinlock); - if (result == NULL && !global->scheduler_stop_all) { - sys_poll_events(global, wait_timeout); - } else { - sys_poll_events(global, SYS_POLL_EVENTS_DO_NOT_WAIT); + // Only the poller scheduler drives the event loop. +#ifndef AVM_NO_SMP + if (is_waiting) { +#endif + if (result == NULL && !global->scheduler_stop_all) { + // The poller may block waiting for events. + sys_poll_events(global, wait_timeout); + } else { + sys_poll_events(global, SYS_POLL_EVENTS_DO_NOT_WAIT); + } +#ifndef AVM_NO_SMP } +#endif #ifdef AVM_TASK_DRIVER_ENABLED globalcontext_process_task_driver_queues(global); #endif @@ -265,8 +289,11 @@ static Context *scheduler_run0(GlobalContext *global) } while (result == NULL); #ifndef AVM_NO_SMP - global->waiting_scheduler = false; - smp_condvar_signal(global->schedulers_cv); + // Only the polling scheduler relinquishes the poller role. + if (is_waiting) { + global->waiting_scheduler = false; + smp_condvar_signal(global->schedulers_cv); + } SMP_MUTEX_UNLOCK(global->schedulers_mutex); #endif @@ -356,6 +383,12 @@ static void scheduler_make_ready(Context *ctx) return; } list_remove(&ctx->processes_list_head); + // Move to ready queue (from waiting or running) + // The process may be running (it would be signaled), so mark it + // as ready + context_update_flags(ctx, ~NoFlags, Ready); + list_append(&global->ready_processes, &ctx->processes_list_head); + SMP_SPINLOCK_UNLOCK(&global->processes_spinlock); #ifndef AVM_NO_SMP if (SMP_MUTEX_TRYLOCK(global->schedulers_mutex)) { // Start a new scheduler if none are going to take this process. @@ -366,17 +399,6 @@ static void scheduler_make_ready(Context *ctx) global->running_schedulers++; smp_scheduler_start(global); } - SMP_MUTEX_UNLOCK(global->schedulers_mutex); - } -#endif - // Move to ready queue (from waiting or running) - // The process may be running (it would be signaled), so mark it - // as ready - context_update_flags(ctx, ~NoFlags, Ready); - list_append(&global->ready_processes, &ctx->processes_list_head); - SMP_SPINLOCK_UNLOCK(&global->processes_spinlock); -#ifndef AVM_NO_SMP - if (SMP_MUTEX_TRYLOCK(global->schedulers_mutex)) { if (global->waiting_scheduler) { sys_signal(global); } diff --git a/src/libAtomVM/smp.h b/src/libAtomVM/smp.h index 33416f0324..1a6399a99d 100644 --- a/src/libAtomVM/smp.h +++ b/src/libAtomVM/smp.h @@ -207,16 +207,42 @@ static inline void smp_spinlock_init(SpinLock *lock) lock->lock = 0; } +#if !defined(SMP_SPIN_YIELD) && defined(HAVE_SCHED_YIELD) +#include +#define SMP_SPIN_YIELD() ((void) sched_yield()) +#endif + +#if defined(SMP_SPIN_YIELD) && !defined(SMP_SPIN_YIELD_INTERVAL) +/* + * Number of failed CAS attempts between two yields. Low enough to let + * serialized scheduler threads make progress under Valgrind/CI, while avoiding + * a syscall on very short transient contention. + */ +#define SMP_SPIN_YIELD_INTERVAL 64U +#endif + /** * @brief Lock a spinlock. * @param lock the spin lock to lock */ static inline void smp_spinlock_lock(SpinLock *lock) { +#ifdef SMP_SPIN_YIELD + unsigned int spins = 0; +#endif int current; - do { + while (true) { current = 0; - } while (!ATOMIC_COMPARE_EXCHANGE_WEAK_INT(&lock->lock, ¤t, 1)); + if (ATOMIC_COMPARE_EXCHANGE_WEAK_INT(&lock->lock, ¤t, 1)) { + return; + } +#ifdef SMP_SPIN_YIELD + if (++spins >= SMP_SPIN_YIELD_INTERVAL) { + spins = 0; + SMP_SPIN_YIELD(); + } +#endif + } } /** diff --git a/src/platforms/esp32/components/avm_sys/sys.c b/src/platforms/esp32/components/avm_sys/sys.c index 42786b6c8e..606b26b8fb 100644 --- a/src/platforms/esp32/components/avm_sys/sys.c +++ b/src/platforms/esp32/components/avm_sys/sys.c @@ -39,6 +39,7 @@ #include "esp_system.h" #include "esp_timer.h" #include "freertos/FreeRTOS.h" +#include "freertos/semphr.h" #include "freertos/task.h" #include #include @@ -112,12 +113,16 @@ static const char *const revision_atom = "\x8" "revision"; QueueHandle_t event_queue = NULL; QueueSetHandle_t event_set = NULL; +static SemaphoreHandle_t signal_semaphore = NULL; void esp32_sys_queue_init() { - event_set = xQueueCreateSet(EVENT_QUEUE_LEN * 4); + // + 1 accounts for the signal binary semaphore + event_set = xQueueCreateSet(EVENT_QUEUE_LEN * 4 + 1); event_queue = xQueueCreate(EVENT_QUEUE_LEN, sizeof(void *)); xQueueAddToSet(event_queue, event_set); + signal_semaphore = xSemaphoreCreateBinary(); + xQueueAddToSet(signal_semaphore, event_set); } static inline void sys_clock_gettime(struct timespec *t) @@ -132,6 +137,14 @@ static void receive_events(GlobalContext *glb, TickType_t wait_ticks) void *sender = NULL; QueueSetMemberHandle_t event_source; while ((event_source = xQueueSelectFromSet(event_set, wait_ticks))) { +#if !defined(AVM_NO_SMP) || defined(AVM_TASK_DRIVER_ENABLED) + if (event_source == signal_semaphore) { + // We've been signaled + xSemaphoreTake(signal_semaphore, 0); + return; + } +#endif + // Listener used shared event_queue. if (event_source == event_queue) { if (UNLIKELY(xQueueReceive(event_queue, &sender, 0) == pdFALSE)) { @@ -141,13 +154,6 @@ static void receive_events(GlobalContext *glb, TickType_t wait_ticks) sender = event_source; } -#if !defined(AVM_NO_SMP) || defined(AVM_TASK_DRIVER_ENABLED) - if (sender == CAST_FUNC_TO_VOID_PTR(sys_signal)) { - // We've been signaled - return; - } -#endif - struct ListHead *listeners = synclist_wrlock(&glb->listeners); if (!process_listener_handler(glb, sender, listeners, NULL, NULL)) { TRACE("sys: handler not found for: %p\n", (void *) sender); @@ -170,8 +176,8 @@ void sys_poll_events(GlobalContext *glb, int timeout_ms) void sys_signal(GlobalContext *glb) { - void *queue_item = CAST_FUNC_TO_VOID_PTR(sys_signal); - xQueueSendToBack(event_queue, &queue_item, 0); + UNUSED(glb); + xSemaphoreGive(signal_semaphore); } void sys_time(struct timespec *t) diff --git a/src/platforms/generic_unix/lib/socket_driver.c b/src/platforms/generic_unix/lib/socket_driver.c index 7e9439622d..dd9228c0d2 100644 --- a/src/platforms/generic_unix/lib/socket_driver.c +++ b/src/platforms/generic_unix/lib/socket_driver.c @@ -47,6 +47,9 @@ #include #include +void sys_register_listener_nolock(GlobalContext *global, struct EventListener *listener); +void sys_unregister_listener_nolock(GlobalContext *global, struct EventListener *listener); + // #define ENABLE_TRACE #include "trace.h" @@ -83,6 +86,22 @@ typedef struct SocketDriverData PassiveRecvListener *passive_listener; } SocketDriverData; +static void register_active_listener(GlobalContext *glb, SocketDriverData *socket_data, ActiveRecvListener *listener) +{ + synclist_wrlock(&glb->listeners); + socket_data->active_listener = listener; + sys_register_listener_nolock(glb, &listener->base); + synclist_unlock(&glb->listeners); +} + +static void register_passive_listener(GlobalContext *glb, SocketDriverData *socket_data, PassiveRecvListener *listener) +{ + synclist_wrlock(&glb->listeners); + socket_data->passive_listener = listener; + sys_register_listener_nolock(glb, &listener->base); + synclist_unlock(&glb->listeners); +} + // clang-format off // TODO define in defaultatoms const char *const send_a = "\x4" "send"; @@ -253,8 +272,7 @@ static term init_udp_socket(Context *ctx, SocketDriverData *socket_data, term pa listener->base.handler = active_recvfrom_callback; listener->buf_size = socket_data->buffer; listener->process_id = ctx->process_id; - sys_register_listener(glb, &listener->base); - socket_data->active_listener = listener; + register_active_listener(glb, socket_data, listener); } } return ret; @@ -338,8 +356,7 @@ static term init_client_tcp_socket(Context *ctx, SocketDriverData *socket_data, listener->base.handler = active_recv_callback; listener->buf_size = socket_data->buffer; listener->process_id = ctx->process_id; - sys_register_listener(glb, &listener->base); - socket_data->active_listener = listener; + register_active_listener(glb, socket_data, listener); } } return ret; @@ -731,10 +748,10 @@ static EventListener *active_recv_callback(GlobalContext *glb, EventListener *ba port_send_message_nolock(glb, pid, msg); mailbox_send(ctx, globalcontext_make_atom(glb, close_internal)); // See socket_consume_mailbox close path below - if (socket_data->active_listener) { + if (socket_data->active_listener == listener) { socket_data->active_listener = NULL; - free(listener); } + free(listener); result = NULL; END_WITH_STACK_HEAP(heap, glb); } else { @@ -835,12 +852,12 @@ static EventListener *passive_recv_callback(GlobalContext *glb, EventListener *b port_send_message_nolock(glb, pid, reply); memory_destroy_heap(&heap, glb); } - globalcontext_get_process_unlock(glb, ctx); // See socket_consume_mailbox close path below - if (socket_data->passive_listener) { + if (socket_data->passive_listener == listener) { socket_data->passive_listener = NULL; - free(listener); } + globalcontext_get_process_unlock(glb, ctx); + free(listener); free(buf); // remove the EventListener from the global list return NULL; @@ -975,12 +992,12 @@ static EventListener *passive_recvfrom_callback(GlobalContext *glb, EventListene port_send_message_nolock(glb, pid, reply); memory_destroy_heap(&heap, glb); } - globalcontext_get_process_unlock(glb, ctx); // See socket_consume_mailbox close path below - if (socket_data->passive_listener) { + if (socket_data->passive_listener == listener) { socket_data->passive_listener = NULL; - free(listener); } + globalcontext_get_process_unlock(glb, ctx); + free(listener); free(buf); // remove the EventListener from the global list and clean up return NULL; @@ -1015,8 +1032,7 @@ static void do_recv(Context *ctx, term pid, term ref, term length, term timeout, listener->length = term_to_int(length); listener->buffer = socket_data->buffer; listener->ref_ticks = term_to_ref_ticks(ref); - sys_register_listener(glb, &listener->base); - socket_data->passive_listener = listener; + register_passive_listener(glb, socket_data, listener); } void socket_driver_do_recvfrom(Context *ctx, term pid, term ref, term length, term timeout) @@ -1044,6 +1060,13 @@ static EventListener *accept_callback(GlobalContext *glb, EventListener *base_li socklen_t clientlen = sizeof(clientaddr); int fd = accept(listener->base.fd, (struct sockaddr *) &clientaddr, &clientlen); Context *ctx = globalcontext_get_process_lock(glb, listener->process_id); + if (UNLIKELY(ctx == NULL)) { + if (fd != -1) { + close(fd); + } + free(listener); + return NULL; + } SocketDriverData *socket_data = (SocketDriverData *) ctx->platform_data; EventListener *result = NULL; if (fd == -1) { @@ -1058,6 +1081,21 @@ static EventListener *accept_callback(GlobalContext *glb, EventListener *base_li TRACE("socket_driver|accept_callback: accepted connection. fd: %i\n", fd); term pid = listener->pid; + if (UNLIKELY(fcntl(fd, F_SETFL, O_NONBLOCK) == -1)) { + int err = errno; + close(fd); + BEGIN_WITH_STACK_HEAP(12, heap); + term ref = term_from_ref_ticks(listener->ref_ticks, &heap); + term reply = port_heap_create_reply(&heap, ref, port_heap_create_sys_error_tuple(&heap, FCNTL_ATOM, err)); + port_send_message_nolock(glb, pid, reply); + END_WITH_STACK_HEAP(heap, glb); + if (socket_data->passive_listener == listener) { + socket_data->passive_listener = NULL; + } + globalcontext_get_process_unlock(glb, ctx); + free(listener); + return NULL; + } SocketDriverData *new_socket_data = socket_driver_create_data(); new_socket_data->sockfd = fd; new_socket_data->proto = socket_data->proto; @@ -1070,9 +1108,12 @@ static EventListener *accept_callback(GlobalContext *glb, EventListener *base_li Context *new_ctx = create_accepting_socket(glb, new_socket_data); ctx = globalcontext_get_process_lock(glb, listener->process_id); if (UNLIKELY(ctx == NULL)) { + socket_driver_do_close(new_ctx); + scheduler_terminate(new_ctx); free(listener); return NULL; } + socket_data = (SocketDriverData *) ctx->platform_data; if (new_socket_data->active) { result = &create_accepting_socket_listener(new_ctx, new_socket_data)->base; } @@ -1086,12 +1127,12 @@ static EventListener *accept_callback(GlobalContext *glb, EventListener *base_li port_send_message_nolock(glb, pid, reply); END_WITH_STACK_HEAP(heap, glb); } - globalcontext_get_process_unlock(glb, ctx); // See socket_consume_mailbox close path below - if (socket_data->passive_listener) { + if (socket_data->passive_listener == listener) { socket_data->passive_listener = NULL; - free(listener); } + globalcontext_get_process_unlock(glb, ctx); + free(listener); // remove the EventListener from the global list and replace it if needed return result; } @@ -1117,8 +1158,7 @@ void socket_driver_do_accept(Context *ctx, term pid, term ref, term timeout) listener->length = 0; listener->buffer = 0; listener->ref_ticks = term_to_ref_ticks(ref); - sys_register_listener(glb, &listener->base); - socket_data->passive_listener = listener; + register_passive_listener(glb, socket_data, listener); } static NativeHandlerResult socket_consume_mailbox(Context *ctx) @@ -1194,31 +1234,26 @@ static NativeHandlerResult socket_consume_mailbox(Context *ctx) TRACE("close\n"); port_send_reply(ctx, pid, ref, OK_ATOM); SocketDriverData *socket_data = (SocketDriverData *) ctx->platform_data; - // Callbacks (active_recv_callback, passive_recv_callback) are called - // while glb->listeners lock is held. They may want to free the - // listener, causing a potential double free here. - // We acquire the lock on listeners here and set the listeners - // to NULL in the socket_data structure to prevent them from freeing - // the listeners. + // Callbacks (active_recv_callback, passive_recv_callback, accept_callback) + // are called while glb->listeners lock is held. They may free the + // listener and set the socket_data pointer to NULL. + // We must atomically detach the pointers AND unlink from the listeners + // list under the same lock hold, to prevent a race where the callback + // also unlinks the same listener node. synclist_wrlock(&glb->listeners); ActiveRecvListener *active_listener = socket_data->active_listener; PassiveRecvListener *passive_listener = socket_data->passive_listener; socket_data->active_listener = NULL; socket_data->passive_listener = NULL; - synclist_unlock(&glb->listeners); if (active_listener) { - // Then we unregister, which also acquires the lock. The callbacks - // may have returned NULL which means the listener would no longer - // be registered, but this will work. - sys_unregister_listener(glb, &active_listener->base); - // After the listener is unregistered, the callbacks can no longer - // be called, so we can eventually free the listener - free(active_listener); + sys_unregister_listener_nolock(glb, &active_listener->base); } if (passive_listener) { - sys_unregister_listener(glb, &passive_listener->base); - free(passive_listener); + sys_unregister_listener_nolock(glb, &passive_listener->base); } + synclist_unlock(&glb->listeners); + free(active_listener); + free(passive_listener); socket_driver_do_close(ctx); // We don't need to remove message. return NativeTerminate; diff --git a/src/platforms/generic_unix/lib/sys.c b/src/platforms/generic_unix/lib/sys.c index ce6a032fc9..5fc26dbbcb 100644 --- a/src/platforms/generic_unix/lib/sys.c +++ b/src/platforms/generic_unix/lib/sys.c @@ -660,9 +660,8 @@ void event_listener_add_to_polling_set(struct EventListener *listener, GlobalCon #endif } -void sys_register_listener(GlobalContext *global, struct EventListener *listener) +void sys_register_listener_nolock(GlobalContext *global, struct EventListener *listener) { - struct ListHead *listeners = synclist_wrlock(&global->listeners); event_listener_add_to_polling_set(listener, global); #ifndef AVM_NO_SMP #ifndef HAVE_KQUEUE @@ -670,7 +669,13 @@ void sys_register_listener(GlobalContext *global, struct EventListener *listener sys_signal(global); #endif #endif - list_append(listeners, &listener->listeners_list_head); + list_append(synclist_nolock(&global->listeners), &listener->listeners_list_head); +} + +void sys_register_listener(GlobalContext *global, struct EventListener *listener) +{ + synclist_wrlock(&global->listeners); + sys_register_listener_nolock(global, listener); synclist_unlock(&global->listeners); } @@ -692,10 +697,17 @@ static void listener_event_remove_from_polling_set(listener_event_t listener_fd, #endif } -void sys_unregister_listener(GlobalContext *global, struct EventListener *listener) +void sys_unregister_listener_nolock(GlobalContext *global, struct EventListener *listener) { listener_event_remove_from_polling_set(listener->fd, global); - synclist_remove(&global->listeners, &listener->listeners_list_head); + list_remove(&listener->listeners_list_head); +} + +void sys_unregister_listener(GlobalContext *global, struct EventListener *listener) +{ + synclist_wrlock(&global->listeners); + sys_unregister_listener_nolock(global, listener); + synclist_unlock(&global->listeners); } void sys_register_select_event(GlobalContext *global, ErlNifEvent event, bool is_write) @@ -729,6 +741,9 @@ void sys_unregister_select_event(GlobalContext *global, ErlNifEvent event, bool EV_SET(&kev, event, is_write ? EVFILT_WRITE : EVFILT_READ, EV_DELETE, 0, 0, NULL); (void) kevent(platform->kqueue_fd, &kev, 1, NULL, 0, &ts); platform->select_events_poll_count = -1; +#ifndef AVM_NO_SMP + sys_signal(global); +#endif #else UNUSED(event); UNUSED(is_write); diff --git a/src/platforms/rp2/src/lib/rp2_sys.h b/src/platforms/rp2/src/lib/rp2_sys.h index 77b2918c88..3d1393d066 100644 --- a/src/platforms/rp2/src/lib/rp2_sys.h +++ b/src/platforms/rp2/src/lib/rp2_sys.h @@ -27,7 +27,8 @@ #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wpedantic" -#include +#include +#include #include #include @@ -77,8 +78,7 @@ void sys_unregister_listener_from_event(GlobalContext *global, listener_event_t struct RP2PlatformData { #ifndef AVM_NO_SMP - mutex_t event_poll_mutex; - cond_t event_poll_cond; + semaphore_t event_poll_sem; #endif queue_t event_queue; diff --git a/src/platforms/rp2/src/lib/sys.c b/src/platforms/rp2/src/lib/sys.c index 0894d0bf1d..97311832a8 100644 --- a/src/platforms/rp2/src/lib/sys.c +++ b/src/platforms/rp2/src/lib/sys.c @@ -82,8 +82,7 @@ void sys_init_platform(GlobalContext *glb) struct RP2PlatformData *platform = malloc(sizeof(struct RP2PlatformData)); glb->platform_data = platform; #ifndef AVM_NO_SMP - mutex_init(&platform->event_poll_mutex); - cond_init(&platform->event_poll_cond); + sem_init(&platform->event_poll_sem, 0, 1); #endif queue_init(&platform->event_queue, sizeof(queue_t *), EVENT_QUEUE_LEN); @@ -152,37 +151,10 @@ bool sys_try_post_listener_event_from_isr(GlobalContext *glb, listener_event_t l return false; } -#ifndef AVM_NO_SMP - uint32_t owner; - bool acquired_mutex = mutex_try_enter(&platform->event_poll_mutex, &owner); - // We're from an ISR, so we cannot wait for the interrupted code (running - // on the same core as we do) to release the mutex. - if (!acquired_mutex) { - // If this core is not the owner, wait for the other core to release - // the mutex. - // TODO: implement queue_try_remove_wait_timeout_ms in Pico SDK to - // simplify this logic - uint32_t caller = (uint32_t) lock_get_caller_owner_id(); // same cast exists in mutex_try_enter - if (caller != owner) { - mutex_enter_blocking(&platform->event_poll_mutex); - acquired_mutex = true; - } - } -#endif if (UNLIKELY(!queue_try_add(&platform->event_queue, &listener_queue))) { -#ifndef AVM_NO_SMP - if (acquired_mutex) { - mutex_exit(&platform->event_poll_mutex); - } -#endif fprintf(stderr, "Lost event from ISR as global event queue is full. System is overloaded or EVENT_QUEUE_LEN is too low\n"); return false; } -#ifndef AVM_NO_SMP - if (acquired_mutex) { - mutex_exit(&platform->event_poll_mutex); - } -#endif #ifndef AVM_NO_SMP sys_signal(glb); @@ -202,16 +174,12 @@ void sys_poll_events(GlobalContext *glb, int timeout_ms) sys_tinyusb_unlock(glb); #endif #ifndef AVM_NO_SMP - if (timeout_ms != 0) { - mutex_enter_blocking(&platform->event_poll_mutex); - if (queue_is_empty(&platform->event_queue)) { - if (timeout_ms > 0) { - cond_wait_timeout_ms(&platform->event_poll_cond, &platform->event_poll_mutex, timeout_ms); - } else { - cond_wait(&platform->event_poll_cond, &platform->event_poll_mutex); - } + if (timeout_ms != 0 && queue_is_empty(&platform->event_queue)) { + if (timeout_ms > 0) { + sem_acquire_timeout_ms(&platform->event_poll_sem, timeout_ms); + } else { + sem_acquire_blocking(&platform->event_poll_sem); } - mutex_exit(&platform->event_poll_mutex); } #else UNUSED(timeout_ms); @@ -230,7 +198,7 @@ void sys_poll_events(GlobalContext *glb, int timeout_ms) void sys_signal(GlobalContext *glb) { struct RP2PlatformData *platform = glb->platform_data; - cond_signal(&platform->event_poll_cond); + sem_release(&platform->event_poll_sem); } #endif diff --git a/tests/libs/eavmlib/test_file.erl b/tests/libs/eavmlib/test_file.erl index c69a3a9f24..b87b4c8d96 100644 --- a/tests/libs/eavmlib/test_file.erl +++ b/tests/libs/eavmlib/test_file.erl @@ -166,7 +166,11 @@ test_gc(HasSelect) -> ?ASSERT_EQUALS(MemorySize8, MemorySize1), call_gc_loop(GCSubPid, close), call_gc_loop(GCSubPid, gc), - MemorySize9 = erlang:memory(binary), + % If select_stop raced the ready_output notification, the stop + % was scheduled (ERL_NIF_SELECT_STOP_SCHEDULED) and the resource + % is only released once the scheduler polling events retires the + % closed select event, so wait for memory to converge. + MemorySize9 = wait_memory_binary(MemorySize0, 100), ?ASSERT_EQUALS(MemorySize9, MemorySize0); true -> ok @@ -182,6 +186,19 @@ call_gc_loop(Pid, Message) -> {Pid, Message} -> ok end. +wait_memory_binary(Expected, Retries) -> + case erlang:memory(binary) of + Expected -> + Expected; + Other when Retries =:= 0 -> + Other; + _ -> + receive + after 10 -> ok + end, + wait_memory_binary(Expected, Retries - 1) + end. + gc_loop(Path, File) -> receive {select, _Resource, undefined, _Direction} ->