diff --git a/CHANGELOG.md b/CHANGELOG.md index 29b81ae4e4..4e15f31a40 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added `"USB_SERIAL_JTAG"` peripheral to the ESP32 `uart` module on chips with a built-in USB-Serial-JTAG controller (C3/C5/C6/C61/H2/H21/H4/P4/S3) - Added support for the `safe` option in `erlang:binary_to_term/2` +- Added support for process aliases: `erlang:alias/0,1`, `erlang:unalias/1`, + `erlang:monitor/3` with the `{alias, Mode}` option, `spawn_opt` `{monitor, MonitorOpts}` and + sending to an alias reference - Added xtensa JIT backend for esp32 platform - Added support for configuring pins and width for sdmmc on ESP32 - Added support for map comprehensions @@ -43,6 +46,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 longer lines return `{error, {parser, {line_too_long, Prefix}}}` with the first 128 bytes of the offending line. Callers whose upstream servers emit unusually large headers must account for this limit +- Deprecated the C macro `REF_SIZE`: use `TERM_BOXED_REFERENCE_SHORT_SIZE` for references built + from ref ticks, `TERM_BOXED_REFERENCE_PROCESS_SIZE` for process references (aliases), or + `TERM_BOXED_REFERENCE_MAX_SIZE` to fit any reference. `REF_SIZE` still expands to the short + reference size, but now emits a compiler warning ### Removed - Removed `ahttp_client` support for obsolete line folding (RFC 9112 ยง5.2); folded header and diff --git a/doc/src/distributed-erlang.md b/doc/src/distributed-erlang.md index 7d7c152365..05d2a9b87b 100644 --- a/doc/src/distributed-erlang.md +++ b/doc/src/distributed-erlang.md @@ -325,4 +325,11 @@ RPC (remote procedure call) from Erlang/OTP to AtomVM is also supported. Shell requires several OTP standard library modules. See [the example project](https://github.com/pguyot/atomvm_shell). +## Known Issues & Limitations + +- Sending to a remote process alias is not supported: a message sent from AtomVM to an alias + (a reference) of another node is silently dropped instead of being routed over distribution. + The other direction works: a message sent from a remote BEAM node to an alias of an AtomVM + process is delivered. + Please do not hesitate to file issues or pull requests for additional features. diff --git a/libs/estdlib/src/erlang.erl b/libs/estdlib/src/erlang.erl index 760063cec6..99872a94a0 100644 --- a/libs/estdlib/src/erlang.erl +++ b/libs/estdlib/src/erlang.erl @@ -104,6 +104,7 @@ make_ref/0, send/2, monitor/2, + monitor/3, demonitor/1, demonitor/2, exit/1, @@ -174,7 +175,10 @@ tl/1, trunc/1, tuple_size/1, - tuple_to_list/1 + tuple_to_list/1, + alias/0, + alias/1, + unalias/1 ]). -export_type([ @@ -212,12 +216,14 @@ | {max_heap_size, pos_integer()} | {atomvm_heap_growth, atomvm_heap_growth_strategy()} | link - | monitor. + | monitor + | {monitor, [monitor_option()]}. -type send_destination() :: pid() | port() - | atom(). + | atom() + | reference(). % Current type until we make these references -type resource() :: binary(). @@ -238,6 +244,8 @@ -type raise_stacktrace() :: [{module(), atom(), arity() | [term()]} | {function(), arity() | [term()]}] | stacktrace(). +-type monitor_option() :: {alias, explicit_unalias | demonitor | reply_demonitor}. + %%----------------------------------------------------------------------------- %% @param Time time in milliseconds after which to send the timeout message. %% @param Dest Pid or server name to which to send the timeout message. @@ -1222,8 +1230,10 @@ spawn_monitor(Module, Function, Args) -> %%----------------------------------------------------------------------------- %% @param Function function to create a process from -%% @param Options additional options. -%% @returns pid of the new process +%% @param Options additional options, see `spawn_option()'. With `monitor' +%% or `{monitor, MonitorOpts}' the new process is also monitored, see +%% `monitor/3' for the monitor options. +%% @returns pid of the new process, or `{Pid, MonitorRef}' when monitoring %% @doc Create a new process. %% @end %%----------------------------------------------------------------------------- @@ -1236,8 +1246,10 @@ spawn_opt(_Name, _Options) -> %% @param Module module of the function to create a process from %% @param Function name of the function to create a process from %% @param Args arguments to pass to the function to create a process from -%% @param Options additional options. -%% @returns pid of the new process +%% @param Options additional options, see `spawn_option()'. With `monitor' +%% or `{monitor, MonitorOpts}' the new process is also monitored, see +%% `monitor/3' for the monitor options. +%% @returns pid of the new process, or `{Pid, MonitorRef}' when monitoring %% @doc Create a new process by calling exported Function from Module with Args. %% @end %%----------------------------------------------------------------------------- @@ -1277,10 +1289,11 @@ make_ref() -> erlang:nif_error(undefined). %%----------------------------------------------------------------------------- -%% @param Pid process to send the message to +%% @param Target process, registered name or alias to send the message to %% @param Message message to send %% @returns the sent message -%% @doc Send a message to a given process +%% @doc Send a message to a given process. A message sent to a reference +%% that is not an active alias is silently dropped. %% @end %%----------------------------------------------------------------------------- -spec send(Target :: send_destination(), Message :: Message) -> Message. @@ -1306,6 +1319,32 @@ send(_Target, _Message) -> monitor(_Type, _PidOrPort) -> erlang:nif_error(undefined). +%%----------------------------------------------------------------------------- +%% @param Type type of monitor to create +%% @param PidOrPort pid or port of the object to monitor +%% @param Options monitor options +%% @returns a monitor reference +%% @doc Creates a monitor and allows passing additional options. +%% Currently, only the `{alias, AliasMode}' option is supported. Passing it +%% makes the monitor also an alias on the calling process (see `alias/0'). +%% `AliasMode' defines the behaviour of the alias: +%% - explicit_unalias - the alias can be only removed with `unalias/1', +%% - demonitor - the alias is also removed when the monitor is removed, +%% by `demonitor/1' or by the delivery of a `DOWN' message, +%% - reply_demonitor - additionally, the alias is deactivated and the +%% monitor removed (as by `demonitor/1') when the +%% first message sent via the alias is delivered. +%% +%% Note: Unlike Erlang/OTP, the `{tag, Term}' option is not +%% supported and raises `unsupported'. +%% @end +%%----------------------------------------------------------------------------- +-spec monitor + (Type :: process, Pid :: pid() | atom(), [monitor_option()]) -> reference(); + (Type :: port, Port :: port() | atom(), [monitor_option()]) -> reference(). +monitor(_Type, _PidOrPort, _Options) -> + erlang:nif_error(undefined). + %%----------------------------------------------------------------------------- %% @param Monitor reference of monitor to remove %% @returns `true' @@ -2147,3 +2186,41 @@ tuple_size(_Tuple) -> -spec tuple_to_list(Tuple :: tuple()) -> [term()]. tuple_to_list(_Tuple) -> erlang:nif_error(undefined). + +%%----------------------------------------------------------------------------- +%% @returns A reference aliasing the calling process. +%% @doc Creates an alias for the calling process. The alias can be used +%% to send messages to the process like the PID. The alias can also be +%% created along with a monitor - see `monitor/3'. The alias can be +%% removed by calling `unalias/1'. +%% @end +%%----------------------------------------------------------------------------- +-spec alias() -> Alias when Alias :: reference(). +alias() -> + erlang:nif_error(undefined). + +%%----------------------------------------------------------------------------- +%% @param Options alias options +%% @returns A reference aliasing the calling process. +%% @doc Creates an alias for the calling process, like `alias/0'. +%% With `explicit_unalias' (the default, so `alias([])' is `alias/0') +%% the alias stays active until `unalias/1'; with `reply' it is +%% deactivated when the first message sent via the alias is delivered. +%% +%% Note: Unlike Erlang/OTP, the `priority' option (OTP 28) is +%% not supported and raises `unsupported'. +%% @end +%%----------------------------------------------------------------------------- +-spec alias(Options) -> Alias when Options :: [explicit_unalias | reply], Alias :: reference(). +alias(_Options) -> + erlang:nif_error(undefined). + +%%----------------------------------------------------------------------------- +%% @param Alias the alias to be removed. +%% @returns `true' if alias was removed, `false' if it was not found +%% @doc Removes process alias. See `alias/0' for more information. +%% @end +%%----------------------------------------------------------------------------- +-spec unalias(Alias) -> boolean() when Alias :: reference(). +unalias(_Alias) -> + erlang:nif_error(undefined). diff --git a/src/libAtomVM/context.c b/src/libAtomVM/context.c index b8b29273d5..3c70f91f07 100644 --- a/src/libAtomVM/context.c +++ b/src/libAtomVM/context.c @@ -53,6 +53,11 @@ #define DEFAULT_STACK_SIZE 8 #define BYTES_PER_TERM (TERM_BITS / 8) +// active_alias_count saturates at this value instead of wrapping to 0. A wrap would make +// context_find_alias skip the list walk and silently drop every alias of the process. Once +// saturated, the count is never incremented or decremented again. +#define ACTIVE_ALIAS_COUNT_SATURATED 0xFF + static struct Monitor *context_monitors_handle_terminate(Context *ctx); static void context_distribution_handle_terminate(Context *ctx); static void destroy_extended_registers(Context *ctx, unsigned int live); @@ -83,6 +88,7 @@ Context *context_new(GlobalContext *glb) ctx->heap_growth_strategy = BoundedFreeHeapGrowth; ctx->has_min_heap_size = 0; ctx->has_max_heap_size = 0; + ctx->active_alias_count = 0; mailbox_init(&ctx->mailbox); @@ -139,6 +145,13 @@ void context_destroy(Context *ctx) // Hold and release the spin lock for timers and cancel any timer scheduler_cancel_timeout(ctx); + // A process terminated by the scheduler was already dequeued and its queue item reset, so this + // is a no-op. A process destroyed before it was ever scheduled (e.g. a spawn_opt option error) + // is still on the waiting queue, and dequeuing it here avoids leaving a dangling entry. + SMP_SPINLOCK_LOCK(&ctx->global->processes_spinlock); + list_remove(&ctx->processes_list_head); + SMP_SPINLOCK_UNLOCK(&ctx->global->processes_spinlock); + // Another process can get an access to our mailbox until this point. struct ListHead *processes_table_list = synclist_wrlock(&ctx->global->processes_table); UNUSED(processes_table_list); @@ -218,6 +231,7 @@ void context_destroy(Context *ctx) case FlushInfoMonitorSignal: case LinkExitSignal: // target will not be found when processing this link case MonitorDownSignal: // likewise + case AliasMessageSignal: // process is terminating; drop the alias message case CodeServerResumeSignal: break; case NormalMessage: { @@ -265,8 +279,10 @@ void context_destroy(Context *ctx) } case CONTEXT_MONITOR_LINK_LOCAL: case CONTEXT_MONITOR_MONITORED_LOCAL: + case CONTEXT_MONITOR_MONITORED_LOCAL_ALIAS: case CONTEXT_MONITOR_MONITORING_LOCAL: case CONTEXT_MONITOR_MONITORING_LOCAL_REGISTEREDNAME: + case CONTEXT_MONITOR_ALIAS: UNREACHABLE(); } } @@ -434,6 +450,14 @@ void context_process_monitor_down_signal(Context *ctx, struct TermSignal *signal // Remove link list_remove(&monitor->monitor_list_head); free(monitoring_monitor); + + // {alias, demonitor} / {alias, reply_demonitor}: the alias is deactivated when the + // monitor is removed, including the automatic removal at 'DOWN' delivery. + struct MonitorAlias *alias = context_find_alias(ctx, ref_ticks); + if (alias != NULL && alias->alias_type != ContextMonitorAliasExplicitUnalias) { + context_unalias(ctx, alias); + } + // Enqueue the term as a message. mailbox_send(ctx, signal->signal_term); break; @@ -454,6 +478,11 @@ void context_process_monitor_down_signal(Context *ctx, struct TermSignal *signal mailbox_send(ctx, signal->signal_term); END_WITH_STACK_HEAP(temp_heap, ctx->global); + struct MonitorAlias *alias = context_find_alias(ctx, ref_ticks); + if (alias != NULL && alias->alias_type != ContextMonitorAliasExplicitUnalias) { + context_unalias(ctx, alias); + } + free(monitoring_monitor); break; } @@ -463,6 +492,38 @@ void context_process_monitor_down_signal(Context *ctx, struct TermSignal *signal // (flush option removes messages that were already sent) } +term context_process_alias_message_signal(Context *ctx, struct TermSignal *signal) +{ + term ref = term_get_tuple_element(signal->signal_term, 0); + uint64_t ref_ticks = term_to_ref_ticks(ref); + term message = term_get_tuple_element(signal->signal_term, 1); + + struct MonitorAlias *alias = context_find_alias(ctx, ref_ticks); + if (IS_NULL_PTR(alias)) { + // Alias is not (or no longer) active: drop the message, matching OTP. + return term_invalid_term(); + } + + if (alias->alias_type == ContextMonitorAliasReplyDemonitor) { + // Capture the monitored pid before context_demonitor removes the local monitoring entry. + bool is_monitoring = false; + term monitor_pid = context_get_monitor_pid(ctx, ref_ticks, &is_monitoring); + context_demonitor(ctx, ref_ticks); + if (!term_is_invalid_term(monitor_pid) && is_monitoring) { + // Takes the processes_table read lock: callers of the with-aliases drain must not + // already hold it (see mailbox_process_outer_list_with_aliases). + int32_t monitored_process_id = term_to_local_process_id(monitor_pid); + Context *target = globalcontext_get_process_lock(ctx->global, monitored_process_id); + if (target) { + mailbox_send_ref_signal(target, DemonitorSignal, ref_ticks); + globalcontext_get_process_unlock(ctx->global, target); + } + } + } + + return message; +} + void context_process_code_server_resume_signal(Context *ctx) { #ifndef AVM_NO_JIT @@ -559,7 +620,8 @@ bool context_get_process_info(Context *ctx, term *out, size_t *term_size, term a ret_size = TUPLE_SIZE(2); LIST_FOR_EACH (item, &ctx->monitors_head) { struct Monitor *monitor = GET_LIST_ENTRY(item, struct Monitor, monitor_list_head); - if (monitor->monitor_type == CONTEXT_MONITOR_MONITORED_LOCAL) { + if (monitor->monitor_type == CONTEXT_MONITOR_MONITORED_LOCAL + || monitor->monitor_type == CONTEXT_MONITOR_MONITORED_LOCAL_ALIAS) { ret_size += CONS_SIZE; } else if (monitor->monitor_type == CONTEXT_MONITOR_RESOURCE) { ret_size += CONS_SIZE + TERM_BOXED_REFERENCE_RESOURCE_SIZE; @@ -668,7 +730,8 @@ bool context_get_process_info(Context *ctx, term *out, size_t *term_size, term a struct ListHead *item; LIST_FOR_EACH (item, &ctx->monitors_head) { struct Monitor *monitor = GET_LIST_ENTRY(item, struct Monitor, monitor_list_head); - if (monitor->monitor_type == CONTEXT_MONITOR_MONITORED_LOCAL) { + if (monitor->monitor_type == CONTEXT_MONITOR_MONITORED_LOCAL + || monitor->monitor_type == CONTEXT_MONITOR_MONITORED_LOCAL_ALIAS) { struct MonitorLocalMonitor *monitored_monitor = CONTAINER_OF(monitor, struct MonitorLocalMonitor, monitor); list = term_list_prepend(monitored_monitor->monitor_obj, list, heap); } else if (monitor->monitor_type == CONTEXT_MONITOR_RESOURCE) { @@ -780,21 +843,29 @@ static struct Monitor *context_monitors_handle_terminate(Context *ctx) } break; } - case CONTEXT_MONITOR_MONITORED_LOCAL: { + case CONTEXT_MONITOR_MONITORED_LOCAL: + case CONTEXT_MONITOR_MONITORED_LOCAL_ALIAS: { struct MonitorLocalMonitor *monitored_monitor = CONTAINER_OF(monitor, struct MonitorLocalMonitor, monitor); int32_t local_process_id = term_to_local_process_id(monitored_monitor->monitor_obj); Context *target = globalcontext_get_process_nolock(glb, local_process_id); // Target cannot be NULL as we processed Demonitor signals assert(target != NULL); - int required_terms = REF_SIZE + TUPLE_SIZE(5); + bool is_monitored_alias = monitor->monitor_type == CONTEXT_MONITOR_MONITORED_LOCAL_ALIAS; + int ref_size = is_monitored_alias ? TERM_BOXED_REFERENCE_PROCESS_SIZE : TERM_BOXED_REFERENCE_SHORT_SIZE; + int required_terms = ref_size + TUPLE_SIZE(5); if (UNLIKELY(memory_ensure_free(ctx, required_terms) != MEMORY_GC_OK)) { // TODO: handle out of memory here fprintf(stderr, "Cannot handle out of memory.\n"); globalcontext_get_process_unlock(glb, target); AVM_ABORT(); } - // Prepare the message on ctx's heap which will be freed afterwards. - term ref = term_from_ref_ticks(monitored_monitor->ref_ticks, &ctx->heap); + // Prepare the message on ctx's heap, which is freed afterwards. + term ref; + if (is_monitored_alias) { + ref = term_make_process_reference(local_process_id, monitored_monitor->ref_ticks, &ctx->heap); + } else { + ref = term_from_ref_ticks(monitored_monitor->ref_ticks, &ctx->heap); + } term port_or_process = term_pid_or_port_from_context(ctx); term port_or_process_atom @@ -811,6 +882,14 @@ static struct Monitor *context_monitors_handle_terminate(Context *ctx) free(monitored_monitor); break; } + case CONTEXT_MONITOR_ALIAS: { + struct MonitorAlias *alias = CONTAINER_OF(monitor, struct MonitorAlias, monitor); + if (LIKELY(ctx->active_alias_count != ACTIVE_ALIAS_COUNT_SATURATED)) { + ctx->active_alias_count--; + } + free(alias); + break; + } } } return result; @@ -850,17 +929,16 @@ struct Monitor *monitor_link_new(term link_pid) } } -struct Monitor *monitor_new(term monitor_pid, uint64_t ref_ticks, bool is_monitoring) +struct Monitor *monitor_new(term monitor_pid, uint64_t ref_ticks, enum ContextMonitorType monitor_type) { + assert(monitor_type == CONTEXT_MONITOR_MONITORING_LOCAL + || monitor_type == CONTEXT_MONITOR_MONITORED_LOCAL + || monitor_type == CONTEXT_MONITOR_MONITORED_LOCAL_ALIAS); struct MonitorLocalMonitor *monitor = malloc(sizeof(struct MonitorLocalMonitor)); if (IS_NULL_PTR(monitor)) { return NULL; } - if (is_monitoring) { - monitor->monitor.monitor_type = CONTEXT_MONITOR_MONITORING_LOCAL; - } else { - monitor->monitor.monitor_type = CONTEXT_MONITOR_MONITORED_LOCAL; - } + monitor->monitor.monitor_type = monitor_type; monitor->monitor_obj = monitor_pid; monitor->ref_ticks = ref_ticks; @@ -881,6 +959,19 @@ struct Monitor *monitor_registeredname_monitor_new(int32_t monitor_process_id, t return &monitor->monitor; } +struct Monitor *monitor_alias_new(uint64_t ref_ticks, context_monitor_alias_type_t alias_type) +{ + struct MonitorAlias *monitor = malloc(sizeof(struct MonitorAlias)); + if (IS_NULL_PTR(monitor)) { + return NULL; + } + monitor->monitor.monitor_type = CONTEXT_MONITOR_ALIAS; + monitor->ref_ticks = ref_ticks; + monitor->alias_type = alias_type; + + return &monitor->monitor; +} + struct Monitor *monitor_resource_monitor_new(void *resource, uint64_t ref_ticks) { struct ResourceContextMonitor *monitor = malloc(sizeof(struct ResourceContextMonitor)); @@ -895,6 +986,35 @@ struct Monitor *monitor_resource_monitor_new(void *resource, uint64_t ref_ticks) return &monitor->monitor; } +void monitor_destroy(struct Monitor *monitor) +{ + if (monitor == NULL) { + return; + } + switch (monitor->monitor_type) { + case CONTEXT_MONITOR_LINK_LOCAL: + free(CONTAINER_OF(monitor, struct LinkLocalMonitor, monitor)); + break; + case CONTEXT_MONITOR_LINK_REMOTE: + free(CONTAINER_OF(monitor, struct LinkRemoteMonitor, monitor)); + break; + case CONTEXT_MONITOR_MONITORING_LOCAL: + case CONTEXT_MONITOR_MONITORED_LOCAL: + case CONTEXT_MONITOR_MONITORED_LOCAL_ALIAS: + free(CONTAINER_OF(monitor, struct MonitorLocalMonitor, monitor)); + break; + case CONTEXT_MONITOR_MONITORING_LOCAL_REGISTEREDNAME: + free(CONTAINER_OF(monitor, struct MonitorLocalRegisteredNameMonitor, monitor)); + break; + case CONTEXT_MONITOR_ALIAS: + free(CONTAINER_OF(monitor, struct MonitorAlias, monitor)); + break; + case CONTEXT_MONITOR_RESOURCE: + free(CONTAINER_OF(monitor, struct ResourceContextMonitor, monitor)); + break; + } +} + bool context_add_monitor(Context *ctx, struct Monitor *new_monitor) { struct ListHead *item; @@ -912,7 +1032,8 @@ bool context_add_monitor(Context *ctx, struct Monitor *new_monitor) break; } case CONTEXT_MONITOR_MONITORING_LOCAL: - case CONTEXT_MONITOR_MONITORED_LOCAL: { + case CONTEXT_MONITOR_MONITORED_LOCAL: + case CONTEXT_MONITOR_MONITORED_LOCAL_ALIAS: { struct MonitorLocalMonitor *new_local_monitor = CONTAINER_OF(new_monitor, struct MonitorLocalMonitor, monitor); struct MonitorLocalMonitor *existing_local_monitor = CONTAINER_OF(existing, struct MonitorLocalMonitor, monitor); if (UNLIKELY(existing_local_monitor->monitor_obj == new_local_monitor->monitor_obj && existing_local_monitor->ref_ticks == new_local_monitor->ref_ticks)) { @@ -932,6 +1053,16 @@ bool context_add_monitor(Context *ctx, struct Monitor *new_monitor) } break; } + case CONTEXT_MONITOR_ALIAS: { + struct MonitorAlias *new_alias_monitor = CONTAINER_OF(new_monitor, struct MonitorAlias, monitor); + struct MonitorAlias *existing_alias_monitor = CONTAINER_OF(existing, struct MonitorAlias, monitor); + + if (UNLIKELY(existing_alias_monitor->alias_type == new_alias_monitor->alias_type && existing_alias_monitor->ref_ticks == new_alias_monitor->ref_ticks)) { + free(new_alias_monitor); + return false; + } + break; + } case CONTEXT_MONITOR_RESOURCE: { struct ResourceContextMonitor *new_resource_monitor = CONTAINER_OF(new_monitor, struct ResourceContextMonitor, monitor); struct ResourceContextMonitor *existing_resource_monitor = CONTAINER_OF(existing, struct ResourceContextMonitor, monitor); @@ -957,6 +1088,11 @@ bool context_add_monitor(Context *ctx, struct Monitor *new_monitor) } } list_append(&ctx->monitors_head, &new_monitor->monitor_list_head); + if (new_monitor->monitor_type == CONTEXT_MONITOR_ALIAS) { + if (LIKELY(ctx->active_alias_count < ACTIVE_ALIAS_COUNT_SATURATED)) { + ctx->active_alias_count++; + } + } return true; } @@ -1065,12 +1201,18 @@ void context_unlink_ack(Context *ctx, term link_pid, uint64_t unlink_id) void context_demonitor(Context *ctx, uint64_t ref_ticks) { + struct MonitorAlias *alias = context_find_alias(ctx, ref_ticks); + if (alias != NULL && alias->alias_type != ContextMonitorAliasExplicitUnalias) { + context_unalias(ctx, alias); + } + struct ListHead *item; LIST_FOR_EACH (item, &ctx->monitors_head) { struct Monitor *monitor = GET_LIST_ENTRY(item, struct Monitor, monitor_list_head); switch (monitor->monitor_type) { case CONTEXT_MONITOR_MONITORING_LOCAL: - case CONTEXT_MONITOR_MONITORED_LOCAL: { + case CONTEXT_MONITOR_MONITORED_LOCAL: + case CONTEXT_MONITOR_MONITORED_LOCAL_ALIAS: { struct MonitorLocalMonitor *local_monitor = CONTAINER_OF(monitor, struct MonitorLocalMonitor, monitor); if (local_monitor->ref_ticks == ref_ticks) { list_remove(&monitor->monitor_list_head); @@ -1095,14 +1237,49 @@ void context_demonitor(Context *ctx, uint64_t ref_ticks) free(resource_monitor); return; } + break; } case CONTEXT_MONITOR_LINK_LOCAL: case CONTEXT_MONITOR_LINK_REMOTE: + case CONTEXT_MONITOR_ALIAS: break; } } } +struct MonitorAlias *context_find_alias(Context *ctx, uint64_t ref_ticks) +{ + // The vast majority of processes never create an alias: skip the monitor list walk + // entirely for them (this runs for every 'DOWN' and demonitor, not only for aliases). + if (LIKELY(ctx->active_alias_count == 0)) { + return NULL; + } + struct ListHead *item; + LIST_FOR_EACH (item, &ctx->monitors_head) { + struct Monitor *monitor = GET_LIST_ENTRY(item, struct Monitor, monitor_list_head); + if (monitor->monitor_type == CONTEXT_MONITOR_ALIAS) { + struct MonitorAlias *alias_monitor = CONTAINER_OF(monitor, struct MonitorAlias, monitor); + if (alias_monitor->ref_ticks == ref_ticks) { + return alias_monitor; + } + } + } + + return NULL; +} + +void context_unalias(Context *ctx, struct MonitorAlias *alias) +{ + TERM_DEBUG_ASSERT(alias != NULL); + TERM_DEBUG_ASSERT(ctx->active_alias_count > 0); + if (LIKELY(ctx->active_alias_count != ACTIVE_ALIAS_COUNT_SATURATED)) { + ctx->active_alias_count--; + } + struct Monitor *monitor = &alias->monitor; + list_remove(&monitor->monitor_list_head); + free(alias); +} + term context_get_monitor_pid(Context *ctx, uint64_t ref_ticks, bool *is_monitoring) { struct ListHead *item; @@ -1110,7 +1287,8 @@ term context_get_monitor_pid(Context *ctx, uint64_t ref_ticks, bool *is_monitori struct Monitor *monitor = GET_LIST_ENTRY(item, struct Monitor, monitor_list_head); switch (monitor->monitor_type) { case CONTEXT_MONITOR_MONITORING_LOCAL: - case CONTEXT_MONITOR_MONITORED_LOCAL: { + case CONTEXT_MONITOR_MONITORED_LOCAL: + case CONTEXT_MONITOR_MONITORED_LOCAL_ALIAS: { struct MonitorLocalMonitor *local_monitor = CONTAINER_OF(monitor, struct MonitorLocalMonitor, monitor); if (local_monitor->ref_ticks == ref_ticks) { *is_monitoring = monitor->monitor_type == CONTEXT_MONITOR_MONITORING_LOCAL; @@ -1129,6 +1307,7 @@ term context_get_monitor_pid(Context *ctx, uint64_t ref_ticks, bool *is_monitori case CONTEXT_MONITOR_LINK_LOCAL: case CONTEXT_MONITOR_LINK_REMOTE: case CONTEXT_MONITOR_RESOURCE: + case CONTEXT_MONITOR_ALIAS: break; } } @@ -1259,7 +1438,14 @@ COLD_FUNC void context_dump(Context *ctx) fprintf(stderr, "\n"); break; } - case CONTEXT_MONITOR_MONITORED_LOCAL: { + case CONTEXT_MONITOR_ALIAS: { + struct MonitorAlias *monitor_alias = CONTAINER_OF(monitor, struct MonitorAlias, monitor); + fprintf(stderr, "has alias ref=%lu", (long unsigned) monitor_alias->ref_ticks); + fprintf(stderr, "\n"); + break; + } + case CONTEXT_MONITOR_MONITORED_LOCAL: + case CONTEXT_MONITOR_MONITORED_LOCAL_ALIAS: { struct MonitorLocalMonitor *monitored_monitor = CONTAINER_OF(monitor, struct MonitorLocalMonitor, monitor); fprintf(stderr, "monitored by "); term_display(stderr, monitored_monitor->monitor_obj, ctx); diff --git a/src/libAtomVM/context.h b/src/libAtomVM/context.h index e43f360620..e7630a95a3 100644 --- a/src/libAtomVM/context.h +++ b/src/libAtomVM/context.h @@ -138,6 +138,9 @@ struct Context unsigned int leader : 1; unsigned int has_min_heap_size : 1; unsigned int has_max_heap_size : 1; + // Count of active aliases, so alias lookups short-circuit for the common alias-free process. + // Saturates at ACTIVE_ALIAS_COUNT_SATURATED (context.c). + unsigned int active_alias_count : 8; bool trap_exit : 1; #ifndef AVM_NO_EMU @@ -177,8 +180,18 @@ enum ContextMonitorType CONTEXT_MONITOR_RESOURCE, CONTEXT_MONITOR_LINK_REMOTE, CONTEXT_MONITOR_MONITORING_LOCAL_REGISTEREDNAME, + CONTEXT_MONITOR_ALIAS, + // Like CONTEXT_MONITOR_MONITORED_LOCAL, but the 'DOWN' carries a process reference (an alias). + CONTEXT_MONITOR_MONITORED_LOCAL_ALIAS, }; +typedef enum +{ + ContextMonitorAliasExplicitUnalias, + ContextMonitorAliasDemonitor, + ContextMonitorAliasReplyDemonitor, +} context_monitor_alias_type_t; + #define UNLINK_ID_LINK_ACTIVE 0x0 /** @@ -212,6 +225,13 @@ struct MonitorLocalRegisteredNameMonitor term monitor_name; }; +struct MonitorAlias +{ + struct Monitor monitor; + uint64_t ref_ticks; + context_monitor_alias_type_t alias_type; +}; + // The other half is called ResourceMonitor and is a linked list of resources struct ResourceContextMonitor { @@ -479,6 +499,21 @@ bool context_process_link_exit_signal(Context *ctx, struct TermSignal *signal); */ void context_process_monitor_down_signal(Context *ctx, struct TermSignal *signal); +/** + * @brief Process an alias message signal. + * + * @details The signal term is a 2-tuple {Ref, Message}. If Ref is an active alias of this process the + * Message is returned for delivery as a normal message; otherwise an invalid term is returned and the + * message is dropped. For a reply_demonitor alias the alias is also deactivated and the monitor + * removed. Runs in the owner's own context (no race). The caller delivers the returned message in send + * order so an alias send is not reordered against a plain send. + * + * @param ctx the context being executed + * @param signal the signal with the {Ref, Message} tuple + * @return the message to deliver, or an invalid term to drop it + */ +term context_process_alias_message_signal(Context *ctx, struct TermSignal *signal); + /** * @brief Resume execution after module has been loaded * @@ -510,12 +545,23 @@ struct Monitor *monitor_link_new(term link_pid); /** * @brief Create a monitor on a process. * - * @param monitor_pid monitored process + * @param monitor_pid monitored process (or monitoring process when ctx is the monitored one) * @param ref_ticks reference of the monitor - * @param is_monitoring if ctx is the monitoring process + * @param monitor_type \c CONTEXT_MONITOR_MONITORING_LOCAL for the monitoring process's half, + * \c CONTEXT_MONITOR_MONITORED_LOCAL or \c CONTEXT_MONITOR_MONITORED_LOCAL_ALIAS (when the + * monitor was created with the {alias, _} option) for the monitored process's half * @return the allocated monitor or NULL if allocation failed */ -struct Monitor *monitor_new(term monitor_pid, uint64_t ref_ticks, bool is_monitoring); +struct Monitor *monitor_new(term monitor_pid, uint64_t ref_ticks, enum ContextMonitorType monitor_type); + +/** + * @brief Create a process alias. + * + * @param ref_ticks reference of the alias + * @param alias_type when the alias is deactivated, see the erlang:monitor/3 alias option + * @return the allocated monitor or NULL if allocation failed + */ +struct Monitor *monitor_alias_new(uint64_t ref_ticks, context_monitor_alias_type_t alias_type); /** * @brief Create a monitor on a process by registered name. @@ -536,6 +582,18 @@ struct Monitor *monitor_registeredname_monitor_new(int32_t monitor_process_id, t */ struct Monitor *monitor_resource_monitor_new(void *resource, uint64_t ref_ticks); +/** + * @brief Destroy a monitor that was not installed yet. + * @details Frees the container struct the monitor is embedded in, recovering + * it from its monitor type with CONTAINER_OF instead of relying on the monitor + * being the first member. It doesn't remove the monitor from any list, so it + * must only be used on monitors that were never passed to context_add_monitor, + * e.g. on error paths. + * + * @param monitor the monitor to free, or NULL in which case nothing is done + */ +void monitor_destroy(struct Monitor *monitor); + /** * @brief Half-unlink process to another process * @details If process is found, an unlink id is generated and the link is @@ -582,6 +640,23 @@ void context_unlink_ack(Context *ctx, term link_pid, uint64_t unlink_id); */ void context_demonitor(Context *ctx, uint64_t ref_ticks); +/** + * @brief Find a process alias + * + * @param ctx the context being executed + * @param ref_ticks reference of the alias to find + * @return found alias or NULL + */ +struct MonitorAlias *context_find_alias(Context *ctx, uint64_t ref_ticks); + +/** + * @brief Remove an alias of a process + * + * @param ctx the context owning the alias + * @param alias The alias to remove, can be obtained using context_find_alias + */ +void context_unalias(Context *ctx, struct MonitorAlias *alias); + /** * @brief Get target of a monitor. * diff --git a/src/libAtomVM/defaultatoms.def b/src/libAtomVM/defaultatoms.def index 6dc95175d9..8911bc90a0 100644 --- a/src/libAtomVM/defaultatoms.def +++ b/src/libAtomVM/defaultatoms.def @@ -222,3 +222,11 @@ X(JIT_RISCV32_ATOM, "\xB", "jit_riscv32") X(JIT_RISCV64_ATOM, "\xB", "jit_riscv64") X(JIT_WASM32_ATOM, "\xA", "jit_wasm32") X(JIT_XTENSA_ATOM, "\xA", "jit_xtensa") + +X(ALIAS_ATOM, "\x5", "alias") +X(DEMONITOR_ATOM, "\x9", "demonitor") +X(EXPLICIT_UNALIAS_ATOM, "\x10", "explicit_unalias") +X(REPLY_DEMONITOR_ATOM, "\xF", "reply_demonitor") +X(REPLY_ATOM, "\x5", "reply") +X(TAG_ATOM, "\x3", "tag") +X(PRIORITY_ATOM, "\x8", "priority") diff --git a/src/libAtomVM/dist_nifs.c b/src/libAtomVM/dist_nifs.c index dff76b6b5a..1b6d93f24a 100644 --- a/src/libAtomVM/dist_nifs.c +++ b/src/libAtomVM/dist_nifs.c @@ -560,6 +560,29 @@ static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[]) globalcontext_send_message(ctx->global, target_process_id, payload); break; } + case OPERATION_ALIAS_SEND: + case OPERATION_ALIAS_SEND_TT: { + // {DOP_ALIAS_SEND, FromPid, Alias} or {DOP_ALIAS_SEND_TT, FromPid, Alias, TraceToken}, + // followed by the message payload. The trace token is ignored. + size_t expected_arity = (term_to_int(operation) == OPERATION_ALIAS_SEND) ? 3 : 4; + if (UNLIKELY(arity != expected_arity)) { + RAISE_ERROR(BADARG_ATOM); + } + term roots[3]; + roots[0] = argv[0]; // dist handle + roots[1] = argv[1]; + roots[2] = control; + term payload = external_term_from_binary_with_roots(ctx, 1, 1 + bytes_read, &bytes_read, 3, roots); + control = roots[2]; + term target = term_get_tuple_element(control, 2); + if (LIKELY(term_is_process_reference(target))) { + int32_t target_process_id = term_process_ref_to_process_id(target); + globalcontext_send_message_to_alias(ctx->global, target_process_id, target, payload); + } + // A ref minted by a previous incarnation of this node is not an active alias here. + // Drop the message instead of crashing the dist connection with badarg. + break; + } case OPERATION_SPAWN_REQUEST: { if (UNLIKELY(arity != 6)) { RAISE_ERROR(BADARG_ATOM); diff --git a/src/libAtomVM/ets.c b/src/libAtomVM/ets.c index b33715b686..f02576a6a1 100644 --- a/src/libAtomVM/ets.c +++ b/src/libAtomVM/ets.c @@ -180,7 +180,7 @@ ets_result_t ets_create_table_maybe_gc( if (named) { *ret = name; } else { - if (UNLIKELY(memory_ensure_free_opt(ctx, REF_SIZE, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { + if (UNLIKELY(memory_ensure_free_opt(ctx, TERM_BOXED_REFERENCE_SHORT_SIZE, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { ets_multimap_delete(multimap, ctx->global); #ifndef AVM_NO_SMP smp_rwlock_destroy(table->lock); diff --git a/src/libAtomVM/external_term.c b/src/libAtomVM/external_term.c index e0da90a042..f306e2e810 100644 --- a/src/libAtomVM/external_term.c +++ b/src/libAtomVM/external_term.c @@ -521,6 +521,8 @@ static int serialize_term(uint8_t *buf, term t, GlobalContext *glb) uint32_t len; if (term_is_resource_reference(t)) { len = 4; + } else if (term_is_process_reference(t)) { + len = 3; } else { len = 2; } @@ -542,6 +544,15 @@ static int serialize_term(uint8_t *buf, term t, GlobalContext *glb) WRITE_64_UNALIGNED(buf + k + 12, ((uintptr_t) serialize_ref)); } return k + 20; + } else if (term_is_process_reference(t)) { + if (!IS_NULL_PTR(buf)) { + uint64_t ticks = term_to_ref_ticks(t); + uint32_t process_id = term_process_ref_to_process_id(t); + WRITE_32_UNALIGNED(buf + k, creation); + WRITE_64_UNALIGNED(buf + k + 4, ticks); + WRITE_32_UNALIGNED(buf + k + 12, process_id); + } + return k + 16; } else { if (!IS_NULL_PTR(buf)) { uint64_t ticks = term_to_ref_ticks(t); @@ -999,6 +1010,13 @@ static term parse_external_terms(const uint8_t *external_term_buf, size_t *eterm if (len == 2 && node == this_node && creation == this_creation) { uint64_t ticks = ((uint64_t) data[0]) << 32 | data[1]; return term_from_ref_ticks(ticks, heap); + } else if (len == 3 && node == this_node && creation == this_creation) { + uint64_t ticks = ((uint64_t) data[0]) << 32 | data[1]; + uint32_t process_id = data[2]; + if (UNLIKELY(process_id == INVALID_PROCESS_ID || process_id > TERM_MAX_LOCAL_PROCESS_ID)) { + return term_invalid_term(); + } + return term_make_process_reference(process_id, ticks, heap); } else if (len == 4 && node == this_node && creation == this_creation) { // This is a resource uint64_t resource_type_ptr = ((uint64_t) data[0]) << 32 | data[1]; @@ -1442,7 +1460,9 @@ static int calculate_heap_usage(const uint8_t *external_term_buf, size_t remaini // Check if it's non-distributed node, in which case it's always a local ref if (external_term_buf[4] == strlen("nonode@nohost") && memcmp(external_term_buf + 5, "nonode@nohost", strlen("nonode@nohost")) == 0) { if (len == 2) { - heap_size = REF_SIZE; + heap_size = TERM_BOXED_REFERENCE_SHORT_SIZE; + } else if (len == 3) { + heap_size = TERM_BOXED_REFERENCE_PROCESS_SIZE; } else if (len == 4) { heap_size = TERM_BOXED_REFERENCE_RESOURCE_SIZE; } diff --git a/src/libAtomVM/globalcontext.c b/src/libAtomVM/globalcontext.c index d00213284c..cc85e4620b 100644 --- a/src/libAtomVM/globalcontext.c +++ b/src/libAtomVM/globalcontext.c @@ -414,6 +414,20 @@ void globalcontext_send_message(GlobalContext *glb, int32_t process_id, term t) } } +void globalcontext_send_message_to_alias(GlobalContext *glb, int32_t process_id, term ref, term message) +{ + Context *p = globalcontext_get_process_lock(glb, process_id); + if (p) { + BEGIN_WITH_STACK_HEAP(TUPLE_SIZE(2), temp_heap) + term tuple = term_alloc_tuple(2, &temp_heap); + term_put_tuple_element(tuple, 0, ref); + term_put_tuple_element(tuple, 1, message); + mailbox_send_term_signal(p, AliasMessageSignal, tuple); + END_WITH_STACK_HEAP(temp_heap, glb) + globalcontext_get_process_unlock(glb, p); + } +} + void globalcontext_send_message_nolock(GlobalContext *glb, int32_t process_id, term t) { Context *p = globalcontext_get_process_nolock(glb, process_id); diff --git a/src/libAtomVM/globalcontext.h b/src/libAtomVM/globalcontext.h index eb7779e9d1..b36465e28c 100644 --- a/src/libAtomVM/globalcontext.h +++ b/src/libAtomVM/globalcontext.h @@ -46,8 +46,6 @@ extern "C" { #endif -#define INVALID_PROCESS_ID 0 - struct Context; #ifndef TYPEDEF_CONTEXT @@ -264,6 +262,20 @@ bool globalcontext_process_exists(GlobalContext *glb, int32_t process_id); */ void globalcontext_send_message(GlobalContext *glb, int32_t process_id, term t); +/** + * @brief Send a message to a process alias (process reference). + * + * @details Posts an AliasMessageSignal carrying {Ref, Message} to the owner process. The owner + * validates the alias against its own monitors when it drains signals and either delivers Message + * as a normal message or drops it. This avoids touching the owner's monitor list from the sender. + * + * @param glb the global context (that owns the process table). + * @param process_id the local process id of the alias owner. + * @param ref the process reference (alias) used as the send target. + * @param message the message to send. + */ +void globalcontext_send_message_to_alias(GlobalContext *glb, int32_t process_id, term ref, term message); + /** * @brief Send a message to a process from another process. * There should be a lock on the process table. This variant can be used by diff --git a/src/libAtomVM/jit.c b/src/libAtomVM/jit.c index 8d8bd5b4f8..e705e5e12f 100644 --- a/src/libAtomVM/jit.c +++ b/src/libAtomVM/jit.c @@ -840,25 +840,33 @@ static bool jit_send(Context *ctx, JITState *jit_state) return false; } ctx->x[0] = return_value; - } else { - if (term_is_atom(recipient_term)) { - recipient_term = globalcontext_get_registered_process(ctx->global, term_to_atom_index(recipient_term)); - if (UNLIKELY(recipient_term == UNDEFINED_ATOM)) { - set_error(ctx, jit_state, 0, BADARG_ATOM); - return false; - } - } - - int local_process_id; - if (term_is_local_pid_or_port(recipient_term)) { - local_process_id = term_to_local_process_id(recipient_term); - } else { + } else if (term_is_local_pid_or_port(recipient_term)) { + int local_process_id = term_to_local_process_id(recipient_term); + globalcontext_send_message(ctx->global, local_process_id, ctx->x[1]); + ctx->x[0] = ctx->x[1]; + } else if (term_is_atom(recipient_term)) { + recipient_term = globalcontext_get_registered_process(ctx->global, term_to_atom_index(recipient_term)); + if (UNLIKELY(recipient_term == UNDEFINED_ATOM)) { set_error(ctx, jit_state, 0, BADARG_ATOM); return false; } + // A registered name always resolves to a local pid or port (register/2 validates this). + int local_process_id = term_to_local_process_id(recipient_term); globalcontext_send_message(ctx->global, local_process_id, ctx->x[1]); ctx->x[0] = ctx->x[1]; + } else if (UNLIKELY(!term_is_reference(recipient_term))) { + set_error(ctx, jit_state, 0, BADARG_ATOM); + return false; + } else if (term_is_process_reference(recipient_term)) { + int32_t process_id = term_process_ref_to_process_id(recipient_term); + globalcontext_send_message_to_alias(ctx->global, process_id, recipient_term, ctx->x[1]); + ctx->x[0] = ctx->x[1]; + } else { + // Drop the send but still return the message in x0, as OTP does for a non-active-alias + // reference. Outbound distributed aliases are unsupported. + ctx->x[0] = ctx->x[1]; } + return true; } @@ -883,7 +891,7 @@ static term *jit_extended_register_ptr(Context *ctx, unsigned int index) static Context *jit_process_signal_messages(Context *ctx, JITState *jit_state) { TRACE("jit_process_signal_messages\n"); - MailboxMessage *signal_message = mailbox_process_outer_list(&ctx->mailbox); + MailboxMessage *signal_message = mailbox_process_outer_list_with_aliases(ctx); bool handle_error = false; bool reprocess_outer = false; while (signal_message) { @@ -1010,6 +1018,7 @@ static Context *jit_process_signal_messages(Context *ctx, JITState *jit_state) #endif break; } + case AliasMessageSignal: case NormalMessage: { UNREACHABLE(); } @@ -1019,7 +1028,7 @@ static Context *jit_process_signal_messages(Context *ctx, JITState *jit_state) signal_message = next; if (UNLIKELY(reprocess_outer && signal_message == NULL)) { reprocess_outer = false; - signal_message = mailbox_process_outer_list(&ctx->mailbox); + signal_message = mailbox_process_outer_list_with_aliases(ctx); } } if (context_get_flags(ctx, Killed)) { diff --git a/src/libAtomVM/mailbox.c b/src/libAtomVM/mailbox.c index fe256df7ed..4fe636b17e 100644 --- a/src/libAtomVM/mailbox.c +++ b/src/libAtomVM/mailbox.c @@ -22,6 +22,7 @@ #include +#include "context.h" #include "memory.h" #include "scheduler.h" #include "synclist.h" @@ -101,6 +102,7 @@ void mailbox_message_dispose(MailboxMessage *m, Heap *heap) case SetGroupLeaderSignal: case LinkExitSignal: case MonitorDownSignal: + case AliasMessageSignal: case UnlinkRemoteIDSignal: case UnlinkRemoteIDAckSignal: { struct TermSignal *term_signal = CONTAINER_OF(m, struct TermSignal, base); @@ -337,6 +339,7 @@ void mailbox_send_monitor_signal(Context *c, enum MessageType type, struct Monit { struct MonitorPointerSignal *monitor_signal = malloc(sizeof(struct MonitorPointerSignal)); if (IS_NULL_PTR(monitor_signal)) { + // FIXME this function returns void, so the caller is not told the allocation failed fprintf(stderr, "Failed to allocate memory: %s:%i.\n", __FILE__, __LINE__); return; } @@ -364,9 +367,10 @@ void mailbox_reset(Mailbox *mbox) mbox->receive_pointer_prev = NULL; } -MailboxMessage *mailbox_process_outer_list(Mailbox *mbox) +// CAS-empty the outer list and return its raw head. The outer list is LIFO, so the head is the +// newest message and each message is older than its predecessor. +static inline MailboxMessage *detach_outer_list(Mailbox *mbox) { - // Empty outer list using CAS MailboxMessage *current = mbox->outer_first; #if !defined(AVM_NO_SMP) || defined(AVM_TASK_DRIVER_ENABLED) while (!ATOMIC_COMPARE_EXCHANGE_WEAK_PTR(&mbox->outer_first, ¤t, NULL)) { @@ -374,52 +378,173 @@ MailboxMessage *mailbox_process_outer_list(Mailbox *mbox) #else mbox->outer_first = NULL; #endif - // Reverse the list - MailboxMessage *previous_normal = NULL; - MailboxMessage *previous_signal = NULL; - MailboxMessage *last_normal = NULL; + return current; +} + +// Append a received-order run of normal messages, from first (oldest) to last (newest), at the end +// of the inner list, restoring the receive pointer when the inner list had been fully consumed. +// first and last are NULL together when no normal message was collected, making this a no-op. +static inline void append_normal_messages(Mailbox *mbox, MailboxMessage *first, MailboxMessage *last) +{ + if (last == NULL) { + return; + } + + // With no receive_pointer, it becomes the new list head. + if (mbox->receive_pointer == NULL) { + mbox->receive_pointer = first; + // If we had a prev, set the prev's next to the new current. + if (mbox->receive_pointer_prev) { + mbox->receive_pointer_prev->next = first; + } else if (mbox->inner_first == NULL) { + // If we had no first, this is the first message. + mbox->inner_first = first; + } + } + + // Append the new items at the end of the inner list. mbox->inner_last may be + // mbox->receive_pointer_prev, which is then updated a second time here. + if (mbox->inner_last) { + mbox->inner_last->next = first; + } + mbox->inner_last = last; +} + +MailboxMessage *mailbox_process_outer_list(Mailbox *mbox) +{ + MailboxMessage *current = detach_outer_list(mbox); + + // Reverse the LIFO list into received order (oldest first), splitting it into a normal sublist + // and a signal sublist in one pass. This entry is alias-blind: its callers (ports, teardown and + // crashdump) never own an active alias. + MailboxMessage *normal_first = NULL; + MailboxMessage *normal_last = NULL; + MailboxMessage *signal_first = NULL; while (current) { MailboxMessage *next = current->next; if (current->type == NormalMessage) { - // Get last normal to update inner_last. - if (last_normal == NULL) { - last_normal = current; + if (normal_last == NULL) { + normal_last = current; } - current->next = previous_normal; - previous_normal = current; + current->next = normal_first; + normal_first = current; } else { - current->next = previous_signal; - previous_signal = current; + current->next = signal_first; + signal_first = current; } current = next; } - // If we did enqueue some normal messages, lastNormal is the first - // one in outer list (last received one) - if (last_normal) { - // previousNormal is new list head - // If we had no receive_pointer, it should be this list head - if (mbox->receive_pointer == NULL) { - mbox->receive_pointer = previous_normal; - // If we had a prev, set the prev's next to the new current. - if (mbox->receive_pointer_prev) { - mbox->receive_pointer_prev->next = previous_normal; - } else if (mbox->inner_first == NULL) { - // If we had no first, this is the first message. - mbox->inner_first = previous_normal; + + append_normal_messages(mbox, normal_first, normal_last); + return signal_first; +} + +MailboxMessage *mailbox_process_outer_list_with_aliases(Context *ctx) +{ + Mailbox *mbox = &ctx->mailbox; + MailboxMessage *current = detach_outer_list(mbox); + + MailboxMessage *normal_first = NULL; + MailboxMessage *normal_last = NULL; + MailboxMessage *signal_first = NULL; + + if (ctx->active_alias_count == 0) { + // Fast path (the common case): no active alias, so no alias message can be delivered. Same + // single-pass split as mailbox_process_outer_list, except a stale AliasMessageSignal (its + // alias is inactive) is freed now so it does not reach the signal loop, which would treat + // it as unreachable. + while (current) { + MailboxMessage *next = current->next; + if (current->type == AliasMessageSignal) { + mailbox_message_dispose_unsent(CONTAINER_OF(current, Message, base), ctx->global, false); + } else if (current->type == NormalMessage) { + if (normal_last == NULL) { + normal_last = current; + } + current->next = normal_first; + normal_first = current; + } else { + current->next = signal_first; + signal_first = current; } + current = next; + } + } else { + // At least one active alias: alias side effects can deactivate the alias (e.g. + // reply_demonitor), so they must run in received order. Of several same-batch sends to one + // alias, only the first is delivered, like OTP. Reverse the LIFO list into received order. + MailboxMessage *received = NULL; + while (current) { + MailboxMessage *next = current->next; + current->next = received; + received = current; + current = next; } - // Update last and previous last's next. - // Append these new items at the end of inner list. - if (mbox->inner_last) { - // This may be mbox->receive_pointer_prev which we - // are updating a second time here. - mbox->inner_last->next = previous_normal; + // Walk oldest to newest, appending so both sublists keep received order. + MailboxMessage *signal_last = NULL; + current = received; + while (current) { + MailboxMessage *next = current->next; + if (current->type == NormalMessage) { + current->next = NULL; + if (normal_last == NULL) { + normal_first = current; + } else { + normal_last->next = current; + } + normal_last = current; + } else if (current->type == AliasMessageSignal) { + // Validate the alias (in the owner's own context) and convert to a normal message. + term message = context_process_alias_message_signal(ctx, CONTAINER_OF(current, struct TermSignal, base)); + if (!term_is_invalid_term(message)) { + // Re-type in place: struct TermSignal and struct Message share a layout + // (static-asserted above) and the message term already lives in this signal's + // storage, so nothing is copied. The conversion cannot fail on OOM, which + // matters because reply_demonitor's side effects already ran above. + Message *converted = CONTAINER_OF(current, Message, base); + converted->base.type = NormalMessage; + converted->message = message; + converted->base.next = NULL; + if (normal_last == NULL) { + normal_first = &converted->base; + } else { + normal_last->next = &converted->base; + } + normal_last = &converted->base; + } else { + // Inactive alias: never delivered, so nothing references the term. Free it + // now (sweeping refc binaries) instead of leaving it on the heap until GC. + mailbox_message_dispose_unsent(CONTAINER_OF(current, Message, base), ctx->global, false); + } + } else { + // A 'DOWN' auto-removing a {alias, _} monitor also deactivates its alias. Do that + // here, in received order, so a later same-batch alias send is dropped like OTP + // (alias messages are converted during this split, before the signal loop runs the + // 'DOWN'). The signal loop's own deactivation in context_process_monitor_down_signal + // is then an idempotent no-op. + if (current->type == MonitorDownSignal) { + struct TermSignal *down_signal = CONTAINER_OF(current, struct TermSignal, base); + uint64_t ref_ticks = term_to_ref_ticks(term_get_tuple_element(down_signal->signal_term, 1)); + struct MonitorAlias *alias = context_find_alias(ctx, ref_ticks); + if (alias != NULL && alias->alias_type != ContextMonitorAliasExplicitUnalias) { + context_unalias(ctx, alias); + } + } + current->next = NULL; + if (signal_last == NULL) { + signal_first = current; + } else { + signal_last->next = current; + } + signal_last = current; + } + current = next; } - mbox->inner_last = last_normal; } - return previous_signal; + append_normal_messages(mbox, normal_first, normal_last); + return signal_first; } void mailbox_next(Mailbox *mbox) diff --git a/src/libAtomVM/mailbox.h b/src/libAtomVM/mailbox.h index 50e7dd1c2b..45b9932829 100644 --- a/src/libAtomVM/mailbox.h +++ b/src/libAtomVM/mailbox.h @@ -99,6 +99,7 @@ enum MessageType DemonitorSignal, MonitorDownSignal, CodeServerResumeSignal, + AliasMessageSignal, }; struct MailboxMessage @@ -213,12 +214,24 @@ size_t mailbox_size(Mailbox *mbox); /** * @brief Process the outer list of messages. * - * @details To be called from the process only + * @details To be called from the process only. * @param mbox the mailbox to work with * @return the signal messages in received order. */ MailboxMessage *mailbox_process_outer_list(Mailbox *mbox); +/** + * @brief Process the outer list of messages, delivering alias messages in send order. + * + * @details Like mailbox_process_outer_list, but AliasMessageSignals are validated in ctx's own + * monitor list and, when the alias is active, converted to normal messages in place so they keep + * send order relative to plain messages from the same sender. To be called from the process only, + * while not holding the processes table lock (the reply_demonitor path takes it). + * @param ctx the context whose mailbox is processed + * @return the remaining (non-alias) signal messages in received order. + */ +MailboxMessage *mailbox_process_outer_list_with_aliases(Context *ctx); + /** * @brief Sends a message to a certain mailbox. * diff --git a/src/libAtomVM/nifs.c b/src/libAtomVM/nifs.c index 2e3f70f7eb..c9081e481a 100644 --- a/src/libAtomVM/nifs.c +++ b/src/libAtomVM/nifs.c @@ -296,6 +296,8 @@ static term nif_unicode_characters_to_binary(Context *ctx, int argc, term argv[] static term nif_erlang_lists_subtract(Context *ctx, int argc, term argv[]); static term nif_erlang_crc32(Context *ctx, int argc, term argv[]); static term nif_erlang_crc32_combine_3(Context *ctx, int argc, term argv[]); +static term nif_erlang_alias(Context *ctx, int argc, term argv[]); +static term nif_erlang_unalias(Context *ctx, int argc, term argv[]); static term nif_zlib_compress_1(Context *ctx, int argc, term argv[]); #define DECLARE_MATH_NIF_FUN(moniker) \ @@ -987,6 +989,14 @@ static const struct Nif crc32_combine_nif = { .base.type = NIFFunctionType, .nif_ptr = nif_erlang_crc32_combine_3 }; +static const struct Nif erlang_alias_nif = { + .base.type = NIFFunctionType, + .nif_ptr = nif_erlang_alias +}; +static const struct Nif erlang_unalias_nif = { + .base.type = NIFFunctionType, + .nif_ptr = nif_erlang_unalias +}; static const struct Nif zlib_compress_nif = { .base.type = NIFFunctionType, .nif_ptr = nif_zlib_compress_1 @@ -1325,7 +1335,9 @@ static NativeHandlerResult process_console_message(Context *ctx, term msg) { // msg is not in the port's heap NativeHandlerResult result = NativeContinue; - if (UNLIKELY(memory_ensure_free_opt(ctx, 12, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { + // The reply builders below don't ensure_free, so this one reservation must cover their largest + // reply, the gen_call error path. + if (UNLIKELY(memory_ensure_free_opt(ctx, 2 * TUPLE_SIZE(2), MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { fprintf(stderr, "Unable to allocate sufficient memory for console driver.\n"); AVM_ABORT(); } @@ -1343,7 +1355,6 @@ static NativeHandlerResult process_console_message(Context *ctx, term msg) term pid = term_get_tuple_element(msg, 1); term ref = term_get_tuple_element(msg, 2); term req = term_get_tuple_element(msg, 3); - uint64_t ref_ticks = term_to_ref_ticks(ref); if (is_tagged_tuple(req, PUT_CHARS_ATOM, 3)) { term chars = term_get_tuple_element(req, 2); @@ -1353,11 +1364,11 @@ static NativeHandlerResult process_console_message(Context *ctx, term msg) printf("%s", str); free(str); - term refcopy = term_from_ref_ticks(ref_ticks, &ctx->heap); - term reply = term_alloc_tuple(3, &ctx->heap); term_put_tuple_element(reply, 0, IO_REPLY_ATOM); - term_put_tuple_element(reply, 1, refcopy); + // Don't rebuild ReplyAs from ref ticks: that turns an alias into a plain reference + // the requester's receive would not match, and ReplyAs need not be a reference. + term_put_tuple_element(reply, 1, ref); term_put_tuple_element(reply, 2, OK_ATOM); port_send_message(ctx->global, pid, reply); @@ -1417,6 +1428,42 @@ static NativeHandlerResult process_console_mailbox(Context *ctx) return result; } +static term parse_monitor_opts(Context *ctx, term monitor_opts, bool *is_alias, context_monitor_alias_type_t *alias_type) +{ + *is_alias = false; + while (term_is_nonempty_list(monitor_opts)) { + term option = term_get_list_head(monitor_opts); + if (term_is_tuple(option) && term_get_tuple_arity(option) == 2 && term_get_tuple_element(option, 0) == ALIAS_ATOM) { + *is_alias = true; + switch (term_get_tuple_element(option, 1)) { + case EXPLICIT_UNALIAS_ATOM: + *alias_type = ContextMonitorAliasExplicitUnalias; + break; + case DEMONITOR_ATOM: + *alias_type = ContextMonitorAliasDemonitor; + break; + case REPLY_DEMONITOR_ATOM: + *alias_type = ContextMonitorAliasReplyDemonitor; + break; + default: + RAISE_ERROR(BADARG_ATOM); + } + } else if (term_is_tuple(option) && term_get_tuple_arity(option) == 2 && term_get_tuple_element(option, 0) == TAG_ATOM) { + RAISE_ERROR(UNSUPPORTED_ATOM); + } else { + RAISE_ERROR(BADARG_ATOM); + } + + monitor_opts = term_get_list_tail(monitor_opts); + } + + if (UNLIKELY(!term_is_nil(monitor_opts))) { + RAISE_ERROR(BADARG_ATOM); + } + + return OK_ATOM; +} + // Common handling of spawn/1, spawn/3, spawn_opt/2, spawn_opt/4 // opts_term is [] for spawn/1,3 static term do_spawn(Context *ctx, Context *new_ctx, size_t arity, size_t n_freeze, term opts_term) @@ -1424,7 +1471,7 @@ static term do_spawn(Context *ctx, Context *new_ctx, size_t arity, size_t n_free term min_heap_size_term = interop_proplist_get_value(opts_term, MIN_HEAP_SIZE_ATOM); term max_heap_size_term = interop_proplist_get_value(opts_term, MAX_HEAP_SIZE_ATOM); term link_term = interop_proplist_get_value(opts_term, LINK_ATOM); - term monitor_term = interop_proplist_get_value(opts_term, MONITOR_ATOM); + term monitor_term = interop_proplist_get_value_default(opts_term, MONITOR_ATOM, term_invalid_term()); term heap_growth_strategy = interop_proplist_get_value_default(opts_term, ATOMVM_HEAP_GROWTH_ATOM, BOUNDED_FREE_ATOM); term request_term = interop_proplist_get_value_default(opts_term, REQUEST_ATOM, UNDEFINED_ATOM); term group_leader; @@ -1494,59 +1541,121 @@ static term do_spawn(Context *ctx, Context *new_ctx, size_t arity, size_t n_free context_destroy(new_ctx); RAISE_ERROR(BADARG_ATOM); } - uint64_t ref_ticks = 0; + RefData ref_data; + bool is_spawn_monitor = false; + bool is_alias = false; + context_monitor_alias_type_t alias_type; term new_pid = term_from_local_process_id(new_ctx->process_id); + // Do every fallible step before publishing any side effect: destroying a never-published + // new_ctx would send the caller a spurious {'EXIT', Pid, normal} for a spawn that raised. + struct Monitor *new_link = NULL; + struct Monitor *self_link = NULL; + struct Monitor *alias_monitor = NULL; + struct Monitor *new_monitor = NULL; + struct Monitor *self_monitor = NULL; + if (link_term == TRUE_ATOM) { - // We can call context_add_monitor directly on new process because it's not started yet - struct Monitor *new_link = monitor_link_new(term_from_local_process_id(ctx->process_id)); + new_link = monitor_link_new(term_from_local_process_id(ctx->process_id)); if (IS_NULL_PTR(new_link)) { context_destroy(new_ctx); RAISE_ERROR(OUT_OF_MEMORY_ATOM); } - struct Monitor *self_link = monitor_link_new(new_pid); + self_link = monitor_link_new(new_pid); if (IS_NULL_PTR(self_link)) { - free(new_link); + monitor_destroy(new_link); context_destroy(new_ctx); RAISE_ERROR(OUT_OF_MEMORY_ATOM); } - context_add_monitor(new_ctx, new_link); - context_add_monitor(ctx, self_link); } if (monitor_term == TRUE_ATOM) { - // We can call context_add_monitor directly on new process because it's not started yet - ref_ticks = globalcontext_get_ref_ticks(ctx->global); - struct Monitor *new_monitor = monitor_new(term_from_local_process_id(ctx->process_id), ref_ticks, false); + monitor_term = term_nil(); + } + if (term_is_list(monitor_term)) { + is_spawn_monitor = true; + + if (UNLIKELY(term_is_invalid_term(parse_monitor_opts(ctx, monitor_term, &is_alias, &alias_type)))) { + monitor_destroy(new_link); + monitor_destroy(self_link); + context_destroy(new_ctx); + return term_invalid_term(); + } + if (is_alias) { + ref_data = (RefData){ .ref_ticks = globalcontext_get_ref_ticks(ctx->global), .process_id = ctx->process_id }; + alias_monitor = monitor_alias_new(ref_data.ref_ticks, alias_type); + if (IS_NULL_PTR(alias_monitor)) { + monitor_destroy(new_link); + monitor_destroy(self_link); + context_destroy(new_ctx); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } + } else { + ref_data = (RefData){ .ref_ticks = globalcontext_get_ref_ticks(ctx->global), .process_id = INVALID_PROCESS_ID }; + } + + new_monitor = monitor_new(term_from_local_process_id(ctx->process_id), ref_data.ref_ticks, + is_alias ? CONTEXT_MONITOR_MONITORED_LOCAL_ALIAS : CONTEXT_MONITOR_MONITORED_LOCAL); if (IS_NULL_PTR(new_monitor)) { + monitor_destroy(new_link); + monitor_destroy(self_link); + monitor_destroy(alias_monitor); context_destroy(new_ctx); RAISE_ERROR(OUT_OF_MEMORY_ATOM); } - struct Monitor *self_monitor = monitor_new(new_pid, ref_ticks, true); + self_monitor = monitor_new(new_pid, ref_data.ref_ticks, CONTEXT_MONITOR_MONITORING_LOCAL); if (IS_NULL_PTR(self_monitor)) { - free(new_monitor); + monitor_destroy(new_link); + monitor_destroy(self_link); + monitor_destroy(alias_monitor); + monitor_destroy(new_monitor); context_destroy(new_ctx); RAISE_ERROR(OUT_OF_MEMORY_ATOM); } - context_add_monitor(new_ctx, new_monitor); - context_add_monitor(ctx, self_monitor); - } - if (ref_ticks) { - int res_size = REF_SIZE + TUPLE_SIZE(2); + // Reserve before publishing (see above). GC here is safe: new_pid and ref_data are immediates. + int res_size = TERM_BOXED_REFERENCE_PROCESS_SIZE + TUPLE_SIZE(2); if (UNLIKELY(memory_ensure_free_opt(ctx, res_size, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { + monitor_destroy(new_link); + monitor_destroy(self_link); + monitor_destroy(alias_monitor); + monitor_destroy(self_monitor); + monitor_destroy(new_monitor); context_destroy(new_ctx); RAISE_ERROR(OUT_OF_MEMORY_ATOM); } + } else if (UNLIKELY(!term_is_invalid_term(monitor_term))) { + // {monitor, BadTerm} where BadTerm is neither a list nor 'true': raise badarg like OTP + // instead of spawning an unmonitored process. + monitor_destroy(new_link); + monitor_destroy(self_link); + context_destroy(new_ctx); + RAISE_ERROR(BADARG_ATOM); + } + // Nothing can fail from here on. Publish in order so the entries keep their relative position + // in the monitor lists. context_add_monitor on new_ctx is safe because it is not started yet. + if (new_link != NULL) { + context_add_monitor(new_ctx, new_link); + context_add_monitor(ctx, self_link); + } + if (is_spawn_monitor) { + context_add_monitor(new_ctx, new_monitor); + context_add_monitor(ctx, self_monitor); + if (is_alias) { + context_add_monitor(ctx, alias_monitor); + } + } + + if (is_spawn_monitor) { scheduler_init_ready(new_ctx); - term ref = term_from_ref_ticks(ref_ticks, &ctx->heap); + term ref = term_from_ref_data(&ref_data, &ctx->heap); - term pid_ref_tuple = term_alloc_tuple(2, &ctx->heap); - term_put_tuple_element(pid_ref_tuple, 0, new_pid); - term_put_tuple_element(pid_ref_tuple, 1, ref); + term process_ref_tuple = term_alloc_tuple(2, &ctx->heap); + term_put_tuple_element(process_ref_tuple, 0, new_pid); + term_put_tuple_element(process_ref_tuple, 1, ref); - return pid_ref_tuple; + return process_ref_tuple; } else if (UNLIKELY(valid_request)) { // Handling of spawn_request // spawn_request requires that the reply is enqueued before @@ -1763,9 +1872,16 @@ static term nif_erlang_send_2(Context *ctx, int argc, term argv[]) globalcontext_send_message_nolock(glb, local_process_id, argv[1]); synclist_unlock(&glb->processes_table); - } else { + } else if (UNLIKELY(!term_is_reference(target))) { RAISE_ERROR(BADARG_ATOM); + } else if (term_is_process_reference(target)) { + int32_t process_id = term_process_ref_to_process_id(target); + globalcontext_send_message_to_alias(glb, process_id, target, argv[1]); } + // else: a non-local-process reference is silently dropped, as OTP drops a send to a + // non-active-alias reference. + // TODO: route sends to external references over distribution. Outbound distributed aliases are + // unsupported, so the message is currently lost. return argv[1]; } @@ -1842,7 +1958,7 @@ term nif_erlang_make_ref_0(Context *ctx, int argc, term argv[]) UNUSED(argv); // a ref is 64 bits, hence 8 bytes - if (UNLIKELY(memory_ensure_free_opt(ctx, REF_SIZE, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { + if (UNLIKELY(memory_ensure_free_opt(ctx, TERM_BOXED_REFERENCE_SHORT_SIZE, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { RAISE_ERROR(OUT_OF_MEMORY_ATOM); } @@ -4920,17 +5036,22 @@ static term nif_erlang_memory(Context *ctx, int argc, term argv[]) static term nif_erlang_monitor(Context *ctx, int argc, term argv[]) { - UNUSED(argc); term object_type = argv[0]; term target_proc = argv[1]; + term options = argc == 3 ? argv[2] : term_nil(); term target_pid; size_t target_proc_size = 0; + bool is_alias; + context_monitor_alias_type_t alias_type; if (object_type != PROCESS_ATOM && object_type != PORT_ATOM) { RAISE_ERROR(BADARG_ATOM); } + if (UNLIKELY(term_is_invalid_term(parse_monitor_opts(ctx, options, &is_alias, &alias_type)))) { + return term_invalid_term(); + } if (term_is_atom(target_proc)) { target_pid = globalcontext_get_registered_process(ctx->global, term_to_atom_index(target_proc)); target_proc_size = TUPLE_SIZE(2); @@ -4949,26 +5070,39 @@ static term nif_erlang_monitor(Context *ctx, int argc, term argv[]) target = NULL; } else { local_process_id = term_to_local_process_id(target_pid); - // Monitoring self is possible but no monitor is actually created + // Monitoring self installs nothing, like OTP: no monitor and (with {alias, _}) no alias, so + // sends to the returned ref are dropped and unalias/1, demonitor(Ref, [info]) return false. if (UNLIKELY(local_process_id == ctx->process_id)) { - if (UNLIKELY(memory_ensure_free_opt(ctx, REF_SIZE, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { + if (UNLIKELY(memory_ensure_free_opt(ctx, TERM_BOXED_REFERENCE_SHORT_SIZE, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { RAISE_ERROR(OUT_OF_MEMORY_ATOM); } uint64_t ref_ticks = globalcontext_get_ref_ticks(ctx->global); - term ref = term_from_ref_ticks(ref_ticks, &ctx->heap); - return ref; + return term_from_ref_ticks(ref_ticks, &ctx->heap); } target = globalcontext_get_process_lock(ctx->global, local_process_id); } if (IS_NULL_PTR(target)) { - int res_size = REF_SIZE + TUPLE_SIZE(5) + target_proc_size; + int res_size = TERM_BOXED_REFERENCE_PROCESS_SIZE + TUPLE_SIZE(5) + target_proc_size; + RefData ref_data = { .ref_ticks = globalcontext_get_ref_ticks(ctx->global), .process_id = is_alias ? ctx->process_id : INVALID_PROCESS_ID }; + // The monitor is immediately removed by the noproc DOWN, so only an explicit_unalias alias + // stays active (demonitor / reply_demonitor would be deactivated right away, as at a DOWN). + struct Monitor *alias_monitor = NULL; + if (is_alias && alias_type == ContextMonitorAliasExplicitUnalias) { + alias_monitor = monitor_alias_new(ref_data.ref_ticks, alias_type); + if (IS_NULL_PTR(alias_monitor)) { + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } + } if (UNLIKELY(memory_ensure_free_opt(ctx, res_size, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { + monitor_destroy(alias_monitor); RAISE_ERROR(OUT_OF_MEMORY_ATOM); } - uint64_t ref_ticks = globalcontext_get_ref_ticks(ctx->global); - term ref = term_from_ref_ticks(ref_ticks, &ctx->heap); + if (alias_monitor != NULL) { + context_add_monitor(ctx, alias_monitor); + } + term ref = term_from_ref_data(&ref_data, &ctx->heap); term down_message_tuple = term_alloc_tuple(5, &ctx->heap); term_put_tuple_element(down_message_tuple, 0, DOWN_ATOM); term_put_tuple_element(down_message_tuple, 1, ref); @@ -4987,36 +5121,63 @@ static term nif_erlang_monitor(Context *ctx, int argc, term argv[]) } if ((object_type == PROCESS_ATOM && target->native_handler != NULL) || (object_type == PORT_ATOM && target->native_handler == NULL)) { + globalcontext_get_process_unlock(ctx->global, target); RAISE_ERROR(BADARG_ATOM); } - uint64_t ref_ticks = globalcontext_get_ref_ticks(ctx->global); - term monitoring_pid = term_from_local_process_id(ctx->process_id); + + RefData ref_data; + struct Monitor *alias_monitor = NULL; + if (is_alias) { + ref_data = (RefData){ .ref_ticks = globalcontext_get_ref_ticks(ctx->global), .process_id = ctx->process_id }; + alias_monitor = monitor_alias_new(ref_data.ref_ticks, alias_type); + if (IS_NULL_PTR(alias_monitor)) { + globalcontext_get_process_unlock(ctx->global, target); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } + } else { + ref_data = (RefData){ .ref_ticks = globalcontext_get_ref_ticks(ctx->global), .process_id = INVALID_PROCESS_ID }; + } struct Monitor *self_monitor; if (term_is_atom(target_proc)) { - self_monitor = monitor_registeredname_monitor_new(local_process_id, target_proc, ref_ticks); + self_monitor = monitor_registeredname_monitor_new(local_process_id, target_proc, ref_data.ref_ticks); } else { - self_monitor = monitor_new(target_pid, ref_ticks, true); + self_monitor = monitor_new(target_pid, ref_data.ref_ticks, CONTEXT_MONITOR_MONITORING_LOCAL); } if (IS_NULL_PTR(self_monitor)) { globalcontext_get_process_unlock(ctx->global, target); + monitor_destroy(alias_monitor); RAISE_ERROR(OUT_OF_MEMORY_ATOM); } - struct Monitor *other_monitor = monitor_new(monitoring_pid, ref_ticks, false); + term monitoring_pid = term_from_local_process_id(ctx->process_id); + struct Monitor *other_monitor = monitor_new(monitoring_pid, ref_data.ref_ticks, + is_alias ? CONTEXT_MONITOR_MONITORED_LOCAL_ALIAS : CONTEXT_MONITOR_MONITORED_LOCAL); if (IS_NULL_PTR(other_monitor)) { - free(self_monitor); + monitor_destroy(alias_monitor); + monitor_destroy(self_monitor); globalcontext_get_process_unlock(ctx->global, target); RAISE_ERROR(OUT_OF_MEMORY_ATOM); } + + // Reserve the result reference *before* publishing any monitor/alias state, so an OOM here + // cannot leave the target with a queued MonitorSignal while the caller gets an exception and + // never receives the reference. GC here is safe: the monitor structs hold only immediates. + if (UNLIKELY(memory_ensure_free_opt(ctx, TERM_BOXED_REFERENCE_PROCESS_SIZE, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { + monitor_destroy(alias_monitor); + monitor_destroy(self_monitor); + monitor_destroy(other_monitor); + globalcontext_get_process_unlock(ctx->global, target); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } + mailbox_send_monitor_signal(target, MonitorSignal, other_monitor); globalcontext_get_process_unlock(ctx->global, target); context_add_monitor(ctx, self_monitor); - - if (UNLIKELY(memory_ensure_free_opt(ctx, REF_SIZE, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { - RAISE_ERROR(OUT_OF_MEMORY_ATOM); + if (is_alias) { + context_add_monitor(ctx, alias_monitor); } - return term_from_ref_ticks(ref_ticks, &ctx->heap); + return term_from_ref_data(&ref_data, &ctx->heap); } static term nif_erlang_demonitor(Context *ctx, int argc, term argv[]) @@ -5092,7 +5253,7 @@ static term nif_erlang_link(Context *ctx, int argc, term argv[]) if (UNLIKELY(!context_add_monitor(ctx, self_link))) { globalcontext_get_process_unlock(ctx->global, target); - free(other_link); + monitor_destroy(other_link); return TRUE_ATOM; } @@ -7448,6 +7609,68 @@ static term nif_erlang_crc32_combine_3(Context *ctx, int argc, term argv[]) return make_maybe_boxed_int64(ctx, crc); } +static term nif_erlang_alias(Context *ctx, int argc, term argv[]) +{ + // The reply option reuses the reply_demonitor machinery: with no monitor to remove, the alias + // is deactivated when the first message via it is delivered. + context_monitor_alias_type_t alias_type = ContextMonitorAliasExplicitUnalias; + if (argc == 1) { + term opts = argv[0]; + VALIDATE_VALUE(opts, term_is_list); + while (term_is_nonempty_list(opts)) { + term option = term_get_list_head(opts); + if (option == EXPLICIT_UNALIAS_ATOM) { + alias_type = ContextMonitorAliasExplicitUnalias; + } else if (option == REPLY_ATOM) { + alias_type = ContextMonitorAliasReplyDemonitor; + } else if (UNLIKELY(option == PRIORITY_ATOM)) { + RAISE_ERROR(UNSUPPORTED_ATOM); + } else { + RAISE_ERROR(BADARG_ATOM); + } + opts = term_get_list_tail(opts); + } + if (UNLIKELY(!term_is_nil(opts))) { + RAISE_ERROR(BADARG_ATOM); + } + } + + if (UNLIKELY(memory_ensure_free_opt(ctx, TERM_BOXED_REFERENCE_PROCESS_SIZE, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } + + RefData ref_data = { .ref_ticks = globalcontext_get_ref_ticks(ctx->global), .process_id = ctx->process_id }; + term process_ref = term_from_ref_data(&ref_data, &ctx->heap); + struct Monitor *monitor = monitor_alias_new(ref_data.ref_ticks, alias_type); + if (IS_NULL_PTR(monitor)) { + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } + context_add_monitor(ctx, monitor); + return process_ref; +} + +static term nif_erlang_unalias(Context *ctx, int argc, term argv[]) +{ + UNUSED(argc); + + term process_ref = argv[0]; + VALIDATE_VALUE(process_ref, term_is_reference); + if (UNLIKELY(!term_is_local_reference(process_ref))) { + // An external reference cannot be an alias of the calling process: return false like + // OTP, instead of raising badarg. + return FALSE_ATOM; + } + uint64_t ref_ticks = term_to_ref_ticks(process_ref); + + struct MonitorAlias *alias = context_find_alias(ctx, ref_ticks); + if (IS_NULL_PTR(alias)) { + return FALSE_ATOM; + } else { + context_unalias(ctx, alias); + return TRUE_ATOM; + } +} + #ifdef WITH_ZLIB static term nif_zlib_compress_1(Context *ctx, int argc, term argv[]) { diff --git a/src/libAtomVM/nifs.gperf b/src/libAtomVM/nifs.gperf index c7ce64cf7e..b02cb5ac6a 100644 --- a/src/libAtomVM/nifs.gperf +++ b/src/libAtomVM/nifs.gperf @@ -101,6 +101,7 @@ erlang:make_ref/0, &make_ref_nif erlang:make_tuple/2, &make_tuple_nif erlang:memory/1, &memory_nif erlang:monitor/2, &monitor_nif +erlang:monitor/3, &monitor_nif erlang:demonitor/1, &demonitor_nif erlang:demonitor/2, &demonitor_nif erlang:is_process_alive/1, &is_process_alive_nif @@ -158,6 +159,9 @@ erlang:loaded/0, &erlang_loaded_nif erlang:module_loaded/1,&module_loaded_nif erlang:nif_error/1,&nif_error_nif erlang:list_to_bitstring/1, &list_to_bitstring_nif +erlang:alias/0, &erlang_alias_nif +erlang:alias/1, &erlang_alias_nif +erlang:unalias/1, &erlang_unalias_nif erts_debug:flat_size/1, &flat_size_nif erts_internal:cmp_term/2, &erts_internal_cmp_term_nif file:get_cwd/0, IF_HAVE_GETCWD_PATHMAX(&file_get_cwd_nif) diff --git a/src/libAtomVM/opcodesswitch.h b/src/libAtomVM/opcodesswitch.h index 889ff319a8..2e867a17b6 100644 --- a/src/libAtomVM/opcodesswitch.h +++ b/src/libAtomVM/opcodesswitch.h @@ -788,7 +788,7 @@ static void destroy_extended_registers(Context *ctx, unsigned int live) #define PROCESS_SIGNAL_MESSAGES() \ { \ - MailboxMessage *signal_message = mailbox_process_outer_list(&ctx->mailbox); \ + MailboxMessage *signal_message = mailbox_process_outer_list_with_aliases(ctx); \ bool handle_error = false; \ bool reprocess_outer = false; \ while (signal_message) { \ @@ -905,6 +905,7 @@ static void destroy_extended_registers(Context *ctx, unsigned int live) RESUME(); \ break; \ } \ + case AliasMessageSignal: \ case NormalMessage: { \ UNREACHABLE(); \ } \ @@ -914,7 +915,7 @@ static void destroy_extended_registers(Context *ctx, unsigned int live) signal_message = next; \ if (UNLIKELY(reprocess_outer && signal_message == NULL)) { \ reprocess_outer = false; \ - signal_message = mailbox_process_outer_list(&ctx->mailbox); \ + signal_message = mailbox_process_outer_list_with_aliases(ctx); \ } \ } \ if (context_get_flags(ctx, Killed)) { \ @@ -2374,12 +2375,19 @@ HOT_FUNC int scheduler_entry_point(GlobalContext *glb) int local_process_id; if (term_is_local_pid_or_port(recipient_term)) { local_process_id = term_to_local_process_id(recipient_term); - } else { + TRACE("send/0 target_pid=%i\n", local_process_id); + TRACE_SEND(ctx, x_regs[0], x_regs[1]); + globalcontext_send_message(ctx->global, local_process_id, x_regs[1]); + } else if (UNLIKELY(!term_is_reference(recipient_term))) { RAISE_ERROR(BADARG_ATOM); + } else if (term_is_process_reference(recipient_term)) { + int32_t target_process_id = term_process_ref_to_process_id(recipient_term); + TRACE("send/0 target_pid=%i\n", target_process_id); + TRACE_SEND(ctx, x_regs[0], x_regs[1]); + globalcontext_send_message_to_alias(ctx->global, target_process_id, recipient_term, x_regs[1]); } - TRACE("send/0 target_pid=%i\n", local_process_id); - TRACE_SEND(ctx, x_regs[0], x_regs[1]); - globalcontext_send_message(ctx->global, local_process_id, x_regs[1]); + // Silently dropped, as OTP does for a send to a non-active-alias reference. + // Outbound distributed aliases are unsupported. x_regs[0] = x_regs[1]; } break; diff --git a/src/libAtomVM/otp_socket.c b/src/libAtomVM/otp_socket.c index 4195a96241..a561e0392d 100644 --- a/src/libAtomVM/otp_socket.c +++ b/src/libAtomVM/otp_socket.c @@ -246,7 +246,7 @@ static const AtomStringIntPair otp_socket_setopt_level_table[] = { static ErlNifResourceType *socket_resource_type; -#define SOCKET_MAKE_SELECT_NOTIFICATION_SIZE (TUPLE_SIZE(4) + REF_SIZE + TUPLE_SIZE(2) + REF_SIZE + TERM_BOXED_REFERENCE_RESOURCE_SIZE) +#define SOCKET_MAKE_SELECT_NOTIFICATION_SIZE (TUPLE_SIZE(4) + TERM_BOXED_REFERENCE_SHORT_SIZE + TUPLE_SIZE(2) + TERM_BOXED_REFERENCE_SHORT_SIZE + TERM_BOXED_REFERENCE_RESOURCE_SIZE) static term socket_make_select_notification(struct SocketResource *rsrc_obj, Heap *heap); // @@ -644,7 +644,7 @@ static term nif_socket_open(Context *ctx, int argc, term argv[]) term obj = term_from_resource(rsrc_obj, &ctx->heap); enif_release_resource(rsrc_obj); // decrement refcount after enif_alloc_resource - size_t requested_size = TUPLE_SIZE(2) + TUPLE_SIZE(2) + REF_SIZE; + size_t requested_size = TUPLE_SIZE(2) + TUPLE_SIZE(2) + TERM_BOXED_REFERENCE_SHORT_SIZE; if (UNLIKELY(memory_ensure_free_with_roots(ctx, requested_size, 1, &obj, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); RAISE_ERROR(OUT_OF_MEMORY_ATOM); @@ -695,7 +695,7 @@ bool term_is_otp_socket(term socket_term) static int send_closed_notification(Context *ctx, term socket_term, int32_t selecting_process_id, struct SocketResource *rsrc_obj) { // send a {'$socket', Socket, abort, {Ref | undefined, closed}} message to the pid - if (UNLIKELY(memory_ensure_free_with_roots(ctx, TUPLE_SIZE(4) + TUPLE_SIZE(2) + REF_SIZE, 1, &socket_term, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { + if (UNLIKELY(memory_ensure_free_with_roots(ctx, TUPLE_SIZE(4) + TUPLE_SIZE(2) + TERM_BOXED_REFERENCE_SHORT_SIZE, 1, &socket_term, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); return -1; } @@ -1032,6 +1032,10 @@ static term nif_socket_select_read(Context *ctx, int argc, term argv[]) term select_ref_term = argv[1]; if (select_ref_term != UNDEFINED_ATOM) { VALIDATE_VALUE(select_ref_term, term_is_local_reference); + // Unlike the BEAM, AtomVM does not support a process alias as a select handle. + if (UNLIKELY(term_is_process_reference(select_ref_term))) { + RAISE_ERROR(BADARG_ATOM); + } } struct SocketResource *rsrc_obj; if (UNLIKELY(!term_to_otp_socket(argv[0], &rsrc_obj, ctx))) { @@ -1814,7 +1818,7 @@ static term nif_socket_listen(Context *ctx, int argc, term argv[]) #if OTP_SOCKET_LWIP static term make_accepted_socket_term(Context *ctx, struct SocketResource *conn_rsrc_obj) { - if (UNLIKELY(memory_ensure_free(ctx, TERM_BOXED_REFERENCE_RESOURCE_SIZE + TUPLE_SIZE(2) + REF_SIZE) != MEMORY_GC_OK)) { + if (UNLIKELY(memory_ensure_free(ctx, TERM_BOXED_REFERENCE_RESOURCE_SIZE + TUPLE_SIZE(2) + TERM_BOXED_REFERENCE_SHORT_SIZE) != MEMORY_GC_OK)) { RAISE_ERROR(OUT_OF_MEMORY_ATOM); } term obj = term_from_resource(conn_rsrc_obj, &ctx->heap); @@ -1909,7 +1913,7 @@ static term nif_socket_accept(Context *ctx, int argc, term argv[]) term new_resource = term_from_resource(conn_rsrc_obj, &ctx->heap); enif_release_resource(conn_rsrc_obj); // decrement refcount after enif_alloc_resource - size_t requested_size = TUPLE_SIZE(2) + TUPLE_SIZE(2) + REF_SIZE; + size_t requested_size = TUPLE_SIZE(2) + TUPLE_SIZE(2) + TERM_BOXED_REFERENCE_SHORT_SIZE; if (UNLIKELY(memory_ensure_free_with_roots(ctx, requested_size, 1, &new_resource, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); RAISE_ERROR(OUT_OF_MEMORY_ATOM); @@ -1940,7 +1944,7 @@ static term nif_socket_accept(Context *ctx, int argc, term argv[]) SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); RAISE_ERROR(OUT_OF_MEMORY_ATOM); } - size_t requested_size = TERM_BOXED_REFERENCE_RESOURCE_SIZE + TUPLE_SIZE(2) + TUPLE_SIZE(2) + REF_SIZE; + size_t requested_size = TERM_BOXED_REFERENCE_RESOURCE_SIZE + TUPLE_SIZE(2) + TUPLE_SIZE(2) + TERM_BOXED_REFERENCE_SHORT_SIZE; if (UNLIKELY(memory_ensure_free_with_roots(ctx, requested_size, 1, argv, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); LWIP_END(); @@ -2319,7 +2323,7 @@ static term nif_socket_recv_lwip(Context *ctx, term resource_term, struct Socket } size_t ensure_packet_avail = term_binary_heap_size(buffer_size); - size_t requested_size = REF_SIZE + 2 * TUPLE_SIZE(2) + ensure_packet_avail + (is_recvfrom ? (TUPLE_SIZE(2) + INET_ADDR4_TUPLE_SIZE + TERM_MAP_SIZE(2)) : 0); + size_t requested_size = TERM_BOXED_REFERENCE_SHORT_SIZE + 2 * TUPLE_SIZE(2) + ensure_packet_avail + (is_recvfrom ? (TUPLE_SIZE(2) + INET_ADDR4_TUPLE_SIZE + TERM_MAP_SIZE(2)) : 0); // Because resource is locked, we must ensure it's not garbage collected if (UNLIKELY(memory_ensure_free_with_roots(ctx, requested_size, 1, &resource_term, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.\n", __FILE__, __LINE__); diff --git a/src/libAtomVM/resources.c b/src/libAtomVM/resources.c index 8ebabb5166..87ca0b5eaa 100644 --- a/src/libAtomVM/resources.c +++ b/src/libAtomVM/resources.c @@ -238,6 +238,10 @@ int enif_select(ErlNifEnv *env, ErlNifEvent event, enum ErlNifSelectFlags mode, if (UNLIKELY(mode & (ERL_NIF_SELECT_READ | ERL_NIF_SELECT_WRITE) && !term_is_local_reference(ref) && ref != UNDEFINED_ATOM)) { return ERL_NIF_SELECT_BADARG; } + // Unlike the BEAM, AtomVM does not support a process alias as an enif_select ref. + if (UNLIKELY(term_is_process_reference(ref))) { + return ERL_NIF_SELECT_BADARG; + } return enif_select_common(env, event, mode, obj, pid, ref, NULL); } diff --git a/src/libAtomVM/resources.h b/src/libAtomVM/resources.h index 5c95a5c4ed..58c97cf9b6 100644 --- a/src/libAtomVM/resources.h +++ b/src/libAtomVM/resources.h @@ -164,7 +164,7 @@ void select_event_count_and_destroy_closed(struct ListHead *select_events, size_ */ void destroy_resource_monitors(struct RefcBinary *resource, GlobalContext *global); -#define SELECT_EVENT_NOTIFICATION_SIZE (TUPLE_SIZE(4) + REF_SIZE + TERM_BOXED_REFERENCE_RESOURCE_SIZE) +#define SELECT_EVENT_NOTIFICATION_SIZE (TUPLE_SIZE(4) + TERM_BOXED_REFERENCE_SHORT_SIZE + TERM_BOXED_REFERENCE_RESOURCE_SIZE) /** * @brief Build a select event notification. diff --git a/src/libAtomVM/scheduler.c b/src/libAtomVM/scheduler.c index a856d60012..f7b3e800ec 100644 --- a/src/libAtomVM/scheduler.c +++ b/src/libAtomVM/scheduler.c @@ -137,6 +137,7 @@ static void scheduler_process_native_signal_messages(Context *ctx) case UnlinkRemoteIDSignal: // ports can't be part of distributed links case UnlinkRemoteIDAckSignal: // id. case CodeServerResumeSignal: // ports do not wait for code server + case AliasMessageSignal: // ports cannot own aliases break; case NormalMessage: { UNREACHABLE(); @@ -289,6 +290,8 @@ Context *scheduler_run(GlobalContext *global) if (UNLIKELY(result->flags & Killed)) { SMP_SPINLOCK_LOCK(&global->processes_spinlock); list_remove(&result->processes_list_head); + // Reset the queue item so the dequeue in context_destroy is a no-op. + list_init(&result->processes_list_head); SMP_SPINLOCK_UNLOCK(&global->processes_spinlock); context_destroy(result); } else { @@ -429,6 +432,9 @@ void scheduler_terminate(Context *ctx) SMP_SPINLOCK_LOCK(&ctx->global->processes_spinlock); context_update_flags(ctx, ~NoFlags, Killed); list_remove(&ctx->processes_list_head); + // Reset the queue item so the dequeue in context_destroy is a no-op. For a leader process, + // context_destroy is called after the scheduler loop returns. + list_init(&ctx->processes_list_head); SMP_SPINLOCK_UNLOCK(&ctx->global->processes_spinlock); if (!ctx->leader) { context_destroy(ctx); diff --git a/src/libAtomVM/term.c b/src/libAtomVM/term.c index f8b1a0cce3..03a1cadf47 100644 --- a/src/libAtomVM/term.c +++ b/src/libAtomVM/term.c @@ -405,6 +405,12 @@ int term_funprint(PrinterFun *fun, term t, const GlobalContext *global) uint64_t resource_ptr = (uintptr_t) refc_binary->data; return fun->print(fun, "#Ref<0.%" PRIu32 ".%" PRIu32 ".%" PRIu32 ".%" PRIu32 ">", (uint32_t) (resource_type_ptr >> 32), (uint32_t) resource_type_ptr, (uint32_t) (resource_ptr >> 32), (uint32_t) resource_ptr); + } else if (term_is_process_reference(t)) { + int32_t process_id = term_process_ref_to_process_id(t); + uint64_t ref_ticks = term_to_ref_ticks(t); + + // Update also REF_AS_CSTRING_LEN when changing this format string + return fun->print(fun, "#Ref<%" PRId32 ".%" PRIu32 ".%" PRIu32 ">", process_id, (uint32_t) (ref_ticks >> 32), (uint32_t) ref_ticks); } else if (term_is_local_reference(t)) { // Update also REF_AS_CSTRING_LEN when changing this format string uint64_t ref_ticks = term_to_ref_ticks(t); @@ -676,11 +682,15 @@ TermCompareResult term_compare(term t, term other, TermCompareOpts opts, GlobalC uint32_t len, other_len; if (term_is_resource_reference(t)) { len = 4; + } else if (term_is_process_reference(t)) { + len = 3; } else { len = 2; } if (term_is_resource_reference(other)) { other_len = 4; + } else if (term_is_process_reference(other)) { + other_len = 3; } else { other_len = 2; } @@ -694,6 +704,15 @@ TermCompareResult term_compare(term t, term other, TermCompareOpts opts, GlobalC int64_t other_ticks = term_to_ref_ticks(other); other_data[0] = other_ticks >> 32; other_data[1] = (uint32_t) other_ticks; + } else if (len == 3) { + int64_t t_ticks = term_to_ref_ticks(t); + data[0] = t_ticks >> 32; + data[1] = (uint32_t) t_ticks; + data[2] = term_process_ref_to_process_id(t); + int64_t other_ticks = term_to_ref_ticks(other); + other_data[0] = other_ticks >> 32; + other_data[1] = (uint32_t) other_ticks; + other_data[2] = term_process_ref_to_process_id(other); } else { // len == 4 struct RefcBinary *refc = term_resource_refc_binary_ptr(t); @@ -736,6 +755,8 @@ TermCompareResult term_compare(term t, term other, TermCompareOpts opts, GlobalC len = term_get_external_reference_len(t); } else if (term_is_resource_reference(t)) { len = 4; + } else if (term_is_process_reference(t)) { + len = 3; } else { len = 2; } @@ -743,6 +764,8 @@ TermCompareResult term_compare(term t, term other, TermCompareOpts opts, GlobalC other_len = term_get_external_reference_len(other); } else if (term_is_resource_reference(other)) { other_len = 4; + } else if (term_is_process_reference(other)) { + other_len = 3; } else { other_len = 2; } @@ -767,6 +790,26 @@ TermCompareResult term_compare(term t, term other, TermCompareOpts opts, GlobalC local_data[1] = (uint32_t) ref_ticks; other_data = local_data; } + } else if (len == 3) { + uint32_t local_data[3]; + if (term_is_external(t)) { + data = term_get_external_reference_words(t); + } else { + int64_t ref_ticks = term_to_ref_ticks(t); + local_data[0] = ref_ticks >> 32; + local_data[1] = (uint32_t) ref_ticks; + local_data[2] = term_process_ref_to_process_id(t); + data = local_data; + } + if (term_is_external(other)) { + other_data = term_get_external_reference_words(other); + } else { + int64_t ref_ticks = term_to_ref_ticks(other); + local_data[0] = ref_ticks >> 32; + local_data[1] = (uint32_t) ref_ticks; + local_data[2] = term_process_ref_to_process_id(other); + other_data = local_data; + } } else { // len == 4 (one is a resource) uint32_t local_data[4]; diff --git a/src/libAtomVM/term.h b/src/libAtomVM/term.h index df1d411354..1175d7f9dc 100644 --- a/src/libAtomVM/term.h +++ b/src/libAtomVM/term.h @@ -151,7 +151,20 @@ extern "C" { #define BOXED_INT64_SIZE (BOXED_TERMS_REQUIRED_FOR_INT64 + 1) #define BOXED_FUN_SIZE 3 #define FLOAT_SIZE (sizeof(float_term_t) / sizeof(term) + 1) -#define REF_SIZE ((int) ((sizeof(uint64_t) / sizeof(term)) + 1)) +// Reference types are distinguished by their size. +// If you change a reference size, make sure it doesn't +// conflict with other reference sizes on all architectures. +#define TERM_BOXED_REFERENCE_SHORT_SIZE ((int) ((sizeof(uint64_t) / sizeof(term)) + 1)) +#define REF_SIZE _Pragma("GCC warning \"REF_SIZE is deprecated, use TERM_BOXED_REFERENCE_SHORT_SIZE instead\"") TERM_BOXED_REFERENCE_SHORT_SIZE +#if TERM_BYTES == 8 +#define TERM_BOXED_REFERENCE_PROCESS_SIZE (TERM_BOXED_REFERENCE_SHORT_SIZE + 1) +#else +// Enough size would be 3 + 1, but that is the resource reference size on 32-bit. Pad the +// process reference instead of the resource reference: process references only exist when +// aliases are used, while resource references are everywhere on the embedded targets. +#define TERM_BOXED_REFERENCE_PROCESS_SIZE (TERM_BOXED_REFERENCE_SHORT_SIZE + 2) +#endif +#define TERM_BOXED_REFERENCE_PROCESS_HEADER (((TERM_BOXED_REFERENCE_PROCESS_SIZE - 1) << 6) | TERM_BOXED_REF) #if TERM_BYTES == 8 #define EXTERNAL_PID_SIZE 3 #elif TERM_BYTES == 4 @@ -167,10 +180,24 @@ extern "C" { #else #error #endif +#define EXTERNAL_REF_MAX_WORDS 5 +#define TERM_BOXED_REFERENCE_MAX_SIZE EXTERNAL_REF_SIZE(EXTERNAL_REF_MAX_WORDS) +_Static_assert(TERM_BOXED_REFERENCE_SHORT_SIZE != TERM_BOXED_REFERENCE_PROCESS_SIZE, "Short ref size must differ from process ref size"); +_Static_assert(TERM_BOXED_REFERENCE_SHORT_SIZE != TERM_BOXED_REFERENCE_RESOURCE_SIZE, "Short ref size must differ from reference resource size"); +_Static_assert(TERM_BOXED_REFERENCE_PROCESS_SIZE != TERM_BOXED_REFERENCE_RESOURCE_SIZE, "Process ref size must differ from reference resource size"); +_Static_assert(TERM_BOXED_REFERENCE_PROCESS_SIZE <= TERM_BOXED_REFERENCE_MAX_SIZE, "Max ref size can't be smaller than all other ref sizes"); +_Static_assert(TERM_BOXED_REFERENCE_RESOURCE_SIZE <= TERM_BOXED_REFERENCE_MAX_SIZE, "Max ref size can't be smaller than all other ref sizes"); #define TUPLE_SIZE(elems) ((int) (elems + 1)) #define CONS_SIZE 2 #define REFC_BINARY_CONS_OFFSET 4 #define REFERENCE_RESOURCE_CONS_OFFSET 2 + +#if TERM_BYTES == 4 +#define REFERENCE_PROCESS_PID_OFFSET 3 +#elif TERM_BYTES == 8 +#define REFERENCE_PROCESS_PID_OFFSET 2 +#endif + #define LIST_SIZE(num_elements, element_size) ((num_elements) * ((element_size) + CONS_SIZE)) #define TERM_STRING_SIZE(length) (2 * (length)) #define TERM_MAP_SIZE(num_elements) (3 + 2 * (num_elements)) @@ -179,6 +206,8 @@ extern "C" { #define LIST_HEAD_INDEX 1 #define LIST_TAIL_INDEX 0 +#define INVALID_PROCESS_ID 0 + #define TERM_BINARY_SIZE_IS_HEAP(size) ((size) < REFC_BINARY_MIN) #if TERM_BYTES == 4 @@ -210,6 +239,9 @@ extern "C" { // Local ref is at most 30 bytes: // 2^32-1 = 4294967295 (10 chars) // "#Ref<0." "." ">\0" (10 chars) +// Process ref is at most 39 bytes: +// 2^32-1 = 4294967295 (10 chars) +// "#Ref<" "." "." ">\0" (9 chars) // Resource ref is at most 52 bytes: // 2^32-1 = 4294967295 (10 chars) // "#Ref<0." "." "." "." ">\0" (12 chars) @@ -243,6 +275,13 @@ extern "C" { typedef struct GlobalContext GlobalContext; #endif +typedef struct RefData RefData; +struct RefData +{ + uint64_t ref_ticks; + int32_t process_id; +}; + typedef struct PrinterFun PrinterFun; typedef int (*printer_function_t)(PrinterFun *fun, const char *fmt, ...) PRINTF_FORMAT_ARGS(2, 3); @@ -915,6 +954,25 @@ static inline bool term_is_local_reference(term t) return false; } +/** + * @brief Checks if a term is a process reference + * + * @details See \c term_make_process_reference(). + * @param t the term that will be checked. + * @return \c true if check succeeds, \c false otherwise. + */ +static inline bool term_is_process_reference(term t) +{ + if (term_is_boxed(t)) { + const term *boxed_value = term_to_const_term_ptr(t); + if (boxed_value[0] == TERM_BOXED_REFERENCE_PROCESS_HEADER) { + return true; + } + } + + return false; +} + /** * @brief Checks if a term is an external reference * @@ -2166,8 +2224,8 @@ static inline int term_bs_insert_binary(term t, int offset, term src, int n) */ static inline term term_from_ref_ticks(uint64_t ref_ticks, Heap *heap) { - term *boxed_value = memory_heap_alloc(heap, REF_SIZE); - boxed_value[0] = ((REF_SIZE - 1) << 6) | TERM_BOXED_REF; + term *boxed_value = memory_heap_alloc(heap, TERM_BOXED_REFERENCE_SHORT_SIZE); + boxed_value[0] = ((TERM_BOXED_REFERENCE_SHORT_SIZE - 1) << 6) | TERM_BOXED_REF; #if TERM_BYTES == 8 boxed_value[1] = (term) ref_ticks; @@ -2200,6 +2258,55 @@ static inline uint64_t term_to_ref_ticks(term rt) #endif } +/** + * @brief Creates a process reference + * @details Process reference contains ref_ticks and process_id of a process. + * They are used by process aliases and monitors. The ticks occupy the same + * words as in a short reference, so term_to_ref_ticks works on both shapes. + * + * @param process_id process_id of a process that the reference will identify. + * @param ref_ticks a unique uint64 value that will be used to create ref term. + * @param heap the heap to allocate memory in + * @return a ref term created using given ref ticks. + */ +static inline term term_make_process_reference(int32_t process_id, uint64_t ref_ticks, Heap *heap) +{ + term *boxed_value = memory_heap_alloc(heap, TERM_BOXED_REFERENCE_PROCESS_SIZE); + boxed_value[0] = TERM_BOXED_REFERENCE_PROCESS_HEADER; + +#if TERM_BYTES == 4 + boxed_value[1] = (ref_ticks >> 32); + boxed_value[2] = (ref_ticks & 0xFFFFFFFF); + +#elif TERM_BYTES == 8 + boxed_value[1] = (term) ref_ticks; + +#else +#error "terms must be either 32 or 64 bit wide" +#endif + boxed_value[REFERENCE_PROCESS_PID_OFFSET] = process_id; +#if TERM_BYTES == 4 + // Initialize the trailing padding word so GC copies a defined value instead of uninitialized + // memory. + boxed_value[REFERENCE_PROCESS_PID_OFFSET + 1] = term_nil(); +#endif + + return ((term) boxed_value) | TERM_PRIMARY_BOXED; +} + +/** + * @brief Get the process id out of a process reference + * + * @param rt the process reference term + * @return the process id of the process the reference identifies + */ +static inline int32_t term_process_ref_to_process_id(term rt) +{ + TERM_DEBUG_ASSERT(term_is_process_reference(rt)); + const term *boxed_value = term_to_const_term_ptr(rt); + return (int32_t) boxed_value[REFERENCE_PROCESS_PID_OFFSET]; +} + /** * @brief Make an external pid term from node, process_id, serial and creation * @@ -2336,7 +2443,7 @@ static inline uint64_t term_get_external_port_number(term t) * @param heap the heap to allocate memory in * @return an external heap term created using given parameters. */ -static inline term term_make_external_reference(term node, uint16_t len, uint32_t *data, uint32_t creation, Heap *heap) +static inline term term_make_external_reference(term node, uint16_t len, const uint32_t *data, uint32_t creation, Heap *heap) { TERM_DEBUG_ASSERT(term_is_atom(node)); @@ -2981,6 +3088,24 @@ static inline term term_from_resource(void *resource, Heap *heap) return ret; } +/** + * @brief Create a reference term from a RefData + * @details Builds a short reference when process_id is INVALID_PROCESS_ID, + * and a process reference carrying the owner process id otherwise. + * + * @param ref_data ref ticks and owner process id of the reference + * @param heap the heap to allocate memory in + * @return a reference term created from the given ref data + */ +static inline term term_from_ref_data(const RefData *ref_data, Heap *heap) +{ + if (ref_data->process_id == INVALID_PROCESS_ID) { + return term_from_ref_ticks(ref_data->ref_ticks, heap); + } else { + return term_make_process_reference(ref_data->process_id, ref_data->ref_ticks, heap); + } +} + /** * @brief Get a resource term from a resource type and a serialization reference * number. diff --git a/src/platforms/emscripten/src/lib/websocket_nifs.c b/src/platforms/emscripten/src/lib/websocket_nifs.c index a6d0bc8c89..ab8e084652 100644 --- a/src/platforms/emscripten/src/lib/websocket_nifs.c +++ b/src/platforms/emscripten/src/lib/websocket_nifs.c @@ -95,7 +95,7 @@ static void websocket_down(ErlNifEnv *caller_env, void *obj, ErlNifPid *pid, Erl } } -#define TERM_WEBSOCKET_RESOURCE_SIZE (TERM_BOXED_RESOURCE_SIZE + REF_SIZE + TUPLE_SIZE(3)) +#define TERM_WEBSOCKET_RESOURCE_SIZE (TERM_BOXED_RESOURCE_SIZE + TERM_BOXED_REFERENCE_SHORT_SIZE + TUPLE_SIZE(3)) static term term_make_websocket_resource(struct WebsocketResource *rsrc, Heap *heap) { diff --git a/src/platforms/esp32/components/avm_builtins/adc_driver.c b/src/platforms/esp32/components/avm_builtins/adc_driver.c index 2ad34bccd0..210c8e8b98 100644 --- a/src/platforms/esp32/components/avm_builtins/adc_driver.c +++ b/src/platforms/esp32/components/avm_builtins/adc_driver.c @@ -344,7 +344,7 @@ static term nif_adc_init(Context *ctx, int argc, term argv[]) #endif // {ok, {'$adc', Unit :: resource(), ref()}} - size_t requested_size = TUPLE_SIZE(2) + TUPLE_SIZE(3) + REF_SIZE + TERM_BOXED_REFERENCE_RESOURCE_SIZE; + size_t requested_size = TUPLE_SIZE(2) + TUPLE_SIZE(3) + TERM_BOXED_REFERENCE_SHORT_SIZE + TERM_BOXED_REFERENCE_RESOURCE_SIZE; ESP_LOGD(TAG, "Requesting memory size %u for return message", requested_size); if (UNLIKELY(memory_ensure_free(ctx, requested_size) != MEMORY_GC_OK)) { enif_release_resource(unit_rsrc); @@ -492,7 +492,7 @@ static term nif_adc_acquire(Context *ctx, int argc, term argv[]) chan_rsrc->calibration = calibration; // {ok, {'$adc', resource(), ref()}} - size_t requested_size = TUPLE_SIZE(2) + TUPLE_SIZE(3) + REF_SIZE + TERM_BOXED_REFERENCE_RESOURCE_SIZE; + size_t requested_size = TUPLE_SIZE(2) + TUPLE_SIZE(3) + TERM_BOXED_REFERENCE_SHORT_SIZE + TERM_BOXED_REFERENCE_RESOURCE_SIZE; ESP_LOGD(TAG, "Requesting memory size %u for return message", requested_size); if (UNLIKELY(memory_ensure_free(ctx, requested_size) != MEMORY_GC_OK)) { enif_release_resource(chan_rsrc); diff --git a/src/platforms/esp32/components/avm_builtins/dac_driver.c b/src/platforms/esp32/components/avm_builtins/dac_driver.c index 878cd15454..c06a5777af 100644 --- a/src/platforms/esp32/components/avm_builtins/dac_driver.c +++ b/src/platforms/esp32/components/avm_builtins/dac_driver.c @@ -127,7 +127,7 @@ static term nif_oneshot_new_channel_p(Context *ctx, int argc, term argv[]) enif_release_resource(chan_rsrc); if (!err) { - if (UNLIKELY(memory_ensure_free_with_roots(ctx, TUPLE_SIZE(3) + REF_SIZE + TUPLE_SIZE(2), 1, &chan_obj, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { + if (UNLIKELY(memory_ensure_free_with_roots(ctx, TUPLE_SIZE(3) + TERM_BOXED_REFERENCE_SHORT_SIZE + TUPLE_SIZE(2), 1, &chan_obj, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { ESP_LOGE(TAG, "failed to allocate memory for result: %s:%i.", __FILE__, __LINE__); dac_oneshot_del_channel(chan_rsrc->handle); chan_rsrc->handle = NULL; diff --git a/src/platforms/esp32/components/avm_builtins/i2c_resource.c b/src/platforms/esp32/components/avm_builtins/i2c_resource.c index c8f906c85e..08bb9c21dc 100644 --- a/src/platforms/esp32/components/avm_builtins/i2c_resource.c +++ b/src/platforms/esp32/components/avm_builtins/i2c_resource.c @@ -269,7 +269,7 @@ static term nif_i2c_open(Context *ctx, int argc, term argv[]) // // {'$i2c', Resource :: resource(), Ref :: reference()} :: i2c() - size_t requested_size = TUPLE_SIZE(3) + REF_SIZE; + size_t requested_size = TUPLE_SIZE(3) + TERM_BOXED_REFERENCE_SHORT_SIZE; if (UNLIKELY(memory_ensure_free_with_roots(ctx, requested_size, 1, &obj, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { close_i2c_resource(rsrc_obj); ESP_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); diff --git a/src/platforms/esp32/components/avm_builtins/network_driver.c b/src/platforms/esp32/components/avm_builtins/network_driver.c index ad0f3e924b..dfe0fc5941 100644 --- a/src/platforms/esp32/components/avm_builtins/network_driver.c +++ b/src/platforms/esp32/components/avm_builtins/network_driver.c @@ -75,7 +75,7 @@ #define BSSID_SIZE 6 #define TAG "network_driver" -#define PORT_REPLY_SIZE (TUPLE_SIZE(2) + REF_SIZE) +#define PORT_REPLY_SIZE (TUPLE_SIZE(2) + TERM_BOXED_REFERENCE_SHORT_SIZE) static const char *const ap_atom = ATOM_STR("\x2", "ap"); static const char *const ap_channel_atom = ATOM_STR("\xA", "ap_channel"); @@ -1934,7 +1934,7 @@ static NativeHandlerResult consume_mailbox(Context *ctx) default: { ESP_LOGE(TAG, "Unrecognized command: %x", cmd); // {Ref, {error, badarg}} - size_t heap_size = TUPLE_SIZE(2) + REF_SIZE + TUPLE_SIZE(2); + size_t heap_size = TUPLE_SIZE(2) + TERM_BOXED_REFERENCE_SHORT_SIZE + TUPLE_SIZE(2); if (UNLIKELY(memory_ensure_free(ctx, heap_size) != MEMORY_GC_OK)) { ESP_LOGE(TAG, "Unable to allocate heap space for error; no message sent"); return NativeContinue; @@ -1944,7 +1944,7 @@ static NativeHandlerResult consume_mailbox(Context *ctx) } } else { // {Ref, {error, badarg}} - size_t heap_size = TUPLE_SIZE(2) + REF_SIZE + TUPLE_SIZE(2); + size_t heap_size = TUPLE_SIZE(2) + TERM_BOXED_REFERENCE_SHORT_SIZE + TUPLE_SIZE(2); if (UNLIKELY(memory_ensure_free(ctx, heap_size) != MEMORY_GC_OK)) { ESP_LOGE(TAG, "Unable to allocate heap space for error; no message sent"); return NativeContinue; diff --git a/src/platforms/esp32/components/avm_builtins/socket_driver.c b/src/platforms/esp32/components/avm_builtins/socket_driver.c index 99f7933677..9330c69ce1 100644 --- a/src/platforms/esp32/components/avm_builtins/socket_driver.c +++ b/src/platforms/esp32/components/avm_builtins/socket_driver.c @@ -443,7 +443,7 @@ static struct UDPSocketData *udp_socket_data_new(Context *ctx, struct netconn *c } // When this method is called, ensure free was called with REPLY_SIZE -#define REPLY_SIZE (TUPLE_SIZE(2) + REF_SIZE) +#define REPLY_SIZE (TUPLE_SIZE(2) + TERM_BOXED_REFERENCE_SHORT_SIZE) static void do_send_reply(Context *ctx, term reply, uint64_t ref_ticks, int32_t pid) { GlobalContext *glb = ctx->global; diff --git a/src/platforms/esp32/components/avm_builtins/uart_driver.c b/src/platforms/esp32/components/avm_builtins/uart_driver.c index 761370840c..ace2aba894 100644 --- a/src/platforms/esp32/components/avm_builtins/uart_driver.c +++ b/src/platforms/esp32/components/avm_builtins/uart_driver.c @@ -237,7 +237,7 @@ EventListener *uart_interrupt_callback(GlobalContext *glb, EventListener *listen int bin_size = term_binary_heap_size(count); Heap heap; - if (UNLIKELY(memory_init_heap(&heap, bin_size + REF_SIZE + TUPLE_SIZE(2) * 2) != MEMORY_GC_OK)) { + if (UNLIKELY(memory_init_heap(&heap, bin_size + TERM_BOXED_REFERENCE_SHORT_SIZE + TUPLE_SIZE(2) * 2) != MEMORY_GC_OK)) { free(usj_buf); fprintf(stderr, "Failed to allocate memory: %s:%i.\n", __FILE__, __LINE__); AVM_ABORT(); @@ -804,7 +804,7 @@ static NativeHandlerResult uart_driver_consume_mailbox(Context *ctx) int local_pid = term_to_local_process_id(gen_message.pid); if (is_closed) { - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2) * 2 + REF_SIZE) != MEMORY_GC_OK)) { + if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2) * 2 + TERM_BOXED_REFERENCE_SHORT_SIZE) != MEMORY_GC_OK)) { ESP_LOGE(TAG, "[uart_driver_consume_mailbox] Failed to allocate space for error tuple"); globalcontext_send_message(glb, local_pid, OUT_OF_MEMORY_ATOM); } diff --git a/src/platforms/esp32/components/avm_builtins/usb_cdc_driver.c b/src/platforms/esp32/components/avm_builtins/usb_cdc_driver.c index af5701ad70..365cd6bb14 100644 --- a/src/platforms/esp32/components/avm_builtins/usb_cdc_driver.c +++ b/src/platforms/esp32/components/avm_builtins/usb_cdc_driver.c @@ -169,7 +169,7 @@ EventListener *usb_cdc_interrupt_callback(GlobalContext *glb, EventListener *lis size_t rx_size = 0; esp_err_t ret = tinyusb_cdcacm_read(cdc_data->itf, buf, sizeof(buf), &rx_size); if (ret != ESP_OK || rx_size == 0) { - BEGIN_WITH_STACK_HEAP(REF_SIZE + TUPLE_SIZE(2) * 2, err_heap) + BEGIN_WITH_STACK_HEAP(TERM_BOXED_REFERENCE_SHORT_SIZE + TUPLE_SIZE(2) * 2, err_heap) term ref = term_from_ref_ticks(reader_ref_ticks, &err_heap); term error_tuple = term_alloc_tuple(2, &err_heap); term_put_tuple_element(error_tuple, 0, ERROR_ATOM); @@ -186,7 +186,7 @@ EventListener *usb_cdc_interrupt_callback(GlobalContext *glb, EventListener *lis int local_pid = term_to_local_process_id(reader_pid); Heap heap; - if (UNLIKELY(memory_init_heap(&heap, bin_size + REF_SIZE + TUPLE_SIZE(2) * 2) != MEMORY_GC_OK)) { + if (UNLIKELY(memory_init_heap(&heap, bin_size + TERM_BOXED_REFERENCE_SHORT_SIZE + TUPLE_SIZE(2) * 2) != MEMORY_GC_OK)) { fprintf(stderr, "Failed to allocate memory: %s:%i.\n", __FILE__, __LINE__); globalcontext_send_message(glb, local_pid, OUT_OF_MEMORY_ATOM); return listener; @@ -578,7 +578,7 @@ static void usb_cdc_driver_do_close(Context *ctx, GenMessage gen_message) SMP_MUTEX_UNLOCK(cdc_data->reader_lock); if (pending_reader_pid != term_invalid_term()) { - BEGIN_WITH_STACK_HEAP(REF_SIZE + TUPLE_SIZE(2) * 2, heap) + BEGIN_WITH_STACK_HEAP(TERM_BOXED_REFERENCE_SHORT_SIZE + TUPLE_SIZE(2) * 2, heap) term error_tuple = term_alloc_tuple(2, &heap); term_put_tuple_element(error_tuple, 0, ERROR_ATOM); term_put_tuple_element(error_tuple, 1, globalcontext_make_atom(glb, ATOM_STR("\x6", "closed"))); @@ -623,7 +623,7 @@ static NativeHandlerResult usb_cdc_driver_consume_mailbox(Context *ctx) int local_pid = term_to_local_process_id(gen_message.pid); if (is_closed) { - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2) * 2 + REF_SIZE) != MEMORY_GC_OK)) { + if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2) * 2 + TERM_BOXED_REFERENCE_SHORT_SIZE) != MEMORY_GC_OK)) { ESP_LOGE(TAG, "Failed to allocate space for error tuple"); globalcontext_send_message(glb, local_pid, OUT_OF_MEMORY_ATOM); } else { diff --git a/src/platforms/rp2/src/lib/networkdriver.c b/src/platforms/rp2/src/lib/networkdriver.c index 84f280fed4..f7ceafbabb 100644 --- a/src/platforms/rp2/src/lib/networkdriver.c +++ b/src/platforms/rp2/src/lib/networkdriver.c @@ -41,7 +41,7 @@ #pragma GCC diagnostic pop -#define PORT_REPLY_SIZE (TUPLE_SIZE(2) + REF_SIZE) +#define PORT_REPLY_SIZE (TUPLE_SIZE(2) + TERM_BOXED_REFERENCE_SHORT_SIZE) #define DEFAULT_HOSTNAME_FMT "atomvm-%02x%02x%02x%02x%02x%02x" #define DEFAULT_HOSTNAME_SIZE (strlen("atomvm-") + 12 + 1) @@ -758,7 +758,7 @@ static NativeHandlerResult consume_mailbox(Context *ctx) default: { // {Ref, {error, badarg}} - size_t heap_size = TUPLE_SIZE(2) + REF_SIZE + TUPLE_SIZE(2); + size_t heap_size = TUPLE_SIZE(2) + TERM_BOXED_REFERENCE_SHORT_SIZE + TUPLE_SIZE(2); if (UNLIKELY(memory_ensure_free(ctx, heap_size) != MEMORY_GC_OK)) { return NativeContinue; } @@ -767,7 +767,7 @@ static NativeHandlerResult consume_mailbox(Context *ctx) } } else { // {Ref, {error, badarg}} - size_t heap_size = TUPLE_SIZE(2) + REF_SIZE + TUPLE_SIZE(2); + size_t heap_size = TUPLE_SIZE(2) + TERM_BOXED_REFERENCE_SHORT_SIZE + TUPLE_SIZE(2); if (UNLIKELY(memory_ensure_free(ctx, heap_size) != MEMORY_GC_OK)) { return NativeContinue; } diff --git a/src/platforms/rp2/src/lib/usb_cdc_driver.c b/src/platforms/rp2/src/lib/usb_cdc_driver.c index 89a11a2bee..2569dda785 100644 --- a/src/platforms/rp2/src/lib/usb_cdc_driver.c +++ b/src/platforms/rp2/src/lib/usb_cdc_driver.c @@ -150,7 +150,7 @@ static EventListener *usb_cdc_listener_handler(GlobalContext *glb, EventListener int bin_size = term_binary_heap_size(rx_size); Heap heap; - if (UNLIKELY(memory_init_heap(&heap, bin_size + REF_SIZE + TUPLE_SIZE(2) * 2) != MEMORY_GC_OK)) { + if (UNLIKELY(memory_init_heap(&heap, bin_size + TERM_BOXED_REFERENCE_SHORT_SIZE + TUPLE_SIZE(2) * 2) != MEMORY_GC_OK)) { fprintf(stderr, "Failed to allocate memory: %s:%i.\n", __FILE__, __LINE__); AVM_ABORT(); } @@ -464,7 +464,7 @@ static void usb_cdc_driver_do_close(Context *ctx, GenMessage gen_message) SMP_MUTEX_UNLOCK(cdc_data->reader_lock); if (pending_reader_pid != term_invalid_term()) { - BEGIN_WITH_STACK_HEAP(REF_SIZE + TUPLE_SIZE(2) * 2, heap) + BEGIN_WITH_STACK_HEAP(TERM_BOXED_REFERENCE_SHORT_SIZE + TUPLE_SIZE(2) * 2, heap) term error_tuple = term_alloc_tuple(2, &heap); term_put_tuple_element(error_tuple, 0, ERROR_ATOM); term_put_tuple_element(error_tuple, 1, globalcontext_make_atom(glb, ATOM_STR("\x6", "closed"))); @@ -508,7 +508,7 @@ static NativeHandlerResult usb_cdc_driver_consume_mailbox(Context *ctx) int local_pid = term_to_local_process_id(gen_message.pid); if (is_closed) { - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2) * 2 + REF_SIZE) != MEMORY_GC_OK)) { + if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2) * 2 + TERM_BOXED_REFERENCE_SHORT_SIZE) != MEMORY_GC_OK)) { fprintf(stderr, "usb_cdc: Failed to allocate error tuple\n"); globalcontext_send_message(glb, local_pid, OUT_OF_MEMORY_ATOM); } else { diff --git a/src/platforms/stm32/src/lib/usb_cdc_driver.c b/src/platforms/stm32/src/lib/usb_cdc_driver.c index fe53f87431..a5d9af0672 100644 --- a/src/platforms/stm32/src/lib/usb_cdc_driver.c +++ b/src/platforms/stm32/src/lib/usb_cdc_driver.c @@ -119,7 +119,7 @@ static void usb_cdc_check_rx(struct USBCDCData *cdc_data) int bin_size = term_binary_heap_size(rx_size); Heap heap; - if (UNLIKELY(memory_init_heap(&heap, bin_size + REF_SIZE + TUPLE_SIZE(2) * 2) != MEMORY_GC_OK)) { + if (UNLIKELY(memory_init_heap(&heap, bin_size + TERM_BOXED_REFERENCE_SHORT_SIZE + TUPLE_SIZE(2) * 2) != MEMORY_GC_OK)) { fprintf(stderr, "Failed to allocate memory: %s:%i.\n", __FILE__, __LINE__); AVM_ABORT(); } @@ -367,7 +367,7 @@ static void usb_cdc_driver_do_close(Context *ctx, GenMessage gen_message) cdc_data->reader_process_pid = term_invalid_term(); if (pending_reader_pid != term_invalid_term()) { - BEGIN_WITH_STACK_HEAP(REF_SIZE + TUPLE_SIZE(2) * 2, heap) + BEGIN_WITH_STACK_HEAP(TERM_BOXED_REFERENCE_SHORT_SIZE + TUPLE_SIZE(2) * 2, heap) term error_tuple = term_alloc_tuple(2, &heap); term_put_tuple_element(error_tuple, 0, ERROR_ATOM); term_put_tuple_element(error_tuple, 1, globalcontext_make_atom(glb, ATOM_STR("\x6", "closed"))); @@ -436,7 +436,7 @@ static NativeHandlerResult usb_cdc_driver_consume_mailbox(Context *ctx) if (is_closed) { GlobalContext *glb = ctx->global; - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2) * 2 + REF_SIZE) != MEMORY_GC_OK)) { + if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2) * 2 + TERM_BOXED_REFERENCE_SHORT_SIZE) != MEMORY_GC_OK)) { fprintf(stderr, "usb_cdc: Failed to allocate error tuple\n"); globalcontext_send_message(glb, local_pid, OUT_OF_MEMORY_ATOM); } else { diff --git a/tests/erlang_tests/test_binary_to_term.erl b/tests/erlang_tests/test_binary_to_term.erl index 17d507c4a1..3a5a8007f1 100644 --- a/tests/erlang_tests/test_binary_to_term.erl +++ b/tests/erlang_tests/test_binary_to_term.erl @@ -157,6 +157,7 @@ start() -> ok = test_safe_option(), ok = test_invalid_export_fun_encoding(), ok = test_atom_utf8_ext_node(), + ok = test_encode_process_ref(), 0. test_reverse(T, Interop) -> @@ -995,6 +996,13 @@ test_encode_resource(OTPVersion) -> false = AlteredResource4 =:= Resource, ok. +test_encode_process_ref() -> + ProcessRef = erlang:alias(), + Bin = term_to_binary(ProcessRef), + <<131, 90, _Len:16, 119, 13, "nonode@nohost", 0:32, _/binary>> = Bin, + ProcessRef = binary_to_term(?MODULE:id(Bin)), + ok. + % Verify term_to_binary(binary_to_term(Bin)) is idempotent. binary_to_term_idempotent(Binary, _OTPVersion) -> Term = binary_to_term(Binary), diff --git a/tests/erlang_tests/test_monitor.erl b/tests/erlang_tests/test_monitor.erl index 55b070e74a..7231f7a90e 100644 --- a/tests/erlang_tests/test_monitor.erl +++ b/tests/erlang_tests/test_monitor.erl @@ -34,8 +34,85 @@ start() -> ok = test_monitor_demonitor_from_other(), ok = test_monitor_registered(), ok = test_monitor_registered_noproc(), + + ok = test_alias(), + ok = test_multiple_aliases(), + ok = test_multiple_unaliases(), + ok = test_unalias_from_wrong_process(), + ok = test_monitor_alias_dead_process(), + ok = test_monitor_multiple_aliases_monitors(fun spawn_opt_monitor/2), + ok = test_monitor_multiple_aliases_monitors(fun spawn_and_monitor/2), + ok = test_monitor_alias_demonitor(fun spawn_opt_monitor/2), + ok = test_monitor_alias_demonitor(fun spawn_and_monitor/2), + ok = test_monitor_alias_explicit_unalias(fun spawn_opt_monitor/2), + ok = test_monitor_alias_explicit_unalias(fun spawn_and_monitor/2), + ok = test_monitor_alias_reply_demonitor(fun spawn_opt_monitor/2), + ok = test_monitor_alias_reply_demonitor(fun spawn_and_monitor/2), + ok = test_reply_demonitor_removes_monitor(fun spawn_opt_monitor/2), + ok = test_reply_demonitor_removes_monitor(fun spawn_and_monitor/2), + ok = test_monitor_down_alias(fun spawn_opt_monitor/2), + ok = test_monitor_down_alias(fun spawn_and_monitor/2), + ok = test_monitor_alias_demonitor_deactivates_on_down(fun spawn_opt_monitor/2), + ok = test_monitor_alias_demonitor_deactivates_on_down(fun spawn_and_monitor/2), + ok = test_alias_pid_send_order(), + ok = test_reply_demonitor_same_batch_order(), + ok = test_monitor_alias_noproc_returns_alias(), + ok = test_monitor_alias_self_installs_nothing(), + ok = test_spawn_opt_link_monitor_badarg_is_atomic(), + ok = test_spawn_opt_monitor_non_list_badarg(), + ok = test_monitor_alias_down_before_send_same_batch(), + ok = test_unalias_and_send_non_local_refs(), + ok = test_io_request_alias_reply(), + ok = test_alias_as_key(), + ok = test_monitor_alias_demonitor_flush(), + ok = test_monitor_alias_duplicate_option(), + ok = test_monitor_alias_registered_self_installs_nothing(), + ok = test_alias_1(), + ok = test_alias_reply_mode(), + ok = test_alias_send_after_owner_died(), + ok = test_alias_multi_sender_unalias(), + ok = test_alias_duplicate_options(), + ok = test_unalias_non_reference_badarg(), + ok = test_binary_to_term_invalid_process_ref(), + ok = test_alias_ref_ordering(), 0. +%% An alias sorts after every plain reference whichever was created first. Two +%% owners' aliases are distinct and strictly ordered, but that direction follows +%% the internal pid (not pid term order) and is implementation defined, so it is +%% not pinned here. +test_alias_ref_ordering() -> + R0 = make_ref(), + A0 = erlang:alias(), + true = R0 < A0, + A1 = erlang:alias(), + R1 = make_ref(), + true = R1 < A1, + Ea = erlang:alias(), + Eb = erlang:alias(), + true = Ea < Eb, + true = Eb =:= binary_to_term(term_to_binary(Eb)), + Parent = self(), + Child = spawn_opt( + fun() -> + receive + {get, P} -> P ! {child_alias, erlang:alias()} + end + end, + [] + ), + Child ! {get, Parent}, + ChildAlias = + receive + {child_alias, Ca} -> Ca + after 5000 -> error(child_alias_timeout) + end, + SelfAlias = erlang:alias(), + true = ChildAlias =/= SelfAlias, + true = (ChildAlias < SelfAlias) xor (SelfAlias < ChildAlias), + _ = [erlang:unalias(R) || R <- [A0, A1, Ea, Eb, SelfAlias]], + ok. + test_monitor_normal() -> Pid = spawn_opt(fun() -> normal_loop() end, []), Ref = monitor(process, Pid), @@ -229,7 +306,617 @@ test_monitor_demonitor_from_other() -> end, ok. +test_alias() -> + P = spawn_opt(fun echo_loop/0, []), + Alias = erlang:alias(), + do_test_alias(P, Alias), + P ! quit, + ok. + +test_multiple_aliases() -> + P = spawn_opt(fun echo_loop/0, []), + A1 = erlang:alias(), + A2 = erlang:alias(), + A3 = erlang:alias(), + do_test_alias(P, A1), + do_test_alias(P, A3), + do_test_alias(P, A2), + P ! quit, + ok. + +test_multiple_unaliases() -> + A = erlang:alias(), + true = erlang:unalias(A), + false = erlang:unalias(A), + false = erlang:unalias(A), + ok. + +test_unalias_from_wrong_process() -> + A = erlang:alias(), + TestProcess = self(), + spawn_opt(fun() -> TestProcess ! erlang:unalias(A) end, [link]), + false = recv_one(), + P = spawn_opt(fun echo_loop/0, []), + do_test_alias(P, A), + P ! quit, + ok. + +do_test_alias(P, Alias) -> + do_test_alias(P, Alias, fun erlang:unalias/1). + +do_test_alias(P, Alias, UnaliasFun) -> + Ref = make_ref(), + P ! {{m1, Ref}, Alias}, + {m1, Ref} = recv_one(), + UnaliasFun(Alias), + P ! {{m2, Ref}, Alias}, + P ! {{m3, Ref}, self()}, + {m3, Ref} = recv_one(), + ok. + +test_monitor_alias_demonitor(SpawnFun) -> + {P, Mon} = SpawnFun(fun echo_loop/0, [{alias, demonitor}]), + do_test_alias(P, Mon, fun demonitor/1), + P ! quit, + ok. + +test_monitor_alias_explicit_unalias(SpawnFun) -> + {P, Mon} = SpawnFun(fun echo_loop/0, [{alias, explicit_unalias}]), + P ! {m1, Mon}, + m1 = recv_one(), + demonitor(Mon), + do_test_alias(P, Mon), + P ! quit, + ok. + +test_monitor_alias_reply_demonitor(SpawnFun) -> + {P, Mon} = SpawnFun(fun echo_loop/0, [{alias, reply_demonitor}]), + do_test_alias(P, Mon, fun(_Mon) -> ok end), + P ! quit, + ok. + +test_reply_demonitor_removes_monitor(SpawnFun) -> + {P, Mon} = SpawnFun(fun echo_loop/0, [{alias, reply_demonitor}]), + Ref = make_ref(), + P ! {{reply, Ref}, Mon}, + {reply, Ref} = recv_one(), + %% Monitors fire in installation order, so a stale 'DOWN' from a not-removed monitor + %% would arrive before this fence's. On the BEAM no such 'DOWN' exists. + Fence = monitor(process, P), + P ! quit, + {'DOWN', Fence, process, P, normal} = recv_one(), + ok = assert_no_message(), + ok. + +%% Self-sending to our own alias and only then receiving guarantees both alias signals +%% drain in a single outer-list batch, so this is deterministic on SMP and non-SMP. +test_reply_demonitor_same_batch_order() -> + P = spawn_opt(fun echo_loop/0, []), + Mon = erlang:monitor(process, P, [{alias, reply_demonitor}]), + Mon ! first, + Mon ! second, + first = recv_one(), + ok = assert_no_message(), + P ! quit, + ok. + +test_monitor_down_alias(SpawnFun) -> + {P, Mon} = SpawnFun(fun echo_loop/0, [{alias, demonitor}]), + erlang:unalias(Mon), + P ! {m1, Mon}, + P ! {m2, self()}, + m2 = recv_one(), + P ! quit, + {'DOWN', Mon, process, P, normal} = recv_one(), + ok. + +test_monitor_alias_demonitor_deactivates_on_down(SpawnFun) -> + {P, Mon} = SpawnFun(fun echo_loop/0, [{alias, demonitor}]), + P ! quit, + {'DOWN', Mon, process, P, normal} = recv_one(), + Echo = spawn_opt(fun echo_loop/0, []), + Echo ! {should_drop, Mon}, + %% Sends from one process keep their order, so once the fence reply arrives a dropped + %% alias message can no longer show up afterwards. + Echo ! {fence, self()}, + fence = recv_one(), + ok = assert_no_message(), + Echo ! quit, + ok. + +test_alias_pid_send_order() -> + Parent = self(), + P = spawn_opt( + fun() -> + Alias = erlang:alias(), + Parent ! {ready, self(), Alias}, + receive + A -> Parent ! {got, A} + end, + receive + B -> Parent ! {got, B} + end + end, + [] + ), + {ready, P, Alias} = recv_one(), + Alias ! m1, + P ! m2, + {got, m1} = recv_one(), + {got, m2} = recv_one(), + ok. + +test_monitor_alias_noproc_returns_alias() -> + {P, _} = spawn_opt(fun() -> ok end, [monitor]), + ok = + receive + {'DOWN', _, _, P, _} -> ok + after 5000 -> timeout + end, + Mon = erlang:monitor(process, P, [{alias, explicit_unalias}]), + {'DOWN', Mon, process, P, noproc} = recv_one(), + Echo = spawn_opt(fun echo_loop/0, []), + Echo ! {via_alias, Mon}, + via_alias = recv_one(), + true = erlang:unalias(Mon), + Echo ! quit, + ok. + +test_monitor_alias_self_installs_nothing() -> + Mon = erlang:monitor(process, self(), [{alias, explicit_unalias}]), + Mon ! hello, + ok = assert_no_message(), + false = erlang:unalias(Mon), + false = erlang:demonitor(Mon, [info]), + ok. + +%% The link is installed before the monitor options are parsed, so the badarg must +%% still unwind it: a surviving link would later deliver a spurious {'EXIT', Pid, normal}. +test_spawn_opt_link_monitor_badarg_is_atomic() -> + %% On the BEAM the test process is linked to init, so compare against the initial links + %% instead of []. + {links, LinksBefore} = erlang:process_info(self(), links), + false = erlang:process_flag(trap_exit, true), + ok = + try spawn_opt(fun() -> ok end, [link, {monitor, [bad_option]}]) of + Result -> {unexpected, Result} + catch + error:badarg -> ok + end, + {links, LinksBefore} = erlang:process_info(self(), links), + ok = + receive + Other -> {unexpected_message, Other} + after 200 -> ok + end, + true = erlang:process_flag(trap_exit, false), + ok. + +test_monitor_multiple_aliases_monitors(SpawnFun) -> + {P, Mon1} = SpawnFun(fun echo_loop/0, [{alias, demonitor}]), + Mon2 = erlang:monitor(process, P, [{alias, reply_demonitor}]), + Mon3 = erlang:monitor(process, P, [{alias, explicit_unalias}]), + Mon4 = erlang:monitor(process, P), + A1 = erlang:alias(), + A2 = erlang:alias(), + do_test_alias(P, A2), + do_test_alias(P, Mon3), + do_test_alias(P, A1), + do_test_alias(P, Mon1, fun demonitor/1), + P ! quit, + {'DOWN', Mon2, process, P, normal} = recv_one(), + {'DOWN', Mon3, process, P, normal} = recv_one(), + {'DOWN', Mon4, process, P, normal} = recv_one(), + ok. + +test_monitor_alias_dead_process() -> + {P, Mon0} = spawn_opt(fun() -> ok end, [monitor]), + {'DOWN', Mon0, process, P, normal} = recv_one(), + Mon1 = erlang:monitor(process, P, [{alias, demonitor}]), + {'DOWN', Mon1, process, P, noproc} = recv_one(), + Mon2 = erlang:monitor(process, P, [{alias, reply_demonitor}]), + {'DOWN', Mon2, process, P, noproc} = recv_one(), + Mon3 = erlang:monitor(process, P, [{alias, explicit_unalias}]), + {'DOWN', Mon3, process, P, noproc} = recv_one(), + ok. + +%% A non-list, non-'true' monitor value fails before the monitor-option parser, unlike +%% {monitor, [BadOption]}, so it is exercised separately here. +test_spawn_opt_monitor_non_list_badarg() -> + ok = + try spawn_opt(fun() -> ok end, [{monitor, foo}]) of + R1 -> {unexpected, R1} + catch + error:badarg -> ok + end, + ok = + try spawn_opt(fun() -> ok end, [{monitor, 123}]) of + R2 -> {unexpected, R2} + catch + error:badarg -> ok + end, + ok. + +%% A 'DOWN' that deactivates a {alias, demonitor} alias must drop an alias send that lands +%% in the SAME mailbox drain. The relay sends the alias message only after seeing the owner's +%% 'DOWN', so both reach the owner in one batch. The owner busy-waits on whereis/1 rather than +%% receiving, because a receive would drain its mailbox before the batch is assembled. +%% On a single scheduler the owner may drain the 'DOWN' alone first. The test then passes +%% through the cross-batch deactivation path instead. The same-batch path is reliably +%% exercised only on SMP builds. +test_monitor_alias_down_before_send_same_batch() -> + P = spawn_opt( + fun() -> + receive + quit -> ok + end + end, + [] + ), + %% Monitor P before the relay does, so the owner's 'DOWN' is posted before the relay's. + Mon = erlang:monitor(process, P, [{alias, demonitor}]), + Relay = spawn_opt( + fun() -> + erlang:monitor(process, P), + receive + {'DOWN', _, process, P, _} -> + Mon ! should_drop, + register(down_batch_relay, self()), + receive + release -> ok + end + end + end, + [] + ), + P ! quit, + %% The huge spin bound absorbs valgrind's unfair scheduling, which can starve the relay + %% while this process spins. It must busy-wait: receiving would drain its mailbox. + ok = wait_registered(down_batch_relay, 50000000), + {'DOWN', Mon, process, P, normal} = recv_one(), + ok = assert_no_message(), + Relay ! release, + ok. + +test_unalias_and_send_non_local_refs() -> + %% NEWER_REFERENCE_EXT (90): Len:16, Node atom, Creation:32, Len x 4-byte words. + ExtRef = binary_to_term( + <<131, 90, 2:16/integer-unsigned-big, 119, 3, "x@x", 1:32/integer-unsigned-big, + 1:32/integer-unsigned-big, 2:32/integer-unsigned-big>> + ), + true = is_reference(ExtRef), + false = unalias(ExtRef), + hello = (ExtRef ! hello), + false = unalias(make_ref()), + hello = (make_ref() ! hello), + ok = + receive + Unexpected -> {unexpected_message, Unexpected} + after 100 -> ok + end, + ok. + +%% An alias passed as ReplyAs must come back verbatim. A short reference rebuilt from its +%% ticks would not match it, so the receive below would not fire. +test_io_request_alias_reply() -> + %% On the BEAM the group leader is a full io server. On AtomVM the test process has no + %% group leader, so talk to the console port driver directly. + IoServer = + case erlang:system_info(machine) of + "BEAM" -> group_leader(); + _ -> open_port({spawn, "console"}, []) + end, + Alias = erlang:alias(), + IoServer ! {io_request, self(), Alias, {put_chars, unicode, <<>>}}, + ok = + receive + {io_reply, Alias, ok} -> ok + after 5000 -> io_reply_did_not_match_alias + end, + true = erlang:unalias(Alias), + ok. + +test_alias_as_key() -> + Alias = erlang:alias(), + Plain = make_ref(), + Map = #{Alias => alias_value, Plain => plain_value}, + alias_value = maps:get(Alias, Map), + plain_value = maps:get(Plain, Map), + Tid = ets:new(alias_key_table, []), + true = ets:insert(Tid, {Alias, alias_value}), + true = ets:insert(Tid, {Plain, plain_value}), + [{Alias, alias_value}] = ets:lookup(Tid, Alias), + [{Plain, plain_value}] = ets:lookup(Tid, Plain), + true = ets:delete(Tid, Alias), + [] = ets:lookup(Tid, Alias), + [{Plain, plain_value}] = ets:lookup(Tid, Plain), + true = erlang:unalias(Alias), + ok. + +test_monitor_alias_demonitor_flush() -> + P = spawn_opt( + fun() -> + receive + quit -> ok + end + end, + [] + ), + Mon = erlang:monitor(process, P, [{alias, demonitor}]), + Fence = monitor(process, P), + P ! quit, + %% Mon's 'DOWN' was enqueued first (installation order). This selective receive leaves it + %% queued for the flush below to remove. + ok = + receive + {'DOWN', Fence, process, P, normal} -> ok + after 5000 -> timeout + end, + true = demonitor(Mon, [flush]), + ok = assert_no_message(), + Echo = spawn_opt(fun echo_loop/0, []), + Echo ! {should_drop, Mon}, + Echo ! {fence, self()}, + fence = recv_one(), + ok = assert_no_message(), + Echo ! quit, + ok. + +%% With duplicate {alias, _} options the last one wins, like OTP 29. +test_monitor_alias_duplicate_option() -> + P = spawn_opt(fun echo_loop/0, []), + Mon = erlang:monitor(process, P, [{alias, demonitor}, {alias, explicit_unalias}]), + true = demonitor(Mon), + do_test_alias(P, Mon), + P ! quit, + ok. + +test_monitor_alias_registered_self_installs_nothing() -> + true = register(alias_self_name, self()), + Mon = erlang:monitor(process, alias_self_name, [{alias, explicit_unalias}]), + Mon ! hello, + ok = assert_no_message(), + false = erlang:unalias(Mon), + false = erlang:demonitor(Mon, [info]), + true = unregister(alias_self_name), + ok. + +test_alias_1() -> + A1 = alias([]), + A1 ! x1, + x1 = recv_one(), + true = unalias(A1), + A2 = alias([explicit_unalias]), + A2 ! x2, + x2 = recv_one(), + true = unalias(A2), + ok = + try alias([bogus]) of + R1 -> {unexpected, R1} + catch + error:badarg -> ok + end, + ok = + try alias(explicit_unalias) of + R2 -> {unexpected, R2} + catch + error:badarg -> ok + end, + ok. + +%% A reply alias is deactivated when its first message is delivered, so a second message in +%% the same batch is dropped too, not just delayed. +test_alias_reply_mode() -> + A = alias([reply]), + A ! m1, + A ! m2, + m1 = recv_one(), + ok = assert_no_message(), + A ! m3, + %% A dropped refc binary exercises the mso sweep of the immediately-freed signal. + A ! <<0:1600>>, + ok = assert_no_message(), + false = unalias(A), + ok. + +%% AtomVM assigns process ids monotonically and ref ticks are globally unique, so a dead +%% owner's stale alias stays unmatchable even by a later process. The churn loop spawns such +%% processes to confirm a stale-alias send never surfaces anywhere. +test_alias_send_after_owner_died() -> + Parent = self(), + {P, Fence} = spawn_opt(fun() -> Parent ! {alias, erlang:alias()} end, [monitor]), + {alias, A} = recv_one(), + {'DOWN', Fence, process, P, normal} = recv_one(), + hello = (A ! hello), + ok = churn_and_send_stale(A, 20), + ok = assert_no_message(), + ok. + +churn_and_send_stale(_A, 0) -> + ok; +churn_and_send_stale(A, N) -> + Parent = self(), + {Q, Mon} = spawn_opt(fun() -> stale_alias_probe(Parent) end, [monitor]), + drop = (A ! drop), + Q ! quit, + {'DOWN', Mon, process, Q, normal} = recv_one(), + churn_and_send_stale(A, N - 1). + +%% A stale-alias signal is dropped against this process's empty alias list, so it must never +%% surface here as a plain message. Any non-quit message is reported as a misdelivery. +stale_alias_probe(Parent) -> + receive + quit -> + ok; + Other -> + Parent ! {misdelivered, Other}, + stale_alias_probe(Parent) + end. + +%% Several senders hammer one alias while the owner unaliases mid-stream. Synchronization is +%% per-sender send order plus explicit acks, with no sleeps or timing windows, so the test +%% cannot flake on slow hosts. A sender's alias messages all precede its sent_all fence in the +%% owner's queue, so once the last phase-1 fence is consumed Count1 has counted them all. +test_alias_multi_sender_unalias() -> + NSenders = 4, + NMsgs = 25, + A = erlang:alias(), + Senders = spawn_alias_senders(self(), A, NMsgs, NSenders), + ok = send_to_each(Senders, go), + Count1 = drain_alias_msgs(NSenders, sent_all, 0), + Count1 = NSenders * NMsgs, + ok = send_to_each(Senders, go2), + true = erlang:unalias(A), + Count2 = drain_alias_msgs(NSenders, sent_all2, 0), + true = Count2 =< NSenders * NMsgs, + false = erlang:unalias(A), + dead = (A ! dead), + ok = assert_no_message(), + ok. + +spawn_alias_senders(_Owner, _A, _NMsgs, 0) -> + []; +spawn_alias_senders(Owner, A, NMsgs, K) -> + Pid = spawn_opt(fun() -> alias_sender(Owner, A, NMsgs) end, []), + [Pid | spawn_alias_senders(Owner, A, NMsgs, K - 1)]. + +alias_sender(Owner, A, NMsgs) -> + receive + go -> ok + end, + ok = alias_blast(A, NMsgs), + Owner ! sent_all, + receive + go2 -> ok + end, + ok = alias_blast(A, NMsgs), + Owner ! sent_all2, + ok. + +alias_blast(_A, 0) -> + ok; +alias_blast(A, N) -> + {am, N} = (A ! {am, N}), + alias_blast(A, N - 1). + +send_to_each([], _Msg) -> + ok; +send_to_each([Pid | Rest], Msg) -> + Pid ! Msg, + send_to_each(Rest, Msg). + +drain_alias_msgs(0, _FenceMsg, Count) -> + Count; +drain_alias_msgs(FencesLeft, FenceMsg, Count) -> + case recv_one() of + {am, _} -> drain_alias_msgs(FencesLeft, FenceMsg, Count + 1); + FenceMsg -> drain_alias_msgs(FencesLeft - 1, FenceMsg, Count); + Other -> {unexpected, Other} + end. + +%% With duplicate alias/1 options the last one wins, like OTP 29. Both orders are checked. +test_alias_duplicate_options() -> + A1 = alias([explicit_unalias, reply]), + A1 ! r1, + A1 ! r2, + %% A dropped refc binary exercises the mso sweep on the received-order drop path too. + A1 ! <<0:1600>>, + r1 = recv_one(), + ok = assert_no_message(), + false = unalias(A1), + A2 = alias([reply, explicit_unalias]), + A2 ! e1, + A2 ! e2, + e1 = recv_one(), + e2 = recv_one(), + true = unalias(A2), + ok. + +test_unalias_non_reference_badarg() -> + ok = + try unalias(42) of + R -> {unexpected, R} + catch + error:badarg -> ok + end, + ok. + +%% The owner pid word of a wire-format alias is untrusted input: decoding must reject pid 0 +%% (the short-ref sentinel) and pids above the 28-bit maximum. The BEAM treats reference words +%% as opaque payload, so it decodes the patched binaries as plain references instead of +%% rejecting them. Only AtomVM gives the third word pid semantics, so the test forks on machine. +test_binary_to_term_invalid_process_ref() -> + A = erlang:alias(), + B = term_to_binary(A), + A = binary_to_term(B), + PrefixSize = byte_size(B) - 4, + <> = B, + TooBigPid = 1 bsl 28, + BadZero = <>, + BadBig = <>, + case erlang:system_info(machine) of + "BEAM" -> + true = is_reference(binary_to_term(BadZero)), + true = is_reference(binary_to_term(BadBig)); + _ -> + %% On AtomVM the alias serializes as a len-3 reference whose last word is the pid. + <<131, 90, 3:16, _/binary>> = B, + ok = + try binary_to_term(BadZero) of + R1 -> {unexpected, R1} + catch + error:badarg -> ok + end, + ok = + try binary_to_term(BadBig) of + R2 -> {unexpected, R2} + catch + error:badarg -> ok + end + end, + true = erlang:unalias(A), + ok. + +spawn_opt_monitor(LoopFun, Opts) -> + spawn_opt(LoopFun, [{monitor, Opts}]). + +spawn_and_monitor(LoopFun, Opts) -> + P = spawn_opt(LoopFun, []), + Mon = erlang:monitor(process, P, Opts), + {P, Mon}. + normal_loop() -> receive {Caller, quit} -> Caller ! {self(), finished} end. + +echo_loop() -> + receive + quit -> + ok; + {Msg, ReplyTo} -> + ReplyTo ! Msg, + echo_loop() + end. + +recv_one() -> + receive + Msg -> Msg + after 5000 -> timeout + end. + +%% Only call this once the would-be message is already settled, behind a fence reply or after +%% a same-process send, so the short timeout window is not a race. +assert_no_message() -> + receive + Msg -> {unexpected_message, Msg} + after 100 -> ok + end. + +wait_registered(_Name, 0) -> + timeout; +wait_registered(Name, N) -> + case whereis(Name) of + undefined -> wait_registered(Name, N - 1); + _ -> ok + end. diff --git a/tests/erlang_tests/test_refs_ordering.erl b/tests/erlang_tests/test_refs_ordering.erl index 5071ae5633..f1766506b4 100644 --- a/tests/erlang_tests/test_refs_ordering.erl +++ b/tests/erlang_tests/test_refs_ordering.erl @@ -20,17 +20,17 @@ -module(test_refs_ordering). --export([start/0, sort/1, insert/2, check/2, get_ref/2]). +-export([start/0, sort/1, insert/2, check/2, get_ref/3, make_alias_ref/0]). start() -> - A = get_ref(3, []), - B = get_ref(7, []), - C = get_ref(1, []), - D = get_ref(3, []), - E = get_ref(4, []), + A = get_ref(3, [], fun make_ref/0), + B = get_ref(7, [], fun make_alias_ref/0), + C = get_ref(1, [], fun make_ref/0), + D = get_ref(3, [], fun make_alias_ref/0), + E = get_ref(4, [], fun make_ref/0), Sorted = sort([E, C, D, A, B]), - check(Sorted, [A, B, C, D, E]) + - bool_to_n(Sorted < [make_ref()]) * 2 + + check(Sorted, [A, C, E, B, D]) + + bool_to_n(Sorted < [make_alias_ref()]) * 2 + bool_to_n(Sorted > {make_ref()}) * 4. sort(L) -> @@ -57,12 +57,15 @@ check(T, Expected) when T == Expected -> check(T, Expected) when T /= Expected -> 0. -get_ref(0, Acc) -> +get_ref(0, Acc, _Generator) -> Acc; -get_ref(N, _Acc) -> - get_ref(N - 1, make_ref()). +get_ref(N, _Acc, Generator) -> + get_ref(N - 1, Generator(), Generator). bool_to_n(true) -> 1; bool_to_n(false) -> 0. + +make_alias_ref() -> + erlang:alias(). diff --git a/tests/libs/eavmlib/test_file.erl b/tests/libs/eavmlib/test_file.erl index c69a3a9f24..480e8f3db2 100644 --- a/tests/libs/eavmlib/test_file.erl +++ b/tests/libs/eavmlib/test_file.erl @@ -29,6 +29,7 @@ test() -> HasExecve = atomvm:platform() =/= emscripten, ok = test_basic_file(), ok = test_fifo_select(HasSelect), + ok = test_alias_select_handle_rejected(HasSelect), ok = test_gc(HasSelect), ok = test_crash_no_leak(HasSelect), ok = test_select_with_gone_process(HasSelect), @@ -126,6 +127,25 @@ test_fifo_select(_HasSelect) -> ok = atomvm:posix_close(WrFd), ok = atomvm:posix_unlink(Path). +test_alias_select_handle_rejected(false) -> + ok; +test_alias_select_handle_rejected(_HasSelect) -> + Path = "/tmp/atomvm.tmp." ++ integer_to_list(erlang:system_time(millisecond)), + ok = atomvm:posix_mkfifo(Path, 8#644), + {ok, RdFd} = atomvm:posix_open(Path, [o_rdonly]), + {ok, WrFd} = atomvm:posix_open(Path, [o_wronly]), + Alias = alias(), + ok = + try atomvm:posix_select_write(WrFd, self(), Alias) of + R -> {unexpected, R} + catch + error:badarg -> ok + end, + true = unalias(Alias), + ok = atomvm:posix_close(RdFd), + ok = atomvm:posix_close(WrFd), + ok = atomvm:posix_unlink(Path). + % Test is based on the fact that `erlang:memory(binary)` count resources. test_gc(HasSelect) -> Path = "/tmp/atomvm.tmp." ++ integer_to_list(erlang:system_time(millisecond)), diff --git a/tests/libs/estdlib/test_net_kernel.erl b/tests/libs/estdlib/test_net_kernel.erl index f67f7c5417..209ac58f28 100644 --- a/tests/libs/estdlib/test_net_kernel.erl +++ b/tests/libs/estdlib/test_net_kernel.erl @@ -44,6 +44,7 @@ test() -> ok = test_link_remote_exit_local(Platform), ok = test_link_local_unlink_remote(Platform), ok = test_link_local_unlink_local(Platform), + ok = test_alias_send_from_beam(Platform), ok = test_is_alive(), ok = test_ping_with_avm_dist_opts(Platform), ok; @@ -498,6 +499,32 @@ test_link_local_unlink_local(Platform) -> ok = stop_apply_loop(BeamMainPid, Pid, MonitorRef), ok. +test_alias_send_from_beam(Platform) -> + {BeamMainPid, Pid, MonitorRef} = start_apply_loop(Platform), + Alias = alias(), + {via_alias, Alias} = call_apply_loop( + BeamMainPid, {self(), apply, erlang, send, [Alias, {via_alias, Alias}]} + ), + ok = + receive + {via_alias, Alias} -> ok + after 30000 -> alias_message_timeout + end, + true = unalias(Alias), + should_be_dropped = call_apply_loop( + BeamMainPid, {self(), apply, erlang, send, [Alias, should_be_dropped]} + ), + %% The dropped send precedes this reply on the same connection, so `after 0` is + %% race-free. Match only alias messages: a stale 'EXIT' may be queued on BEAM. + ok = + receive + should_be_dropped -> unexpected_alias_message; + {via_alias, _} = Unexpected -> {unexpected_alias_message, Unexpected} + after 0 -> ok + end, + ok = stop_apply_loop(BeamMainPid, Pid, MonitorRef), + ok. + test_is_alive() -> false = is_alive(), {ok, _NetKernelPid} = net_kernel:start(atomvm, #{name_domain => shortnames}), diff --git a/tests/libs/estdlib/test_udp_socket.erl b/tests/libs/estdlib/test_udp_socket.erl index 359b001bee..ff57169669 100644 --- a/tests/libs/estdlib/test_udp_socket.erl +++ b/tests/libs/estdlib/test_udp_socket.erl @@ -213,6 +213,7 @@ test_nowait() -> ok = test_nowait(fun receive_loop_nowait_ref/2), ok = test_nowait(fun receive_loop_recvfrom_nowait/2), ok = test_nowait(fun receive_loop_recvfrom_nowait_ref/2), + ok = test_alias_select_handle_rejected(), ok. test_nowait(ReceiveFun) -> @@ -267,6 +268,23 @@ receive_loop_nowait_ref(Socket, Packet) -> Error end. +test_alias_select_handle_rejected() -> + case erlang:system_info(machine) of + "BEAM" -> + ok; + _ -> + {ok, Socket} = socket:open(inet, dgram, udp), + Alias = alias(), + ok = + try socket:recv(Socket, 1, Alias) of + R -> {unexpected, R} + catch + error:badarg -> ok + end, + true = unalias(Alias), + ok = socket:close(Socket) + end. + receive_loop_recvfrom_nowait(Socket, Packet) -> case socket:recvfrom(Socket, byte_size(Packet), nowait) of {ok, {_Source, ReceivedPacket}} when ReceivedPacket =:= Packet ->