Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ void loop() {
if (!async_rpc.result(sum)) {
Monitor.println("Error calling method: add");
Monitor.print("Error code: ");
Monitor.println(async_rpc.error.code);
Monitor.println(async_rpc.getErrorCode());
Monitor.print("Error message: ");
Monitor.println(async_rpc.error.traceback);
Monitor.println(async_rpc.getErrorMessage());
}

// Implicit boolean cast. Use with caution as in this case the call is indeed
Expand Down
4 changes: 2 additions & 2 deletions examples/simple_bridge/simple_bridge.ino
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ void loop() {
Serial.print("Result of the operation is: ");
Serial.println(res);
} else {
Serial.println(outcome.error.code);
Serial.println(outcome.error.traceback);
Serial.println(outcome.getErrorCode());
Serial.println(outcome.getErrorMessage());
}

Bridge.notify("signal", 200);
Expand Down
14 changes: 7 additions & 7 deletions examples/test/test.ino
Original file line number Diff line number Diff line change
Expand Up @@ -64,34 +64,34 @@ void loop() {
if (async_res.result(pow)) {
Monitor.println("Result of assignment and then result: "+String(pow)); // returns true, so the right result
} else {
Monitor.println("Error code: "+String(async_res.error.code));
Monitor.println("Error message: "+async_res.error.traceback);
Monitor.println("Error code: "+String(async_res.getErrorCode()));
Monitor.println("Error message: "+async_res.getErrorMessage());
}

float div = 0;
RpcCall async_res1 = Bridge.call("2_args_float_result", 2.0); // passing 1 arg when 2 are expected
if (async_res1.result(div)) {
Monitor.println("Result of assignment and then result: "+String(div)); // returns true, so the right result
} else {
Monitor.println("Error code: "+String(async_res1.error.code));
Monitor.println("Error message: "+async_res1.error.traceback);
Monitor.println("Error code: "+String(async_res1.getErrorCode()));
Monitor.println("Error message: "+async_res1.getErrorMessage());
}

div = 0;
RpcCall async_res2 = Bridge.call("2_args_float_result", 2.0, "invalid"); // passing a wrong type arg
if (async_res2.result(div)) {
Monitor.println("Result of assignment and then result: "+String(div)); // returns true, so the right result
} else {
Monitor.println("Error code: "+String(async_res2.error.code));
Monitor.println("Error message: "+async_res2.error.traceback);
Monitor.println("Error code: "+String(async_res2.getErrorCode()));
Monitor.println("Error message: "+async_res2.getErrorMessage());
}

x = false;
RpcCall async_res3 = Bridge.call("0_args_bool_result");
if (async_res3.result(x)) {
Monitor.println("Result of assignment and then result: "+String(x)); // returns true, so the right result
} else {
Monitor.println("Error expecting bool result: "+String(async_res3.error.code));
Monitor.println("Error expecting bool result: "+String(async_res3.getErrorCode()));
}

// Avoid the following:
Expand Down
15 changes: 15 additions & 0 deletions examples/test_rpc_thread/python/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA <http://www.arduino.cc>
#
# SPDX-License-Identifier: MPL-2.0

import time
from arduino.app_utils import *

led_state = False

def loopback(message):
time.sleep(1)
return message

Bridge.provide("loopback", loopback)
App.run()
115 changes: 115 additions & 0 deletions examples/test_rpc_thread/test_rpc_thread.ino
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
#include <Arduino_RouterBridge.h>
#include <zephyr/kernel.h>

// Thread config
#define THREAD_STACK_SIZE 500
#define THREAD_PRIORITY 5


void rpc_thread_entry(void *p1, void *p2, void *p3) {
RpcCall<MsgPack::str_t> *call = reinterpret_cast<RpcCall<MsgPack::str_t>*>(p1);
struct k_mutex *mtx = reinterpret_cast<struct k_mutex*>(p2);

// Give setup() time to complete first result()
k_sleep(K_MSEC(400));

Serial.println("\n--- Second Thread ---");
Serial.println("Calling result() again...");

k_mutex_lock(mtx, K_FOREVER);

MsgPack::str_t msg;
bool ok = call->result(msg);

if (ok) {
Serial.println("ERR - Second call succeeded (unexpected!)");
Serial.print("Message: ");
Serial.println(msg.c_str());
} else {
Serial.println("OK - Second call FAILED as expected (already executed)");
Serial.print("Error Code: 0x");
Serial.println(call->getErrorCode(), HEX);
Serial.print("Error Message: ");
Serial.println(call->getErrorMessage().c_str());
}

k_mutex_unlock(mtx);

Serial.println("--- Second Thread End ---\n");
}


void setup() {
Serial.begin(115200);
k_sleep(K_MSEC(2000));

Serial.println("\n=== Threaded RPC Test ===\n");

Serial.println("*** Main Thread (setup) ***");

Bridge.begin();
Monitor.begin();

static struct k_mutex loop_mtx;
k_mutex_init(&loop_mtx);

RpcCall loopback_call = Bridge.call("loopback", "TEST");

if (loopback_call.isError()) {
Serial.println("OK - RPC call in Error mode before execution");
Serial.print("Error Code: 0x");
Serial.println(loopback_call.getErrorCode(), HEX);
Serial.print("Error Message: ");
Serial.println(loopback_call.getErrorMessage().c_str());
} else {
Serial.println("ERR - RPC call not in Error mode before execution (unexpected)");
}

Serial.println("Waiting for the other side...\n");
delay(2000);

Serial.println("calling .result() on RPC call (main thread)");

MsgPack::str_t msg;
k_mutex_lock(&loop_mtx, K_FOREVER);
bool ok = loopback_call.result(msg);
k_mutex_unlock(&loop_mtx);

if (ok) {
Serial.println("OK - First call succeeded.");
Serial.print("Message: ");
Serial.println(msg.c_str());
} else {
Serial.println("ERR - First call FAILED (unexpected).");
}

// ---- Launch second thread ----
Serial.println("\nStarting second thread...");

struct k_thread rpc_thread;

k_thread_stack_t *rpc_stack_area = k_thread_stack_alloc(THREAD_STACK_SIZE, 0);

k_tid_t rpc_tid = k_thread_create(
&rpc_thread,
rpc_stack_area,
THREAD_STACK_SIZE,
rpc_thread_entry,
&loopback_call, // p1 → RpcCall*
&loop_mtx, // p2 → mutex
NULL,
THREAD_PRIORITY,
0,
K_FOREVER
);

k_thread_start(rpc_tid);
Serial.println("Second thread launched... joining");
k_thread_join(&rpc_thread, K_FOREVER);
Serial.println("*** Main thread end ending setup ***");

}

void loop() {
k_sleep(K_MSEC(5000));
}
52 changes: 45 additions & 7 deletions src/bridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,56 @@
#include <zephyr/sys/atomic.h>
#include <Arduino_RPClite.h>

#include <utility>


void updateEntryPoint(void *, void *, void *);

template<typename... Args>
class RpcCall {

RpcError error;

void setError(int code, MsgPack::str_t text) {
k_mutex_lock(&call_mutex, K_FOREVER);
error.code = code;
error.traceback = std::move(text);
k_mutex_unlock(&call_mutex);
}

public:
RpcError error{GENERIC_ERR, "This call is not executed yet"};

RpcCall(const MsgPack::str_t& m, RPCClient* c, struct k_mutex* rm, struct k_mutex* wm, Args&&... args): method(m), client(c), read_mutex(rm), write_mutex(wm), callback_params(std::forward_as_tuple(std::forward<Args>(args)...)) {}
RpcCall(const MsgPack::str_t& m, RPCClient* c, struct k_mutex* rm, struct k_mutex* wm, Args&&... args): method(m), client(c), read_mutex(rm), write_mutex(wm), callback_params(std::forward_as_tuple(std::forward<Args>(args)...)) {
k_mutex_init(&call_mutex);
setError(GENERIC_ERR, "This call is not yet executed");
}

bool isError() {
k_mutex_lock(&call_mutex, K_FOREVER);
const bool out = error.code > NO_ERR;
k_mutex_unlock(&call_mutex);
return out;
}

int getErrorCode() {
k_mutex_lock(&call_mutex, K_FOREVER);
const int out = error.code;
k_mutex_unlock(&call_mutex);
return out;
}

MsgPack::str_t getErrorMessage() {
k_mutex_lock(&call_mutex, K_FOREVER);
MsgPack::str_t out = error.traceback;
k_mutex_unlock(&call_mutex);
return out;
}

template<typename RType> bool result(RType& result) {

if (!atomic_cas(&_executed, 0, 1)){
// this thread lost the race
error.code = GENERIC_ERR;
error.traceback = "This call result is no longer available";
setError(GENERIC_ERR, "This call is no longer available");
return false;
}

Expand All @@ -60,13 +94,15 @@ class RpcCall {

while(true) {
if (k_mutex_lock(read_mutex, K_MSEC(10)) == 0 ) {
if (client->get_response(msg_id_wait, result, error)) {
RpcError temp_err;
if (client->get_response(msg_id_wait, result, temp_err)) {
k_mutex_unlock(read_mutex);
// if (error.code == PARSING_ERR) {
// k_mutex_lock(write_mutex, K_FOREVER);
// client->notify(BRIDGE_ERROR, error.traceback);
// k_mutex_unlock(write_mutex);
// }
setError(temp_err.code, temp_err.traceback);
break;
}
k_mutex_unlock(read_mutex);
Expand All @@ -76,7 +112,7 @@ class RpcCall {
}
}

return error.code == NO_ERR;
return !isError();
}

bool result() {
Expand All @@ -100,6 +136,7 @@ class RpcCall {
RPCClient* client;
struct k_mutex* read_mutex;
struct k_mutex* write_mutex;
struct k_mutex call_mutex{};
std::tuple<Args...> callback_params;
};

Expand Down Expand Up @@ -145,6 +182,8 @@ class BridgeClass {

if (is_started()) return true;

k_mutex_lock(&bridge_mutex, K_FOREVER);

serial_ptr->begin(baud);
transport = new SerialTransport(*serial_ptr);

Expand All @@ -159,7 +198,6 @@ class BridgeClass {
UPDATE_THREAD_PRIORITY, 0, K_NO_WAIT);
k_thread_name_set(upd_tid, "bridge");

k_mutex_lock(&bridge_mutex, K_FOREVER);
bool res = false;
started = call(RESET_METHOD).result(res) && res;
k_mutex_unlock(&bridge_mutex);
Expand Down
Loading
Loading