From cd630581e026d04bfaccb4ea32e169a21dc71663 Mon Sep 17 00:00:00 2001 From: Paul Guyot Date: Sun, 7 Jun 2026 10:09:26 +0200 Subject: [PATCH 1/7] smp: avoid spinlock starvation when schedulers are serialized This is especially useful with valgrind and on CI Signed-off-by: Paul Guyot --- src/libAtomVM/CMakeLists.txt | 1 + src/libAtomVM/smp.h | 30 ++++++++++++++++++++++++++++-- 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/src/libAtomVM/CMakeLists.txt b/src/libAtomVM/CMakeLists.txt index 37c2e02bc4..eba7b1fba4 100644 --- a/src/libAtomVM/CMakeLists.txt +++ b/src/libAtomVM/CMakeLists.txt @@ -261,6 +261,7 @@ define_if_function_exists(libAtomVM execve "unistd.h" PRIVATE HAVE_EXECVE) define_if_function_exists(libAtomVM tcgetattr "termios.h" PRIVATE HAVE_TCGETATTR) define_if_function_exists(libAtomVM closefrom "unistd.h" PRIVATE HAVE_CLOSEFROM) define_if_function_exists(libAtomVM getcwd "unistd.h" PUBLIC HAVE_GETCWD) +define_if_function_exists(libAtomVM sched_yield "sched.h" PUBLIC HAVE_SCHED_YIELD) define_if_symbol_exists(libAtomVM POSIX_SPAWN_CLOEXEC_DEFAULT "spawn.h" PRIVATE HAVE_POSIX_SPAWN_CLOEXEC_DEFAULT) define_if_symbol_exists(libAtomVM O_CLOEXEC "fcntl.h" PRIVATE HAVE_O_CLOEXEC) define_if_symbol_exists(libAtomVM O_DIRECTORY "fcntl.h" PRIVATE HAVE_O_DIRECTORY) diff --git a/src/libAtomVM/smp.h b/src/libAtomVM/smp.h index 33416f0324..1a6399a99d 100644 --- a/src/libAtomVM/smp.h +++ b/src/libAtomVM/smp.h @@ -207,16 +207,42 @@ static inline void smp_spinlock_init(SpinLock *lock) lock->lock = 0; } +#if !defined(SMP_SPIN_YIELD) && defined(HAVE_SCHED_YIELD) +#include <sched.h> +#define SMP_SPIN_YIELD() ((void) sched_yield()) +#endif + +#if defined(SMP_SPIN_YIELD) && !defined(SMP_SPIN_YIELD_INTERVAL) +/* + * Number of failed CAS attempts between two yields. Low enough to let + * serialized scheduler threads make progress under Valgrind/CI, while avoiding + * a syscall on very short transient contention. 
+ */ +#define SMP_SPIN_YIELD_INTERVAL 64U +#endif + /** * @brief Lock a spinlock. * @param lock the spin lock to lock */ static inline void smp_spinlock_lock(SpinLock *lock) { +#ifdef SMP_SPIN_YIELD + unsigned int spins = 0; +#endif int current; - do { + while (true) { current = 0; - } while (!ATOMIC_COMPARE_EXCHANGE_WEAK_INT(&lock->lock, &current, 1)); + if (ATOMIC_COMPARE_EXCHANGE_WEAK_INT(&lock->lock, &current, 1)) { + return; + } +#ifdef SMP_SPIN_YIELD + if (++spins >= SMP_SPIN_YIELD_INTERVAL) { + spins = 0; + SMP_SPIN_YIELD(); + } +#endif + } } /** From 3d41ee6a980adea06d701bb82f6c5739cd98435e Mon Sep 17 00:00:00 2001 From: Peter M Date: Wed, 4 Mar 2026 13:10:22 +0100 Subject: [PATCH 2/7] Fix use-after-free race in socket driver close MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit https://ampcode.com/threads/T-019cb8b8-9e4c-7316-9566-c7e3f5f2b6db Fix a use-after-free race condition in the generic_unix socket driver's close handler, detected by Valgrind during CI gen_tcp tests. The close handler in socket_consume_mailbox used a two-phase locking pattern: it acquired the glb->listeners lock to NULL-out the socket_data listener pointers, released it, then called sys_unregister_listener (which re-acquires the lock) to remove the listener from the linked list. Between the unlock and re-lock, the event loop thread could also unlink the same listener node via process_listener_handler after the callback returned NULL. The subsequent list_remove in sys_unregister_listener then operated on stale prev/next pointers, corrupting the list or writing to freed memory. The fix makes the pointer detach and list unlink atomic under a single lock hold by introducing sys_unregister_listener_nolock — a variant that assumes the caller already holds the glb->listeners write lock. The close handler now NULLs the pointers, unlinks the listeners, and releases the lock before freeing the memory. 
This pattern is specific to generic_unix; ESP32 and RP2 use a single global listener for the socket driver subsystem and are not affected. Signed-off-by: Peter M --- .../generic_unix/lib/socket_driver.c | 29 +++++++++---------- src/platforms/generic_unix/lib/sys.c | 6 ++++ 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/src/platforms/generic_unix/lib/socket_driver.c b/src/platforms/generic_unix/lib/socket_driver.c index 7e9439622d..7ff51b31e0 100644 --- a/src/platforms/generic_unix/lib/socket_driver.c +++ b/src/platforms/generic_unix/lib/socket_driver.c @@ -47,6 +47,8 @@ #include #include +void sys_unregister_listener_nolock(GlobalContext *global, struct EventListener *listener); + // #define ENABLE_TRACE #include "trace.h" @@ -1194,31 +1196,26 @@ static NativeHandlerResult socket_consume_mailbox(Context *ctx) TRACE("close\n"); port_send_reply(ctx, pid, ref, OK_ATOM); SocketDriverData *socket_data = (SocketDriverData *) ctx->platform_data; - // Callbacks (active_recv_callback, passive_recv_callback) are called - // while glb->listeners lock is held. They may want to free the - // listener, causing a potential double free here. - // We acquire the lock on listeners here and set the listeners - // to NULL in the socket_data structure to prevent them from freeing - // the listeners. + // Callbacks (active_recv_callback, passive_recv_callback, accept_callback) + // are called while glb->listeners lock is held. They may free the + // listener and set the socket_data pointer to NULL. + // We must atomically detach the pointers AND unlink from the listeners + // list under the same lock hold, to prevent a race where the callback + // also unlinks the same listener node. 
synclist_wrlock(&glb->listeners); ActiveRecvListener *active_listener = socket_data->active_listener; PassiveRecvListener *passive_listener = socket_data->passive_listener; socket_data->active_listener = NULL; socket_data->passive_listener = NULL; - synclist_unlock(&glb->listeners); if (active_listener) { - // Then we unregister, which also acquires the lock. The callbacks - // may have returned NULL which means the listener would no longer - // be registered, but this will work. - sys_unregister_listener(glb, &active_listener->base); - // After the listener is unregistered, the callbacks can no longer - // be called, so we can eventually free the listener - free(active_listener); + sys_unregister_listener_nolock(glb, &active_listener->base); } if (passive_listener) { - sys_unregister_listener(glb, &passive_listener->base); - free(passive_listener); + sys_unregister_listener_nolock(glb, &passive_listener->base); } + synclist_unlock(&glb->listeners); + free(active_listener); + free(passive_listener); socket_driver_do_close(ctx); // We don't need to remove message. 
return NativeTerminate; diff --git a/src/platforms/generic_unix/lib/sys.c b/src/platforms/generic_unix/lib/sys.c index ce6a032fc9..1f78174d6d 100644 --- a/src/platforms/generic_unix/lib/sys.c +++ b/src/platforms/generic_unix/lib/sys.c @@ -698,6 +698,12 @@ void sys_unregister_listener(GlobalContext *global, struct EventListener *listen synclist_remove(&global->listeners, &listener->listeners_list_head); } +void sys_unregister_listener_nolock(GlobalContext *global, struct EventListener *listener) +{ + listener_event_remove_from_polling_set(listener->fd, global); + list_remove(&listener->listeners_list_head); +} + void sys_register_select_event(GlobalContext *global, ErlNifEvent event, bool is_write) { struct GenericUnixPlatformData *platform = global->platform_data; From 979bb8c4dfba3ba64defce1c13aeef4ea58915b2 Mon Sep 17 00:00:00 2001 From: Peter M Date: Wed, 11 Mar 2026 15:45:32 +0100 Subject: [PATCH 3/7] Fix listener publication race in socket driver The earlier commit "Fix use-after-free race in socket driver close" fixed the close-time double-unlink/use-after-free race in generic_unix listener teardown. This change addresses a separate race in listener registration, where a listener could become visible to the event loop before socket_data published the corresponding pointer. Both fixes are needed; this patch complements the earlier teardown fix rather than replacing it. Fix a race in the generic_unix socket driver where newly created listeners were registered in the global listener list before the corresponding socket_data->{active,passive}_listener pointer was published. If the event loop processed the listener in that window, the callback could consume, free, or replace the listener before the socket driver stored the pointer. The later assignment then left socket_data pointing at stale listener memory, which could surface as random hangs or corruption in gen_tcp tests, including timeouts waiting for the server helper process to start. 
Publish the listener pointer before calling sys_register_listener in all affected paths: active UDP receive listener setup active TCP receive listener setup passive recv/recvfrom listener setup accept listener setup This complements the earlier close-path fix by removing another generic_unix listener lifecycle race. Signed-off-by: Peter M --- src/platforms/generic_unix/lib/socket_driver.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/platforms/generic_unix/lib/socket_driver.c b/src/platforms/generic_unix/lib/socket_driver.c index 7ff51b31e0..ffa0b7eb7d 100644 --- a/src/platforms/generic_unix/lib/socket_driver.c +++ b/src/platforms/generic_unix/lib/socket_driver.c @@ -255,8 +255,8 @@ static term init_udp_socket(Context *ctx, SocketDriverData *socket_data, term pa listener->base.handler = active_recvfrom_callback; listener->buf_size = socket_data->buffer; listener->process_id = ctx->process_id; - sys_register_listener(glb, &listener->base); socket_data->active_listener = listener; + sys_register_listener(glb, &listener->base); } } return ret; @@ -340,8 +340,8 @@ static term init_client_tcp_socket(Context *ctx, SocketDriverData *socket_data, listener->base.handler = active_recv_callback; listener->buf_size = socket_data->buffer; listener->process_id = ctx->process_id; - sys_register_listener(glb, &listener->base); socket_data->active_listener = listener; + sys_register_listener(glb, &listener->base); } } return ret; @@ -1017,8 +1017,8 @@ static void do_recv(Context *ctx, term pid, term ref, term length, term timeout, listener->length = term_to_int(length); listener->buffer = socket_data->buffer; listener->ref_ticks = term_to_ref_ticks(ref); - sys_register_listener(glb, &listener->base); socket_data->passive_listener = listener; + sys_register_listener(glb, &listener->base); } void socket_driver_do_recvfrom(Context *ctx, term pid, term ref, term length, term timeout) @@ -1119,8 +1119,8 @@ void socket_driver_do_accept(Context *ctx, term 
pid, term ref, term timeout) listener->length = 0; listener->buffer = 0; listener->ref_ticks = term_to_ref_ticks(ref); - sys_register_listener(glb, &listener->base); socket_data->passive_listener = listener; + sys_register_listener(glb, &listener->base); } static NativeHandlerResult socket_consume_mailbox(Context *ctx) From 96ea1fdf19f6d6f146955ab9567d652925ce4d31 Mon Sep 17 00:00:00 2001 From: Peter M Date: Sat, 14 Mar 2026 18:34:44 +0100 Subject: [PATCH 4/7] Set accepted sockets nonblocking Configure newly accepted generic_unix TCP sockets as nonblocking before publishing them to the socket driver machinery. If fcntl fails, close the accepted fd and reply with an error so callers never observe a partially initialized connection. Signed-off-by: Peter M --- src/platforms/generic_unix/lib/socket_driver.c | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/platforms/generic_unix/lib/socket_driver.c b/src/platforms/generic_unix/lib/socket_driver.c index ffa0b7eb7d..b9daa38229 100644 --- a/src/platforms/generic_unix/lib/socket_driver.c +++ b/src/platforms/generic_unix/lib/socket_driver.c @@ -1060,6 +1060,21 @@ static EventListener *accept_callback(GlobalContext *glb, EventListener *base_li TRACE("socket_driver|accept_callback: accepted connection. 
fd: %i\n", fd); term pid = listener->pid; + if (UNLIKELY(fcntl(fd, F_SETFL, O_NONBLOCK) == -1)) { + int err = errno; + close(fd); + BEGIN_WITH_STACK_HEAP(12, heap); + term ref = term_from_ref_ticks(listener->ref_ticks, &heap); + term reply = port_heap_create_reply(&heap, ref, port_heap_create_sys_error_tuple(&heap, FCNTL_ATOM, err)); + port_send_message_nolock(glb, pid, reply); + END_WITH_STACK_HEAP(heap, glb); + globalcontext_get_process_unlock(glb, ctx); + if (socket_data->passive_listener) { + socket_data->passive_listener = NULL; + free(listener); + } + return NULL; + } SocketDriverData *new_socket_data = socket_driver_create_data(); new_socket_data->sockfd = fd; new_socket_data->proto = socket_data->proto; From 1f9a3b36e9ed4d6f8b1d5703bbae14b3902be7aa Mon Sep 17 00:00:00 2001 From: Peter M Date: Sat, 14 Mar 2026 20:36:00 +0100 Subject: [PATCH 5/7] Fix NULL dereference in accept_callback when process terminates If the owning process terminates between the accept() call and globalcontext_get_process_lock(), ctx will be NULL. The code immediately dereferences ctx->platform_data without checking, causing a segfault. Add a NULL check consistent with other callbacks (e.g. recv_callback), closing the accepted fd if needed and freeing the listener before returning. 
Signed-off-by: Peter M --- src/platforms/generic_unix/lib/socket_driver.c | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/platforms/generic_unix/lib/socket_driver.c b/src/platforms/generic_unix/lib/socket_driver.c index b9daa38229..f7f35ff2e3 100644 --- a/src/platforms/generic_unix/lib/socket_driver.c +++ b/src/platforms/generic_unix/lib/socket_driver.c @@ -1046,6 +1046,13 @@ static EventListener *accept_callback(GlobalContext *glb, EventListener *base_li socklen_t clientlen = sizeof(clientaddr); int fd = accept(listener->base.fd, (struct sockaddr *) &clientaddr, &clientlen); Context *ctx = globalcontext_get_process_lock(glb, listener->process_id); + if (UNLIKELY(ctx == NULL)) { + if (fd != -1) { + close(fd); + } + free(listener); + return NULL; + } SocketDriverData *socket_data = (SocketDriverData *) ctx->platform_data; EventListener *result = NULL; if (fd == -1) { From 481621f1a2320c981eee539934030182c8194e40 Mon Sep 17 00:00:00 2001 From: Peter M Date: Sat, 6 Jun 2026 08:13:21 +0200 Subject: [PATCH 6/7] Fix listener lifetime races in socket callbacks Serialize listener pointer publication and global listener registration under the listeners lock, so callbacks cannot observe a partially published listener or erase a replacement listener. Callbacks always free the listener being dispatched, while clearing socket_data only when it still points to that listener. Keep socket_data accesses under the process-table lock and clean up accepted socket contexts if the listening process terminates during callback processing. 
Signed-off-by: Peter M --- .../generic_unix/lib/socket_driver.c | 60 ++++++++++++------- src/platforms/generic_unix/lib/sys.c | 22 ++++--- 2 files changed, 52 insertions(+), 30 deletions(-) diff --git a/src/platforms/generic_unix/lib/socket_driver.c b/src/platforms/generic_unix/lib/socket_driver.c index f7f35ff2e3..dd9228c0d2 100644 --- a/src/platforms/generic_unix/lib/socket_driver.c +++ b/src/platforms/generic_unix/lib/socket_driver.c @@ -47,6 +47,7 @@ #include #include +void sys_register_listener_nolock(GlobalContext *global, struct EventListener *listener); void sys_unregister_listener_nolock(GlobalContext *global, struct EventListener *listener); // #define ENABLE_TRACE @@ -85,6 +86,22 @@ typedef struct SocketDriverData PassiveRecvListener *passive_listener; } SocketDriverData; +static void register_active_listener(GlobalContext *glb, SocketDriverData *socket_data, ActiveRecvListener *listener) +{ + synclist_wrlock(&glb->listeners); + socket_data->active_listener = listener; + sys_register_listener_nolock(glb, &listener->base); + synclist_unlock(&glb->listeners); +} + +static void register_passive_listener(GlobalContext *glb, SocketDriverData *socket_data, PassiveRecvListener *listener) +{ + synclist_wrlock(&glb->listeners); + socket_data->passive_listener = listener; + sys_register_listener_nolock(glb, &listener->base); + synclist_unlock(&glb->listeners); +} + // clang-format off // TODO define in defaultatoms const char *const send_a = "\x4" "send"; @@ -255,8 +272,7 @@ static term init_udp_socket(Context *ctx, SocketDriverData *socket_data, term pa listener->base.handler = active_recvfrom_callback; listener->buf_size = socket_data->buffer; listener->process_id = ctx->process_id; - socket_data->active_listener = listener; - sys_register_listener(glb, &listener->base); + register_active_listener(glb, socket_data, listener); } } return ret; @@ -340,8 +356,7 @@ static term init_client_tcp_socket(Context *ctx, SocketDriverData *socket_data, listener->base.handler 
= active_recv_callback; listener->buf_size = socket_data->buffer; listener->process_id = ctx->process_id; - socket_data->active_listener = listener; - sys_register_listener(glb, &listener->base); + register_active_listener(glb, socket_data, listener); } } return ret; @@ -733,10 +748,10 @@ static EventListener *active_recv_callback(GlobalContext *glb, EventListener *ba port_send_message_nolock(glb, pid, msg); mailbox_send(ctx, globalcontext_make_atom(glb, close_internal)); // See socket_consume_mailbox close path below - if (socket_data->active_listener) { + if (socket_data->active_listener == listener) { socket_data->active_listener = NULL; - free(listener); } + free(listener); result = NULL; END_WITH_STACK_HEAP(heap, glb); } else { @@ -837,12 +852,12 @@ static EventListener *passive_recv_callback(GlobalContext *glb, EventListener *b port_send_message_nolock(glb, pid, reply); memory_destroy_heap(&heap, glb); } - globalcontext_get_process_unlock(glb, ctx); // See socket_consume_mailbox close path below - if (socket_data->passive_listener) { + if (socket_data->passive_listener == listener) { socket_data->passive_listener = NULL; - free(listener); } + globalcontext_get_process_unlock(glb, ctx); + free(listener); free(buf); // remove the EventListener from the global list return NULL; @@ -977,12 +992,12 @@ static EventListener *passive_recvfrom_callback(GlobalContext *glb, EventListene port_send_message_nolock(glb, pid, reply); memory_destroy_heap(&heap, glb); } - globalcontext_get_process_unlock(glb, ctx); // See socket_consume_mailbox close path below - if (socket_data->passive_listener) { + if (socket_data->passive_listener == listener) { socket_data->passive_listener = NULL; - free(listener); } + globalcontext_get_process_unlock(glb, ctx); + free(listener); free(buf); // remove the EventListener from the global list and clean up return NULL; @@ -1017,8 +1032,7 @@ static void do_recv(Context *ctx, term pid, term ref, term length, term timeout, listener->length = 
term_to_int(length); listener->buffer = socket_data->buffer; listener->ref_ticks = term_to_ref_ticks(ref); - socket_data->passive_listener = listener; - sys_register_listener(glb, &listener->base); + register_passive_listener(glb, socket_data, listener); } void socket_driver_do_recvfrom(Context *ctx, term pid, term ref, term length, term timeout) @@ -1075,11 +1089,11 @@ static EventListener *accept_callback(GlobalContext *glb, EventListener *base_li term reply = port_heap_create_reply(&heap, ref, port_heap_create_sys_error_tuple(&heap, FCNTL_ATOM, err)); port_send_message_nolock(glb, pid, reply); END_WITH_STACK_HEAP(heap, glb); - globalcontext_get_process_unlock(glb, ctx); - if (socket_data->passive_listener) { + if (socket_data->passive_listener == listener) { socket_data->passive_listener = NULL; - free(listener); } + globalcontext_get_process_unlock(glb, ctx); + free(listener); return NULL; } SocketDriverData *new_socket_data = socket_driver_create_data(); @@ -1094,9 +1108,12 @@ static EventListener *accept_callback(GlobalContext *glb, EventListener *base_li Context *new_ctx = create_accepting_socket(glb, new_socket_data); ctx = globalcontext_get_process_lock(glb, listener->process_id); if (UNLIKELY(ctx == NULL)) { + socket_driver_do_close(new_ctx); + scheduler_terminate(new_ctx); free(listener); return NULL; } + socket_data = (SocketDriverData *) ctx->platform_data; if (new_socket_data->active) { result = &create_accepting_socket_listener(new_ctx, new_socket_data)->base; } @@ -1110,12 +1127,12 @@ static EventListener *accept_callback(GlobalContext *glb, EventListener *base_li port_send_message_nolock(glb, pid, reply); END_WITH_STACK_HEAP(heap, glb); } - globalcontext_get_process_unlock(glb, ctx); // See socket_consume_mailbox close path below - if (socket_data->passive_listener) { + if (socket_data->passive_listener == listener) { socket_data->passive_listener = NULL; - free(listener); } + globalcontext_get_process_unlock(glb, ctx); + free(listener); // remove 
the EventListener from the global list and replace it if needed return result; } @@ -1141,8 +1158,7 @@ void socket_driver_do_accept(Context *ctx, term pid, term ref, term timeout) listener->length = 0; listener->buffer = 0; listener->ref_ticks = term_to_ref_ticks(ref); - socket_data->passive_listener = listener; - sys_register_listener(glb, &listener->base); + register_passive_listener(glb, socket_data, listener); } static NativeHandlerResult socket_consume_mailbox(Context *ctx) diff --git a/src/platforms/generic_unix/lib/sys.c b/src/platforms/generic_unix/lib/sys.c index 1f78174d6d..64ecf4f643 100644 --- a/src/platforms/generic_unix/lib/sys.c +++ b/src/platforms/generic_unix/lib/sys.c @@ -660,9 +660,8 @@ void event_listener_add_to_polling_set(struct EventListener *listener, GlobalCon #endif } -void sys_register_listener(GlobalContext *global, struct EventListener *listener) +void sys_register_listener_nolock(GlobalContext *global, struct EventListener *listener) { - struct ListHead *listeners = synclist_wrlock(&global->listeners); event_listener_add_to_polling_set(listener, global); #ifndef AVM_NO_SMP #ifndef HAVE_KQUEUE @@ -670,7 +669,13 @@ void sys_register_listener(GlobalContext *global, struct EventListener *listener sys_signal(global); #endif #endif - list_append(listeners, &listener->listeners_list_head); + list_append(synclist_nolock(&global->listeners), &listener->listeners_list_head); +} + +void sys_register_listener(GlobalContext *global, struct EventListener *listener) +{ + synclist_wrlock(&global->listeners); + sys_register_listener_nolock(global, listener); synclist_unlock(&global->listeners); } @@ -692,16 +697,17 @@ static void listener_event_remove_from_polling_set(listener_event_t listener_fd, #endif } -void sys_unregister_listener(GlobalContext *global, struct EventListener *listener) +void sys_unregister_listener_nolock(GlobalContext *global, struct EventListener *listener) { listener_event_remove_from_polling_set(listener->fd, global); - 
synclist_remove(&global->listeners, &listener->listeners_list_head); + list_remove(&listener->listeners_list_head); } -void sys_unregister_listener_nolock(GlobalContext *global, struct EventListener *listener) +void sys_unregister_listener(GlobalContext *global, struct EventListener *listener) { - listener_event_remove_from_polling_set(listener->fd, global); - list_remove(&listener->listeners_list_head); + synclist_wrlock(&global->listeners); + sys_unregister_listener_nolock(global, listener); + synclist_unlock(&global->listeners); } void sys_register_select_event(GlobalContext *global, ErlNifEvent event, bool is_write) From c1ea3dc473a635b80a8989179a91c94a29175552 Mon Sep 17 00:00:00 2001 From: Paul Guyot Date: Sat, 6 Jun 2026 08:05:16 +0200 Subject: [PATCH 7/7] scheduler: avoid redundant scheduler wake on process handoff If SMP is enabled, instead of waking the polling scheduler, grab the first ready process immediately if any. Also simplify scheduler_make_ready a little bit to reduce the number of locks. Also use semaphores on esp32 and rp2 to avoid saturating with signals. Also add a missing signal on macOS to avoid resources cleanup waiting for an unrelated wake. Since schedulers no longer poll events at every iteration, resources selected with enif_select and stopped with ERL_NIF_SELECT_STOP_SCHEDULED are now released asynchronously by the scheduler polling events, staying within the boundaries of the BEAM enif_select specification. Adjust test_file's test_gc accordingly. 
Signed-off-by: Paul Guyot --- CHANGELOG.md | 3 + src/libAtomVM/scheduler.c | 94 ++++++++++++-------- src/platforms/esp32/components/avm_sys/sys.c | 26 +++--- src/platforms/generic_unix/lib/sys.c | 3 + src/platforms/rp2/src/lib/rp2_sys.h | 6 +- src/platforms/rp2/src/lib/sys.c | 46 ++-------- tests/libs/eavmlib/test_file.erl | 19 +++- 7 files changed, 108 insertions(+), 89 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 29b81ae4e4..2fac7df2fa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 longer lines return `{error, {parser, {line_too_long, Prefix}}}` with the first 128 bytes of the offending line. Callers whose upstream servers emit unusually large headers must account for this limit +- Improved performance of SMP scheduler. As a result, resources selected with `enif_select` and + stopped with the `ERL_NIF_SELECT_STOP_SCHEDULED` result are now released asynchronously by the + scheduler polling events, staying within the boundaries of the BEAM `enif_select` specification ### Removed - Removed `ahttp_client` support for obsolete line folding (RFC 9112 §5.2); folded header and diff --git a/src/libAtomVM/scheduler.c b/src/libAtomVM/scheduler.c index a856d60012..da3dbb3b1c 100644 --- a/src/libAtomVM/scheduler.c +++ b/src/libAtomVM/scheduler.c @@ -152,6 +152,32 @@ static void scheduler_process_native_signal_messages(Context *ctx) } } +static Context *scheduler_first_runnable_ready(GlobalContext *global) +{ + Context *result = NULL; + SMP_SPINLOCK_LOCK(&global->processes_spinlock); + // Pick first ready which is not running. 
+ struct ListHead *next_ready = list_first(&global->ready_processes); + while (next_ready != &global->ready_processes) { + result = GET_LIST_ENTRY(next_ready, Context, processes_list_head); + if (!(result->flags & Running)) { + list_remove(next_ready); + context_update_flags(result, ~Ready, Running); + if (result->native_handler) { + // Native handlers are marked as waiting + list_append(&global->waiting_processes, next_ready); + } else { + list_append(&global->running_processes, next_ready); + } + break; + } + next_ready = next_ready->next; + result = NULL; + } + SMP_SPINLOCK_UNLOCK(&global->processes_spinlock); + return result; +} + static Context *scheduler_run0(GlobalContext *global) { // This function should return a new process to run. @@ -209,6 +235,13 @@ static Context *scheduler_run0(GlobalContext *global) return NULL; } if (!is_waiting) { + // If a process is ready, process it instead of waking up + // the poller scheduler + result = scheduler_first_runnable_ready(global); + if (result != NULL) { + break; + } + // Before entering the condition variable, signal the poll events // so the thread polling on events can check the ready queue. sys_signal(global); @@ -232,32 +265,23 @@ static Context *scheduler_run0(GlobalContext *global) int32_t wait_timeout = update_timer_list(global); SMP_SPINLOCK_UNLOCK(&global->timer_spinlock); - SMP_SPINLOCK_LOCK(&global->processes_spinlock); - // Pick first ready which is not running. 
- struct ListHead *next_ready = list_first(&global->ready_processes); - while (next_ready != &global->ready_processes) { - result = GET_LIST_ENTRY(next_ready, Context, processes_list_head); - if (!(result->flags & Running)) { - list_remove(next_ready); - context_update_flags(result, ~Ready, Running); - if (result->native_handler) { - // Native handlers are marked as waiting - list_append(&global->waiting_processes, next_ready); - } else { - list_append(&global->running_processes, next_ready); - } - break; - } - next_ready = next_ready->next; - result = NULL; + if (result == NULL) { + result = scheduler_first_runnable_ready(global); } - SMP_SPINLOCK_UNLOCK(&global->processes_spinlock); - if (result == NULL && !global->scheduler_stop_all) { - sys_poll_events(global, wait_timeout); - } else { - sys_poll_events(global, SYS_POLL_EVENTS_DO_NOT_WAIT); + // Only the poller scheduler drives the event loop. +#ifndef AVM_NO_SMP + if (is_waiting) { +#endif + if (result == NULL && !global->scheduler_stop_all) { + // The poller may block waiting for events. + sys_poll_events(global, wait_timeout); + } else { + sys_poll_events(global, SYS_POLL_EVENTS_DO_NOT_WAIT); + } +#ifndef AVM_NO_SMP } +#endif #ifdef AVM_TASK_DRIVER_ENABLED globalcontext_process_task_driver_queues(global); #endif @@ -265,8 +289,11 @@ static Context *scheduler_run0(GlobalContext *global) } while (result == NULL); #ifndef AVM_NO_SMP - global->waiting_scheduler = false; - smp_condvar_signal(global->schedulers_cv); + // Only the polling scheduler relinquishes the poller role. 
+ if (is_waiting) { + global->waiting_scheduler = false; + smp_condvar_signal(global->schedulers_cv); + } SMP_MUTEX_UNLOCK(global->schedulers_mutex); #endif @@ -356,6 +383,12 @@ static void scheduler_make_ready(Context *ctx) return; } list_remove(&ctx->processes_list_head); + // Move to ready queue (from waiting or running) + // The process may be running (it would be signaled), so mark it + // as ready + context_update_flags(ctx, ~NoFlags, Ready); + list_append(&global->ready_processes, &ctx->processes_list_head); + SMP_SPINLOCK_UNLOCK(&global->processes_spinlock); #ifndef AVM_NO_SMP if (SMP_MUTEX_TRYLOCK(global->schedulers_mutex)) { // Start a new scheduler if none are going to take this process. @@ -366,17 +399,6 @@ static void scheduler_make_ready(Context *ctx) global->running_schedulers++; smp_scheduler_start(global); } - SMP_MUTEX_UNLOCK(global->schedulers_mutex); - } -#endif - // Move to ready queue (from waiting or running) - // The process may be running (it would be signaled), so mark it - // as ready - context_update_flags(ctx, ~NoFlags, Ready); - list_append(&global->ready_processes, &ctx->processes_list_head); - SMP_SPINLOCK_UNLOCK(&global->processes_spinlock); -#ifndef AVM_NO_SMP - if (SMP_MUTEX_TRYLOCK(global->schedulers_mutex)) { if (global->waiting_scheduler) { sys_signal(global); } diff --git a/src/platforms/esp32/components/avm_sys/sys.c b/src/platforms/esp32/components/avm_sys/sys.c index 42786b6c8e..606b26b8fb 100644 --- a/src/platforms/esp32/components/avm_sys/sys.c +++ b/src/platforms/esp32/components/avm_sys/sys.c @@ -39,6 +39,7 @@ #include "esp_system.h" #include "esp_timer.h" #include "freertos/FreeRTOS.h" +#include "freertos/semphr.h" #include "freertos/task.h" #include #include @@ -112,12 +113,16 @@ static const char *const revision_atom = "\x8" "revision"; QueueHandle_t event_queue = NULL; QueueSetHandle_t event_set = NULL; +static SemaphoreHandle_t signal_semaphore = NULL; void esp32_sys_queue_init() { - event_set = 
xQueueCreateSet(EVENT_QUEUE_LEN * 4); + // + 1 accounts for the signal binary semaphore + event_set = xQueueCreateSet(EVENT_QUEUE_LEN * 4 + 1); event_queue = xQueueCreate(EVENT_QUEUE_LEN, sizeof(void *)); xQueueAddToSet(event_queue, event_set); + signal_semaphore = xSemaphoreCreateBinary(); + xQueueAddToSet(signal_semaphore, event_set); } static inline void sys_clock_gettime(struct timespec *t) @@ -132,6 +137,14 @@ static void receive_events(GlobalContext *glb, TickType_t wait_ticks) void *sender = NULL; QueueSetMemberHandle_t event_source; while ((event_source = xQueueSelectFromSet(event_set, wait_ticks))) { +#if !defined(AVM_NO_SMP) || defined(AVM_TASK_DRIVER_ENABLED) + if (event_source == signal_semaphore) { + // We've been signaled + xSemaphoreTake(signal_semaphore, 0); + return; + } +#endif + // Listener used shared event_queue. if (event_source == event_queue) { if (UNLIKELY(xQueueReceive(event_queue, &sender, 0) == pdFALSE)) { @@ -141,13 +154,6 @@ static void receive_events(GlobalContext *glb, TickType_t wait_ticks) sender = event_source; } -#if !defined(AVM_NO_SMP) || defined(AVM_TASK_DRIVER_ENABLED) - if (sender == CAST_FUNC_TO_VOID_PTR(sys_signal)) { - // We've been signaled - return; - } -#endif - struct ListHead *listeners = synclist_wrlock(&glb->listeners); if (!process_listener_handler(glb, sender, listeners, NULL, NULL)) { TRACE("sys: handler not found for: %p\n", (void *) sender); @@ -170,8 +176,8 @@ void sys_poll_events(GlobalContext *glb, int timeout_ms) void sys_signal(GlobalContext *glb) { - void *queue_item = CAST_FUNC_TO_VOID_PTR(sys_signal); - xQueueSendToBack(event_queue, &queue_item, 0); + UNUSED(glb); + xSemaphoreGive(signal_semaphore); } void sys_time(struct timespec *t) diff --git a/src/platforms/generic_unix/lib/sys.c b/src/platforms/generic_unix/lib/sys.c index 64ecf4f643..5fc26dbbcb 100644 --- a/src/platforms/generic_unix/lib/sys.c +++ b/src/platforms/generic_unix/lib/sys.c @@ -741,6 +741,9 @@ void 
sys_unregister_select_event(GlobalContext *global, ErlNifEvent event, bool EV_SET(&kev, event, is_write ? EVFILT_WRITE : EVFILT_READ, EV_DELETE, 0, 0, NULL); (void) kevent(platform->kqueue_fd, &kev, 1, NULL, 0, &ts); platform->select_events_poll_count = -1; +#ifndef AVM_NO_SMP + sys_signal(global); +#endif #else UNUSED(event); UNUSED(is_write); diff --git a/src/platforms/rp2/src/lib/rp2_sys.h b/src/platforms/rp2/src/lib/rp2_sys.h index 77b2918c88..3d1393d066 100644 --- a/src/platforms/rp2/src/lib/rp2_sys.h +++ b/src/platforms/rp2/src/lib/rp2_sys.h @@ -27,7 +27,8 @@ #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wpedantic" -#include +#include +#include #include #include @@ -77,8 +78,7 @@ void sys_unregister_listener_from_event(GlobalContext *global, listener_event_t struct RP2PlatformData { #ifndef AVM_NO_SMP - mutex_t event_poll_mutex; - cond_t event_poll_cond; + semaphore_t event_poll_sem; #endif queue_t event_queue; diff --git a/src/platforms/rp2/src/lib/sys.c b/src/platforms/rp2/src/lib/sys.c index 0894d0bf1d..97311832a8 100644 --- a/src/platforms/rp2/src/lib/sys.c +++ b/src/platforms/rp2/src/lib/sys.c @@ -82,8 +82,7 @@ void sys_init_platform(GlobalContext *glb) struct RP2PlatformData *platform = malloc(sizeof(struct RP2PlatformData)); glb->platform_data = platform; #ifndef AVM_NO_SMP - mutex_init(&platform->event_poll_mutex); - cond_init(&platform->event_poll_cond); + sem_init(&platform->event_poll_sem, 0, 1); #endif queue_init(&platform->event_queue, sizeof(queue_t *), EVENT_QUEUE_LEN); @@ -152,37 +151,10 @@ bool sys_try_post_listener_event_from_isr(GlobalContext *glb, listener_event_t l return false; } -#ifndef AVM_NO_SMP - uint32_t owner; - bool acquired_mutex = mutex_try_enter(&platform->event_poll_mutex, &owner); - // We're from an ISR, so we cannot wait for the interrupted code (running - // on the same core as we do) to release the mutex. 
- if (!acquired_mutex) { - // If this core is not the owner, wait for the other core to release - // the mutex. - // TODO: implement queue_try_remove_wait_timeout_ms in Pico SDK to - // simplify this logic - uint32_t caller = (uint32_t) lock_get_caller_owner_id(); // same cast exists in mutex_try_enter - if (caller != owner) { - mutex_enter_blocking(&platform->event_poll_mutex); - acquired_mutex = true; - } - } -#endif if (UNLIKELY(!queue_try_add(&platform->event_queue, &listener_queue))) { -#ifndef AVM_NO_SMP - if (acquired_mutex) { - mutex_exit(&platform->event_poll_mutex); - } -#endif fprintf(stderr, "Lost event from ISR as global event queue is full. System is overloaded or EVENT_QUEUE_LEN is too low\n"); return false; } -#ifndef AVM_NO_SMP - if (acquired_mutex) { - mutex_exit(&platform->event_poll_mutex); - } -#endif #ifndef AVM_NO_SMP sys_signal(glb); @@ -202,16 +174,12 @@ void sys_poll_events(GlobalContext *glb, int timeout_ms) sys_tinyusb_unlock(glb); #endif #ifndef AVM_NO_SMP - if (timeout_ms != 0) { - mutex_enter_blocking(&platform->event_poll_mutex); - if (queue_is_empty(&platform->event_queue)) { - if (timeout_ms > 0) { - cond_wait_timeout_ms(&platform->event_poll_cond, &platform->event_poll_mutex, timeout_ms); - } else { - cond_wait(&platform->event_poll_cond, &platform->event_poll_mutex); - } + if (timeout_ms != 0 && queue_is_empty(&platform->event_queue)) { + if (timeout_ms > 0) { + sem_acquire_timeout_ms(&platform->event_poll_sem, timeout_ms); + } else { + sem_acquire_blocking(&platform->event_poll_sem); } - mutex_exit(&platform->event_poll_mutex); } #else UNUSED(timeout_ms); @@ -230,7 +198,7 @@ void sys_poll_events(GlobalContext *glb, int timeout_ms) void sys_signal(GlobalContext *glb) { struct RP2PlatformData *platform = glb->platform_data; - cond_signal(&platform->event_poll_cond); + sem_release(&platform->event_poll_sem); } #endif diff --git a/tests/libs/eavmlib/test_file.erl b/tests/libs/eavmlib/test_file.erl index c69a3a9f24..b87b4c8d96 100644 
--- a/tests/libs/eavmlib/test_file.erl +++ b/tests/libs/eavmlib/test_file.erl @@ -166,7 +166,11 @@ test_gc(HasSelect) -> ?ASSERT_EQUALS(MemorySize8, MemorySize1), call_gc_loop(GCSubPid, close), call_gc_loop(GCSubPid, gc), - MemorySize9 = erlang:memory(binary), + % If select_stop raced the ready_output notification, the stop + % was scheduled (ERL_NIF_SELECT_STOP_SCHEDULED) and the resource + % is only released once the scheduler polling events retires the + % closed select event, so wait for memory to converge. + MemorySize9 = wait_memory_binary(MemorySize0, 100), ?ASSERT_EQUALS(MemorySize9, MemorySize0); true -> ok @@ -182,6 +186,19 @@ call_gc_loop(Pid, Message) -> {Pid, Message} -> ok end. +wait_memory_binary(Expected, Retries) -> + case erlang:memory(binary) of + Expected -> + Expected; + Other when Retries =:= 0 -> + Other; + _ -> + receive + after 10 -> ok + end, + wait_memory_binary(Expected, Retries - 1) + end. + gc_loop(Path, File) -> receive {select, _Resource, undefined, _Direction} ->