Skip to content

Commit 9f5f598

Browse files
authored
Merge pull request #39 from arduino-libraries/rpccall_thread_safety
fix: RpcCall error not private and thread safe
2 parents a85adb7 + 91fdc9d commit 9f5f598

File tree

9 files changed

+246
-38
lines changed

9 files changed

+246
-38
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,9 @@ void loop() {
5757
if (!async_rpc.result(sum)) {
5858
Monitor.println("Error calling method: add");
5959
Monitor.print("Error code: ");
60-
Monitor.println(async_rpc.error.code);
60+
Monitor.println(async_rpc.getErrorCode());
6161
Monitor.print("Error message: ");
62-
Monitor.println(async_rpc.error.traceback);
62+
Monitor.println(async_rpc.getErrorMessage());
6363
}
6464

6565
// Implicit boolean cast. Use with caution as in this case the call is indeed

examples/simple_bridge/simple_bridge.ino

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ void loop() {
5959
Serial.print("Result of the operation is: ");
6060
Serial.println(res);
6161
} else {
62-
Serial.println(outcome.error.code);
63-
Serial.println(outcome.error.traceback);
62+
Serial.println(outcome.getErrorCode());
63+
Serial.println(outcome.getErrorMessage());
6464
}
6565

6666
Bridge.notify("signal", 200);

examples/test/test.ino

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,34 +64,34 @@ void loop() {
6464
if (async_res.result(pow)) {
6565
Monitor.println("Result of assignment and then result: "+String(pow)); // returns true, so the right result
6666
} else {
67-
Monitor.println("Error code: "+String(async_res.error.code));
68-
Monitor.println("Error message: "+async_res.error.traceback);
67+
Monitor.println("Error code: "+String(async_res.getErrorCode()));
68+
Monitor.println("Error message: "+async_res.getErrorMessage());
6969
}
7070

7171
float div = 0;
7272
RpcCall async_res1 = Bridge.call("2_args_float_result", 2.0); // passing 1 arg when 2 are expected
7373
if (async_res1.result(div)) {
7474
Monitor.println("Result of assignment and then result: "+String(div)); // returns true, so the right result
7575
} else {
76-
Monitor.println("Error code: "+String(async_res1.error.code));
77-
Monitor.println("Error message: "+async_res1.error.traceback);
76+
Monitor.println("Error code: "+String(async_res1.getErrorCode()));
77+
Monitor.println("Error message: "+async_res1.getErrorMessage());
7878
}
7979

8080
div = 0;
8181
RpcCall async_res2 = Bridge.call("2_args_float_result", 2.0, "invalid"); // passing a wrong type arg
8282
if (async_res2.result(div)) {
8383
Monitor.println("Result of assignment and then result: "+String(div)); // returns true, so the right result
8484
} else {
85-
Monitor.println("Error code: "+String(async_res2.error.code));
86-
Monitor.println("Error message: "+async_res2.error.traceback);
85+
Monitor.println("Error code: "+String(async_res2.getErrorCode()));
86+
Monitor.println("Error message: "+async_res2.getErrorMessage());
8787
}
8888

8989
x = false;
9090
RpcCall async_res3 = Bridge.call("0_args_bool_result");
9191
if (async_res3.result(x)) {
9292
Monitor.println("Result of assignment and then result: "+String(x)); // returns true, so the right result
9393
} else {
94-
Monitor.println("Error expecting bool result: "+String(async_res3.error.code));
94+
Monitor.println("Error expecting bool result: "+String(async_res3.getErrorCode()));
9595
}
9696

9797
// Avoid the following:
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA <http://www.arduino.cc>
2+
#
3+
# SPDX-License-Identifier: MPL-2.0
4+
5+
import time
6+
from arduino.app_utils import *
7+
8+
led_state = False
9+
10+
def loopback(message):
11+
time.sleep(1)
12+
return message
13+
14+
Bridge.provide("loopback", loopback)
15+
App.run()
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
#include <Arduino_RouterBridge.h>
2+
#include <zephyr/kernel.h>
3+
4+
// Thread config
5+
#define THREAD_STACK_SIZE 500
6+
#define THREAD_PRIORITY 5
7+
8+
9+
void rpc_thread_entry(void *p1, void *p2, void *p3) {
10+
RpcCall<MsgPack::str_t> *call = reinterpret_cast<RpcCall<MsgPack::str_t>*>(p1);
11+
struct k_mutex *mtx = reinterpret_cast<struct k_mutex*>(p2);
12+
13+
// Give setup() time to complete first result()
14+
k_sleep(K_MSEC(400));
15+
16+
Serial.println("\n--- Second Thread ---");
17+
Serial.println("Calling result() again...");
18+
19+
k_mutex_lock(mtx, K_FOREVER);
20+
21+
MsgPack::str_t msg;
22+
bool ok = call->result(msg);
23+
24+
if (ok) {
25+
Serial.println("ERR - Second call succeeded (unexpected!)");
26+
Serial.print("Message: ");
27+
Serial.println(msg.c_str());
28+
} else {
29+
Serial.println("OK - Second call FAILED as expected (already executed)");
30+
Serial.print("Error Code: 0x");
31+
Serial.println(call->getErrorCode(), HEX);
32+
Serial.print("Error Message: ");
33+
Serial.println(call->getErrorMessage().c_str());
34+
}
35+
36+
k_mutex_unlock(mtx);
37+
38+
Serial.println("--- Second Thread End ---\n");
39+
}
40+
41+
42+
void setup() {
43+
Serial.begin(115200);
44+
k_sleep(K_MSEC(2000));
45+
46+
Serial.println("\n=== Threaded RPC Test ===\n");
47+
48+
Serial.println("*** Main Thread (setup) ***");
49+
50+
Bridge.begin();
51+
Monitor.begin();
52+
53+
static struct k_mutex loop_mtx;
54+
k_mutex_init(&loop_mtx);
55+
56+
RpcCall loopback_call = Bridge.call("loopback", "TEST");
57+
58+
if (loopback_call.isError()) {
59+
Serial.println("OK - RPC call in Error mode before execution");
60+
Serial.print("Error Code: 0x");
61+
Serial.println(loopback_call.getErrorCode(), HEX);
62+
Serial.print("Error Message: ");
63+
Serial.println(loopback_call.getErrorMessage().c_str());
64+
} else {
65+
Serial.println("ERR - RPC call not in Error mode before execution (unexpected)");
66+
}
67+
68+
Serial.println("Waiting for the other side...\n");
69+
delay(2000);
70+
71+
Serial.println("calling .result() on RPC call (main thread)");
72+
73+
MsgPack::str_t msg;
74+
k_mutex_lock(&loop_mtx, K_FOREVER);
75+
bool ok = loopback_call.result(msg);
76+
k_mutex_unlock(&loop_mtx);
77+
78+
if (ok) {
79+
Serial.println("OK - First call succeeded.");
80+
Serial.print("Message: ");
81+
Serial.println(msg.c_str());
82+
} else {
83+
Serial.println("ERR - First call FAILED (unexpected).");
84+
}
85+
86+
// ---- Launch second thread ----
87+
Serial.println("\nStarting second thread...");
88+
89+
struct k_thread rpc_thread;
90+
91+
k_thread_stack_t *rpc_stack_area = k_thread_stack_alloc(THREAD_STACK_SIZE, 0);
92+
93+
k_tid_t rpc_tid = k_thread_create(
94+
&rpc_thread,
95+
rpc_stack_area,
96+
THREAD_STACK_SIZE,
97+
rpc_thread_entry,
98+
&loopback_call, // p1 → RpcCall*
99+
&loop_mtx, // p2 → mutex
100+
NULL,
101+
THREAD_PRIORITY,
102+
0,
103+
K_FOREVER
104+
);
105+
106+
k_thread_start(rpc_tid);
107+
Serial.println("Second thread launched... joining");
108+
k_thread_join(&rpc_thread, K_FOREVER);
109+
Serial.println("*** Main thread end ending setup ***");
110+
111+
}
112+
113+
void loop() {
114+
k_sleep(K_MSEC(5000));
115+
}

src/bridge.h

Lines changed: 53 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
#define RESET_METHOD "$/reset"
1818
#define BIND_METHOD "$/register"
19+
#define GET_VERSION_METHOD "$/version"
20+
1921
//#define BRIDGE_ERROR "$/bridgeLog"
2022

2123
#define UPDATE_THREAD_STACK_SIZE 500
@@ -27,22 +29,56 @@
2729
#include <zephyr/sys/atomic.h>
2830
#include <Arduino_RPClite.h>
2931

32+
#include <utility>
33+
3034

3135
void updateEntryPoint(void *, void *, void *);
3236

3337
template<typename... Args>
3438
class RpcCall {
39+
40+
RpcError error;
41+
42+
void setError(int code, MsgPack::str_t text) {
43+
k_mutex_lock(&call_mutex, K_FOREVER);
44+
error.code = code;
45+
error.traceback = std::move(text);
46+
k_mutex_unlock(&call_mutex);
47+
}
48+
3549
public:
36-
RpcError error{GENERIC_ERR, "This call is not executed yet"};
3750

38-
RpcCall(const MsgPack::str_t& m, RPCClient* c, struct k_mutex* rm, struct k_mutex* wm, Args&&... args): method(m), client(c), read_mutex(rm), write_mutex(wm), callback_params(std::forward_as_tuple(std::forward<Args>(args)...)) {}
51+
RpcCall(const MsgPack::str_t& m, RPCClient* c, struct k_mutex* rm, struct k_mutex* wm, Args&&... args): method(m), client(c), read_mutex(rm), write_mutex(wm), callback_params(std::forward_as_tuple(std::forward<Args>(args)...)) {
52+
k_mutex_init(&call_mutex);
53+
setError(GENERIC_ERR, "This call is not yet executed");
54+
}
55+
56+
bool isError() {
57+
k_mutex_lock(&call_mutex, K_FOREVER);
58+
const bool out = error.code > NO_ERR;
59+
k_mutex_unlock(&call_mutex);
60+
return out;
61+
}
62+
63+
int getErrorCode() {
64+
k_mutex_lock(&call_mutex, K_FOREVER);
65+
const int out = error.code;
66+
k_mutex_unlock(&call_mutex);
67+
return out;
68+
}
69+
70+
MsgPack::str_t getErrorMessage() {
71+
k_mutex_lock(&call_mutex, K_FOREVER);
72+
MsgPack::str_t out = error.traceback;
73+
k_mutex_unlock(&call_mutex);
74+
return out;
75+
}
3976

4077
template<typename RType> bool result(RType& result) {
4178

4279
if (!atomic_cas(&_executed, 0, 1)){
4380
// this thread lost the race
44-
error.code = GENERIC_ERR;
45-
error.traceback = "This call result is no longer available";
81+
setError(GENERIC_ERR, "This call is no longer available");
4682
return false;
4783
}
4884

@@ -60,13 +96,15 @@ class RpcCall {
6096

6197
while(true) {
6298
if (k_mutex_lock(read_mutex, K_MSEC(10)) == 0 ) {
63-
if (client->get_response(msg_id_wait, result, error)) {
99+
RpcError temp_err;
100+
if (client->get_response(msg_id_wait, result, temp_err)) {
64101
k_mutex_unlock(read_mutex);
65102
// if (error.code == PARSING_ERR) {
66103
// k_mutex_lock(write_mutex, K_FOREVER);
67104
// client->notify(BRIDGE_ERROR, error.traceback);
68105
// k_mutex_unlock(write_mutex);
69106
// }
107+
setError(temp_err.code, temp_err.traceback);
70108
break;
71109
}
72110
k_mutex_unlock(read_mutex);
@@ -76,7 +114,7 @@ class RpcCall {
76114
}
77115
}
78116

79-
return error.code == NO_ERR;
117+
return !isError();
80118
}
81119

82120
bool result() {
@@ -100,6 +138,7 @@ class RpcCall {
100138
RPCClient* client;
101139
struct k_mutex* read_mutex;
102140
struct k_mutex* write_mutex;
141+
struct k_mutex call_mutex{};
103142
std::tuple<Args...> callback_params;
104143
};
105144

@@ -120,6 +159,8 @@ class BridgeClass {
120159

121160
bool started = false;
122161

162+
MsgPack::str_t router_ver;
163+
123164
public:
124165

125166
explicit BridgeClass(HardwareSerial& serial) {
@@ -145,6 +186,8 @@ class BridgeClass {
145186

146187
if (is_started()) return true;
147188

189+
k_mutex_lock(&bridge_mutex, K_FOREVER);
190+
148191
serial_ptr->begin(baud);
149192
transport = new SerialTransport(*serial_ptr);
150193

@@ -159,13 +202,16 @@ class BridgeClass {
159202
UPDATE_THREAD_PRIORITY, 0, K_NO_WAIT);
160203
k_thread_name_set(upd_tid, "bridge");
161204

162-
k_mutex_lock(&bridge_mutex, K_FOREVER);
163205
bool res = false;
164206
started = call(RESET_METHOD).result(res) && res;
165207
k_mutex_unlock(&bridge_mutex);
166208
return res;
167209
}
168210

211+
bool getRouterVersion(MsgPack::str_t& version) {
212+
return call(GET_VERSION_METHOD).result(version);
213+
}
214+
169215
template<typename F>
170216
bool provide(const MsgPack::str_t& name, F&& func) {
171217
k_mutex_lock(&bridge_mutex, K_FOREVER);

0 commit comments

Comments
 (0)