From a3a434e1add557cc176484478a07124b2e7562a0 Mon Sep 17 00:00:00 2001 From: lucarin91 Date: Fri, 28 Nov 2025 19:00:13 +0100 Subject: [PATCH 01/14] feat: add bridge monitor command --- cmd/arduino-app-cli/app/app.go | 1 - cmd/arduino-app-cli/main.go | 2 + .../{app => monitor}/monitor.go | 42 +++++- internal/api/handlers/monitor.go | 122 ++++++------------ internal/monitor/monitor.go | 97 ++++++++++++++ 5 files changed, 171 insertions(+), 93 deletions(-) rename cmd/arduino-app-cli/{app => monitor}/monitor.go (54%) create mode 100644 internal/monitor/monitor.go diff --git a/cmd/arduino-app-cli/app/app.go b/cmd/arduino-app-cli/app/app.go index fca105b8..48c98c7a 100644 --- a/cmd/arduino-app-cli/app/app.go +++ b/cmd/arduino-app-cli/app/app.go @@ -38,7 +38,6 @@ func NewAppCmd(cfg config.Configuration) *cobra.Command { appCmd.AddCommand(newRestartCmd(cfg)) appCmd.AddCommand(newLogsCmd(cfg)) appCmd.AddCommand(newListCmd(cfg)) - appCmd.AddCommand(newMonitorCmd(cfg)) appCmd.AddCommand(newCacheCleanCmd(cfg)) return appCmd diff --git a/cmd/arduino-app-cli/main.go b/cmd/arduino-app-cli/main.go index c9dfddca..849e4178 100644 --- a/cmd/arduino-app-cli/main.go +++ b/cmd/arduino-app-cli/main.go @@ -30,6 +30,7 @@ import ( "github.com/arduino/arduino-app-cli/cmd/arduino-app-cli/config" "github.com/arduino/arduino-app-cli/cmd/arduino-app-cli/daemon" "github.com/arduino/arduino-app-cli/cmd/arduino-app-cli/internal/servicelocator" + "github.com/arduino/arduino-app-cli/cmd/arduino-app-cli/monitor" "github.com/arduino/arduino-app-cli/cmd/arduino-app-cli/properties" "github.com/arduino/arduino-app-cli/cmd/arduino-app-cli/system" "github.com/arduino/arduino-app-cli/cmd/arduino-app-cli/version" @@ -78,6 +79,7 @@ func run(configuration cfg.Configuration) error { config.NewConfigCmd(configuration), system.NewSystemCmd(configuration), version.NewVersionCmd(Version), + monitor.NewMonitorCmd(), ) ctx := context.Background() diff --git a/cmd/arduino-app-cli/app/monitor.go b/cmd/arduino-app-cli/monitor/monitor.go similarity index 54% rename from cmd/arduino-app-cli/app/monitor.go rename to cmd/arduino-app-cli/monitor/monitor.go index cdf057e1..28b27a7c 100644 --- a/cmd/arduino-app-cli/app/monitor.go +++ b/cmd/arduino-app-cli/monitor/monitor.go @@ -13,22 +13,52 @@ // Arduino software without disclosing the source code of your own applications. // To purchase a commercial license, send an email to license@arduino.cc. -package app +package monitor import ( + "io" + "os" + "github.com/spf13/cobra" - "github.com/arduino/arduino-app-cli/cmd/arduino-app-cli/completion" - "github.com/arduino/arduino-app-cli/internal/orchestrator/config" + "github.com/arduino/arduino-app-cli/internal/monitor" ) -func newMonitorCmd(cfg config.Configuration) *cobra.Command { +func NewMonitorCmd() *cobra.Command { return &cobra.Command{ Use: "monitor", Short: "Monitor the Arduino app", RunE: func(cmd *cobra.Command, args []string) error { - panic("not implemented") + start, err := monitor.NewMonitorHandler(&stdInOutProxy{stdin: os.Stdin, stdout: os.Stdout}) // nolint:forbidigo + if err != nil { + return err + } + go start() + <-cmd.Context().Done() + return nil }, - ValidArgsFunction: completion.ApplicationNames(cfg), } } + +type stdInOutProxy struct { + stdin io.Reader + stdout io.Writer +} + +func (s stdInOutProxy) ReadMessage() (int, []byte, error) { + var p [1024]byte + n, err := s.stdin.Read(p[:]) + if err != nil { + return 0, nil, err + } + return 1, p[:n], nil +} + +func (s stdInOutProxy) WriteMessage(messageType int, data []byte) error { + _, err := s.stdout.Write(data) + return err +} + +func (s stdInOutProxy) Close() error { + return nil +} diff --git a/internal/api/handlers/monitor.go b/internal/api/handlers/monitor.go index 5aaf8f4d..97f1ad98 100644 --- a/internal/api/handlers/monitor.go +++ b/internal/api/handlers/monitor.go @@ -16,9 +16,7 @@ package handlers import ( - "errors" "fmt" - "io" "log/slog" "net" "net/http" @@ -28,59 +26,50 @@ import ( "github.com/gorilla/websocket" "github.com/arduino/arduino-app-cli/internal/api/models" + "github.com/arduino/arduino-app-cli/internal/monitor" "github.com/arduino/arduino-app-cli/internal/render" ) -func monitorStream(mon net.Conn, ws *websocket.Conn) { - logWebsocketError := func(msg string, err error) { - // Do not log simple close or interruption errors - if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseNoStatusReceived, websocket.CloseAbnormalClosure) { - if e, ok := err.(*websocket.CloseError); ok { - slog.Error(msg, slog.String("closecause", fmt.Sprintf("%d: %s", e.Code, err))) - } else { - slog.Error(msg, slog.String("error", err.Error())) - } - } +func HandleMonitorWS(allowedOrigins []string) http.HandlerFunc { + upgrader := websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { + return checkOrigin(r.Header.Get("Origin"), allowedOrigins) + }, } - logSocketError := func(msg string, err error) { - if !errors.Is(err, net.ErrClosed) && !errors.Is(err, io.EOF) { - slog.Error(msg, slog.String("error", err.Error())) + + return func(w http.ResponseWriter, r *http.Request) { + // Connect to monitor + mon, err := net.DialTimeout("tcp", "127.0.0.1:7500", time.Second) + if err != nil { + slog.Error("Unable to connect to monitor", slog.String("error", err.Error())) + render.EncodeResponse(w, http.StatusServiceUnavailable, models.ErrorResponse{Details: "Unable to connect to monitor: " + err.Error()}) + return } - } - go func() { - defer mon.Close() - defer ws.Close() - for { - // Read from websocket and write to monitor - _, msg, err := ws.ReadMessage() - if err != nil { - logWebsocketError("Error reading from websocket", err) - return - } - if _, err := mon.Write(msg); err != nil { - logSocketError("Error writing to monitor", err) - return - } + + // Upgrade the connection to websocket + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + // Remember to close monitor connection if websocket upgrade fails. + mon.Close() + + slog.Error("Failed to upgrade connection", slog.String("error", err.Error())) + render.EncodeResponse(w, http.StatusInternalServerError, map[string]string{"error": "Failed to upgrade connection: " + err.Error()}) + return } - }() - go func() { - defer mon.Close() - defer ws.Close() - buff := [1024]byte{} - for { - // Read from monitor and write to websocket - n, err := mon.Read(buff[:]) - if err != nil { - logSocketError("Error reading from monitor", err) - return - } - if err := ws.WriteMessage(websocket.BinaryMessage, buff[:n]); err != nil { - logWebsocketError("Error writing to websocket", err) - return - } + // Now the connection is managed by the websocket library, let's move the handlers in the goroutine + start, err := monitor.NewMonitorHandler(conn) + if err != nil { + slog.Error("Unable to start monitor handler", slog.String("error", err.Error())) + render.EncodeResponse(w, http.StatusInternalServerError, models.ErrorResponse{Details: "Unable to start monitor handler: " + err.Error()}) + return } - }() + go start() + + // and return nothing to the http library + } } func splitOrigin(origin string) (scheme, host, port string, err error) { @@ -125,42 +114,3 @@ func checkOrigin(origin string, allowedOrigins []string) bool { slog.Error("WebSocket origin check failed", slog.String("origin", origin)) return false } - -func HandleMonitorWS(allowedOrigins []string) http.HandlerFunc { - // Do a dry-run of checkorigin, so it can panic if misconfigured now, not on first request - _ = checkOrigin("http://localhost", allowedOrigins) - - upgrader := websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, - CheckOrigin: func(r *http.Request) bool { - return checkOrigin(r.Header.Get("Origin"), allowedOrigins) - }, - } - - return func(w http.ResponseWriter, r *http.Request) { - // Connect to monitor - mon, err := net.DialTimeout("tcp", "127.0.0.1:7500", time.Second) - if err != nil { - slog.Error("Unable to connect to monitor", slog.String("error", err.Error())) - render.EncodeResponse(w, http.StatusServiceUnavailable, models.ErrorResponse{Details: "Unable to connect to monitor: " + err.Error()}) - return - } - - // Upgrade the connection to websocket - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - // Remember to close monitor connection if websocket upgrade fails. - mon.Close() - - slog.Error("Failed to upgrade connection", slog.String("error", err.Error())) - render.EncodeResponse(w, http.StatusInternalServerError, map[string]string{"error": "Failed to upgrade connection: " + err.Error()}) - return - } - - // Now the connection is managed by the websocket library, let's move the handlers in the goroutine - go monitorStream(mon, conn) - - // and return nothing to the http library - } -} diff --git a/internal/monitor/monitor.go b/internal/monitor/monitor.go new file mode 100644 index 00000000..973d0eae --- /dev/null +++ b/internal/monitor/monitor.go @@ -0,0 +1,97 @@ +// This file is part of arduino-app-cli. +// +// Copyright 2025 ARDUINO SA (http://www.arduino.cc/) +// +// This software is released under the GNU General Public License version 3, +// which covers the main part of arduino-app-cli. +// The terms of this license can be found at: +// https://www.gnu.org/licenses/gpl-3.0.en.html +// +// You can be released from the requirements of the above licenses by purchasing +// a commercial license. Buying such a license is mandatory if you want to +// modify or otherwise use the software for commercial activities involving the +// Arduino software without disclosing the source code of your own applications. +// To purchase a commercial license, send an email to license@arduino.cc. + +package monitor + +import ( + "errors" + "fmt" + "io" + "log/slog" + "net" + "time" + + "github.com/gorilla/websocket" +) + +type MessageReaderWriter interface { + ReadMessage() (messageType int, p []byte, err error) + WriteMessage(messageType int, data []byte) error + Close() error +} + +func NewMonitorHandler(ws MessageReaderWriter) (func(), error) { + // Connect to monitor + mon, err := net.DialTimeout("tcp", "127.0.0.1:7500", time.Second) + if err != nil { + return func() {}, err + } + + return func() { + monitorStream(mon, ws) + }, nil +} + +func monitorStream(mon net.Conn, ws MessageReaderWriter) { + logWebsocketError := func(msg string, err error) { + // Do not log simple close or interruption errors + if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseNoStatusReceived, websocket.CloseAbnormalClosure) { + if e, ok := err.(*websocket.CloseError); ok { + slog.Error(msg, slog.String("closecause", fmt.Sprintf("%d: %s", e.Code, err))) + } else { + slog.Error(msg, slog.String("error", err.Error())) + } + } + } + logSocketError := func(msg string, err error) { + if !errors.Is(err, net.ErrClosed) && !errors.Is(err, io.EOF) { + slog.Error(msg, slog.String("error", err.Error())) + } + } + go func() { + defer mon.Close() + defer ws.Close() + for { + // Read from websocket and write to monitor + _, msg, err := ws.ReadMessage() + if err != nil { + logWebsocketError("Error reading from websocket", err) + return + } + if _, err := mon.Write(msg); err != nil { + logSocketError("Error writing to monitor", err) + return + } + } + }() + go func() { + defer mon.Close() + defer ws.Close() + buff := [1024]byte{} + for { + // Read from monitor and write to websocket + n, err := mon.Read(buff[:]) + if err != nil { + logSocketError("Error reading from monitor", err) + return + } + + if err := ws.WriteMessage(websocket.BinaryMessage, buff[:n]); err != nil { + logWebsocketError("Error writing to websocket", err) + return + } + } + }() +} From 3e3726c7ef29e58c3235973707a98042823ddf4c Mon Sep 17 00:00:00 2001 From: Luca Rinaldi Date: Mon, 1 Dec 2025 11:51:08 +0100 Subject: [PATCH 02/14] Update internal/monitor/monitor.go Co-authored-by: Cristian Maglie --- internal/monitor/monitor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/monitor/monitor.go b/internal/monitor/monitor.go index 973d0eae..619609e6 100644 --- a/internal/monitor/monitor.go +++ b/internal/monitor/monitor.go @@ -36,7 +36,7 @@ func NewMonitorHandler(ws MessageReaderWriter) (func(), error) { // Connect to monitor mon, err := net.DialTimeout("tcp", "127.0.0.1:7500", time.Second) if err != nil { - return func() {}, err + return nil, err } return func() { From 9d7e112a2f6007469e1c14705649215e7dfdf342 Mon Sep 17 00:00:00 2001 From: lucarin91 Date: Mon, 1 Dec 2025 12:08:06 +0100 Subject: [PATCH 03/14] improve description --- cmd/arduino-app-cli/monitor/monitor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/arduino-app-cli/monitor/monitor.go b/cmd/arduino-app-cli/monitor/monitor.go index 28b27a7c..3e2ff49a 100644 --- a/cmd/arduino-app-cli/monitor/monitor.go +++ b/cmd/arduino-app-cli/monitor/monitor.go @@ -27,7 +27,7 @@ import ( func NewMonitorCmd() *cobra.Command { return &cobra.Command{ Use: "monitor", - Short: "Monitor the Arduino app", + Short: "Attach to the microcontroller serial monitor", RunE: func(cmd *cobra.Command, args []string) error { start, err := monitor.NewMonitorHandler(&stdInOutProxy{stdin: os.Stdin, stdout: os.Stdout}) // nolint:forbidigo if err != nil { From 32c06ae3b31f754da1a9835f4a763779381f0bd1 Mon Sep 17 00:00:00 2001 From: lucarin91 Date: Mon, 1 Dec 2025 12:23:37 +0100 Subject: [PATCH 04/14] apply code review suggestions --- internal/monitor/monitor.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/internal/monitor/monitor.go b/internal/monitor/monitor.go index 619609e6..5d02aaa7 100644 --- a/internal/monitor/monitor.go +++ b/internal/monitor/monitor.go @@ -32,7 +32,7 @@ type MessageReaderWriter interface { Close() error } -func NewMonitorHandler(ws MessageReaderWriter) (func(), error) { +func NewMonitorHandler(rw MessageReaderWriter) (func(), error) { // Connect to monitor mon, err := net.DialTimeout("tcp", "127.0.0.1:7500", time.Second) if err != nil { @@ -40,11 +40,11 @@ func NewMonitorHandler(ws MessageReaderWriter) (func(), error) { } return func() { - monitorStream(mon, ws) + monitorStream(mon, rw) }, nil } -func monitorStream(mon net.Conn, ws MessageReaderWriter) { +func monitorStream(mon net.Conn, rw MessageReaderWriter) { logWebsocketError := func(msg string, err error) { // Do not log simple close or interruption errors if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseNoStatusReceived, websocket.CloseAbnormalClosure) { @@ -62,10 +62,10 @@ func monitorStream(mon net.Conn, ws MessageReaderWriter) { } go func() { defer mon.Close() - defer ws.Close() + defer rw.Close() for { // Read from websocket and write to monitor - _, msg, err := ws.ReadMessage() + _, msg, err := rw.ReadMessage() if err != nil { logWebsocketError("Error reading from websocket", err) return @@ -78,7 +78,7 @@ func monitorStream(mon net.Conn, ws MessageReaderWriter) { }() go func() { defer mon.Close() - defer ws.Close() + defer rw.Close() buff := [1024]byte{} for { // Read from monitor and write to websocket @@ -88,7 +88,7 @@ func monitorStream(mon net.Conn, ws MessageReaderWriter) { return } - if err := ws.WriteMessage(websocket.BinaryMessage, buff[:n]); err != nil { + if err := rw.WriteMessage(websocket.BinaryMessage, buff[:n]); err != nil { logWebsocketError("Error writing to websocket", err) return } From 14a718e3bd9ece921de545b9827567688beccf4e Mon Sep 17 00:00:00 2001 From: lucarin91 Date: Mon, 1 Dec 2025 15:32:37 +0100 Subject: [PATCH 05/14] propertly implement the internal function --- cmd/arduino-app-cli/monitor/monitor.go | 29 ++++++------- internal/api/handlers/monitor.go | 59 +++++++++++++++++++++----- internal/monitor/monitor.go | 38 +++++------------ 3 files changed, 73 insertions(+), 53 deletions(-) diff --git a/cmd/arduino-app-cli/monitor/monitor.go b/cmd/arduino-app-cli/monitor/monitor.go index 3e2ff49a..df6946e6 100644 --- a/cmd/arduino-app-cli/monitor/monitor.go +++ b/cmd/arduino-app-cli/monitor/monitor.go @@ -21,6 +21,7 @@ import ( "github.com/spf13/cobra" + "github.com/arduino/arduino-app-cli/cmd/feedback" "github.com/arduino/arduino-app-cli/internal/monitor" ) @@ -29,7 +30,11 @@ func NewMonitorCmd() *cobra.Command { Use: "monitor", Short: "Attach to the microcontroller serial monitor", RunE: func(cmd *cobra.Command, args []string) error { - start, err := monitor.NewMonitorHandler(&stdInOutProxy{stdin: os.Stdin, stdout: os.Stdout}) // nolint:forbidigo + stdout, _, err := feedback.DirectStreams() + if err != nil { + return err + } + start, err := monitor.NewMonitorHandler(&combinedReadWrite{r: os.Stdin, w: stdout}) // nolint:forbidigo if err != nil { return err } @@ -40,25 +45,19 @@ func NewMonitorCmd() *cobra.Command { } } -type stdInOutProxy struct { - stdin io.Reader - stdout io.Writer +type combinedReadWrite struct { + r io.Reader + w io.Writer } -func (s stdInOutProxy) ReadMessage() (int, []byte, error) { - var p [1024]byte - n, err := s.stdin.Read(p[:]) - if err != nil { - return 0, nil, err - } - return 1, p[:n], nil +func (crw *combinedReadWrite) Read(p []byte) (n int, err error) { + return crw.r.Read(p) } -func (s stdInOutProxy) WriteMessage(messageType int, data []byte) error { - _, err := s.stdout.Write(data) - return err +func (crw *combinedReadWrite) Write(p []byte) (n int, err error) { + return crw.w.Write(p) } -func (s stdInOutProxy) Close() error { +func (crw *combinedReadWrite) Close() error { return nil } diff --git a/internal/api/handlers/monitor.go b/internal/api/handlers/monitor.go index 97f1ad98..00912cc1 100644 --- a/internal/api/handlers/monitor.go +++ b/internal/api/handlers/monitor.go @@ -21,7 +21,6 @@ import ( "net" "net/http" "strings" - "time" "github.com/gorilla/websocket" @@ -40,19 +39,10 @@ func HandleMonitorWS(allowedOrigins []string) http.HandlerFunc { } return func(w http.ResponseWriter, r *http.Request) { - // Connect to monitor - mon, err := net.DialTimeout("tcp", "127.0.0.1:7500", time.Second) - if err != nil { - slog.Error("Unable to connect to monitor", slog.String("error", err.Error())) - render.EncodeResponse(w, http.StatusServiceUnavailable, models.ErrorResponse{Details: "Unable to connect to monitor: " + err.Error()}) - return - } - // Upgrade the connection to websocket conn, err := upgrader.Upgrade(w, r, nil) if err != nil { // Remember to close monitor connection if websocket upgrade fails. - mon.Close() slog.Error("Failed to upgrade connection", slog.String("error", err.Error())) render.EncodeResponse(w, http.StatusInternalServerError, map[string]string{"error": "Failed to upgrade connection: " + err.Error()}) @@ -60,7 +50,7 @@ func HandleMonitorWS(allowedOrigins []string) http.HandlerFunc { } // Now the connection is managed by the websocket library, let's move the handlers in the goroutine - start, err := monitor.NewMonitorHandler(conn) + start, err := monitor.NewMonitorHandler(&wsReadWriteCloser{conn: conn}) if err != nil { slog.Error("Unable to start monitor handler", slog.String("error", err.Error())) render.EncodeResponse(w, http.StatusInternalServerError, models.ErrorResponse{Details: "Unable to start monitor handler: " + err.Error()}) @@ -114,3 +104,50 @@ func checkOrigin(origin string, allowedOrigins []string) bool { slog.Error("WebSocket origin check failed", slog.String("origin", origin)) return false } + +type wsReadWriteCloser struct { + conn *websocket.Conn + + buff []byte +} + +func (w *wsReadWriteCloser) Read(p []byte) (n int, err error) { + if len(w.buff) > 0 { + n = copy(p, w.buff) + w.buff = w.buff[n:] + return n, nil + } + + ty, message, err := w.conn.ReadMessage() + if err != nil { + return 0, mapWebSocketErrors(err) + } + if ty != websocket.BinaryMessage { + return 0, fmt.Errorf("unexpected websocket message type: %d", ty) + } + w.buff = message + + n = copy(p, w.buff) + w.buff = w.buff[n:] + return n, nil +} + +func (w *wsReadWriteCloser) Write(p []byte) (n int, err error) { + err = w.conn.WriteMessage(websocket.BinaryMessage, p) + if err != nil { + return 0, mapWebSocketErrors(err) + } + return len(p), nil +} + +func (w *wsReadWriteCloser) Close() error { + w.buff = nil + return w.conn.Close() +} + +func mapWebSocketErrors(err error) error { + if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseNoStatusReceived, websocket.CloseAbnormalClosure) { + return net.ErrClosed + } + return err +} diff --git a/internal/monitor/monitor.go b/internal/monitor/monitor.go index 5d02aaa7..012efc7a 100644 --- a/internal/monitor/monitor.go +++ b/internal/monitor/monitor.go @@ -17,44 +17,27 @@ package monitor import ( "errors" - "fmt" "io" "log/slog" "net" "time" - - "github.com/gorilla/websocket" ) -type MessageReaderWriter interface { - ReadMessage() (messageType int, p []byte, err error) - WriteMessage(messageType int, data []byte) error - Close() error -} +const monitorAddr = "127.0.0.1:7500" -func NewMonitorHandler(rw MessageReaderWriter) (func(), error) { +func NewMonitorHandler(rw io.ReadWriteCloser) (func(), error) { // Connect to monitor - mon, err := net.DialTimeout("tcp", "127.0.0.1:7500", time.Second) + monitor, err := net.DialTimeout("tcp", monitorAddr, time.Second) if err != nil { return nil, err } return func() { - monitorStream(mon, rw) + monitorStream(monitor, rw) }, nil } -func monitorStream(mon net.Conn, rw MessageReaderWriter) { - logWebsocketError := func(msg string, err error) { - // Do not log simple close or interruption errors - if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseNoStatusReceived, websocket.CloseAbnormalClosure) { - if e, ok := err.(*websocket.CloseError); ok { - slog.Error(msg, slog.String("closecause", fmt.Sprintf("%d: %s", e.Code, err))) - } else { - slog.Error(msg, slog.String("error", err.Error())) - } - } - } +func monitorStream(mon net.Conn, rw io.ReadWriteCloser) { logSocketError := func(msg string, err error) { if !errors.Is(err, net.ErrClosed) && !errors.Is(err, io.EOF) { slog.Error(msg, slog.String("error", err.Error())) @@ -63,14 +46,15 @@ func monitorStream(mon net.Conn, rw MessageReaderWriter) { go func() { defer mon.Close() defer rw.Close() + buff := [1024]byte{} for { // Read from websocket and write to monitor - _, msg, err := rw.ReadMessage() + n, err := rw.Read(buff[:]) if err != nil { - logWebsocketError("Error reading from websocket", err) + logSocketError("Error reading from websocket", err) return } - if _, err := mon.Write(msg); err != nil { + if _, err := mon.Write(buff[:n]); err != nil { logSocketError("Error writing to monitor", err) return } @@ -88,8 +72,8 @@ func monitorStream(mon net.Conn, rw MessageReaderWriter) { return } - if err := rw.WriteMessage(websocket.BinaryMessage, buff[:n]); err != nil { - logWebsocketError("Error writing to websocket", err) + if _, err := rw.Write(buff[:n]); err != nil { + logSocketError("Error writing to websocket", err) return } } From 976efb49d9f45298e0eac56380a2a25b3f90bb9c Mon Sep 17 00:00:00 2001 From: lucarin91 Date: Mon, 1 Dec 2025 16:45:58 +0100 Subject: [PATCH 06/14] handle also text type --- internal/api/handlers/monitor.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/internal/api/handlers/monitor.go b/internal/api/handlers/monitor.go index 00912cc1..08492cd1 100644 --- a/internal/api/handlers/monitor.go +++ b/internal/api/handlers/monitor.go @@ -122,13 +122,11 @@ func (w *wsReadWriteCloser) Read(p []byte) (n int, err error) { if err != nil { return 0, mapWebSocketErrors(err) } - if ty != websocket.BinaryMessage { - return 0, fmt.Errorf("unexpected websocket message type: %d", ty) + if ty != websocket.BinaryMessage && ty != websocket.TextMessage { + return } - w.buff = message - - n = copy(p, w.buff) - w.buff = w.buff[n:] + n = copy(p, message) + w.buff = message[n:] return n, nil } From 9196360f5ae844aa5066164fb45409bac0790727 Mon Sep 17 00:00:00 2001 From: Luca Rinaldi Date: Tue, 2 Dec 2025 10:50:09 +0100 Subject: [PATCH 07/14] Update internal/monitor/monitor.go Co-authored-by: Davide --- internal/monitor/monitor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/monitor/monitor.go b/internal/monitor/monitor.go index 012efc7a..0fe2fd7c 100644 --- a/internal/monitor/monitor.go +++ b/internal/monitor/monitor.go @@ -73,7 +73,7 @@ func monitorStream(mon net.Conn, rw io.ReadWriteCloser) { } if _, err := rw.Write(buff[:n]); err != nil { - logSocketError("Error writing to websocket", err) + logSocketError("Error writing to buffer", err) return } } From 37c2d5ec2f70fb17bac9f97e0faace1af5eb2fe4 Mon Sep 17 00:00:00 2001 From: Luca Rinaldi Date: Tue, 2 Dec 2025 10:50:29 +0100 Subject: [PATCH 08/14] Update internal/monitor/monitor.go Co-authored-by: Davide --- internal/monitor/monitor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/monitor/monitor.go b/internal/monitor/monitor.go index 0fe2fd7c..99bf21df 100644 --- a/internal/monitor/monitor.go +++ b/internal/monitor/monitor.go @@ -65,7 +65,7 @@ func monitorStream(mon net.Conn, rw io.ReadWriteCloser) { defer rw.Close() buff := [1024]byte{} for { - // Read from monitor and write to websocket + // Read from monitor and write to writer n, err := mon.Read(buff[:]) if err != nil { logSocketError("Error reading from monitor", err) From c6afb8c5b257deb05919018ebcd7ee9211158626 Mon Sep 17 00:00:00 2001 From: Luca Rinaldi Date: Tue, 2 Dec 2025 10:50:37 +0100 Subject: [PATCH 09/14] Update internal/monitor/monitor.go Co-authored-by: Davide --- internal/monitor/monitor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/monitor/monitor.go b/internal/monitor/monitor.go index 99bf21df..c3ae8005 100644 --- a/internal/monitor/monitor.go +++ b/internal/monitor/monitor.go @@ -48,7 +48,7 @@ func monitorStream(mon net.Conn, rw io.ReadWriteCloser) { defer rw.Close() buff := [1024]byte{} for { - // Read from websocket and write to monitor + // Read from reader and write to monitor n, err := rw.Read(buff[:]) if err != nil { logSocketError("Error reading from websocket", err) From ee0d6963524ca4b8264f5e5fa5894ce66481fcc2 Mon Sep 17 00:00:00 2001 From: lucarin91 Date: Fri, 5 Dec 2025 15:19:56 +0100 Subject: [PATCH 10/14] add monitor test --- internal/monitor/monitor.go | 15 +++++-- internal/monitor/monitor_test.go | 69 ++++++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+), 3 deletions(-) create mode 100644 internal/monitor/monitor_test.go diff --git a/internal/monitor/monitor.go b/internal/monitor/monitor.go index c3ae8005..1c4a33b0 100644 --- a/internal/monitor/monitor.go +++ b/internal/monitor/monitor.go @@ -21,13 +21,22 @@ import ( "log/slog" "net" "time" + + "go.bug.st/f" ) -const monitorAddr = "127.0.0.1:7500" +const defaultArduinoRouterMonitorAddress = "127.0.0.1:7500" + +func NewMonitorHandler(rw io.ReadWriteCloser, address ...string) (func(), error) { + f.Assert(len(address) <= 1, "NewMonitorHandler accepts at most one address argument") + + addr := defaultArduinoRouterMonitorAddress + if len(address) == 1 { + addr = address[0] + } -func NewMonitorHandler(rw io.ReadWriteCloser) (func(), error) { // Connect to monitor - monitor, err := net.DialTimeout("tcp", monitorAddr, time.Second) + monitor, err := net.DialTimeout("tcp", addr, time.Second) if err != nil { return nil, err } diff --git a/internal/monitor/monitor_test.go b/internal/monitor/monitor_test.go new file mode 100644 index 00000000..a342a39e --- /dev/null +++ b/internal/monitor/monitor_test.go @@ -0,0 +1,69 @@ +package monitor + +import ( + "fmt" + "io" + "net" + "testing" + + "github.com/arduino/arduino-app-cli/pkg/x/ports" + "github.com/stretchr/testify/assert" +) + +func TestMonitorHandler(t *testing.T) { + addr := startEcoMonitor(t) + + t.Logf("Started echo monitor at %s", addr.String()) + + // Use pipes to simulate ReadWriteCloser + rOut, wIn := io.Pipe() + rIn, wOut := io.Pipe() + type pipeReadWriteCloser struct { + io.Reader + io.Writer + io.Closer + } + pr := &pipeReadWriteCloser{ + Reader: rOut, + Writer: wOut, + Closer: io.NopCloser(nil), + } + + handler, err := NewMonitorHandler(pr, addr.String()) + assert.NoError(t, err) + go handler() + + // Write data to the pipe writer + message := "Hello, Monitor!" + n, err := wIn.Write([]byte(message)) + assert.NoError(t, err) + assert.Equal(t, len(message), n) + + // Read data from the pipe reader + buf := [128]byte{} + n, err = rIn.Read(buf[:]) + assert.NoError(t, err) + assert.Equal(t, len(message), n) + assert.Equal(t, message, string(buf[:n])) +} + +func startEcoMonitor(t *testing.T) net.Addr { + t.Helper() + + port, err := ports.GetAvailable() + assert.NoError(t, err) + + ln, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port)) + assert.NoError(t, err) + t.Cleanup(func() { _ = ln.Close() }) + + go func() { + for { + conn, err := ln.Accept() + assert.NoError(t, err) + go io.Copy(conn, conn) // Echo server + } + }() + + return ln.Addr() +} From 21a9f2ef8c8fd78c600d998a04a0afb705761f58 Mon Sep 17 00:00:00 2001 From: lucarin91 Date: Fri, 5 Dec 2025 15:29:59 +0100 Subject: [PATCH 11/14] fixup! add monitor test --- internal/monitor/monitor_test.go | 44 +++++++++++++++++++------------- 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/internal/monitor/monitor_test.go b/internal/monitor/monitor_test.go index a342a39e..b8ce3d27 100644 --- a/internal/monitor/monitor_test.go +++ b/internal/monitor/monitor_test.go @@ -6,30 +6,17 @@ import ( "net" "testing" - "github.com/arduino/arduino-app-cli/pkg/x/ports" "github.com/stretchr/testify/assert" + + "github.com/arduino/arduino-app-cli/pkg/x/ports" ) func TestMonitorHandler(t *testing.T) { addr := startEcoMonitor(t) - t.Logf("Started echo monitor at %s", addr.String()) - - // Use pipes to simulate ReadWriteCloser - rOut, wIn := io.Pipe() - rIn, wOut := io.Pipe() - type pipeReadWriteCloser struct { - io.Reader - io.Writer - io.Closer - } - pr := &pipeReadWriteCloser{ - Reader: rOut, - Writer: wOut, - Closer: io.NopCloser(nil), - } + rIn, wIn, rwOut := getReadWriteCloser() - handler, err := NewMonitorHandler(pr, addr.String()) + handler, err := NewMonitorHandler(rwOut, addr.String()) assert.NoError(t, err) go handler() @@ -47,6 +34,23 @@ func TestMonitorHandler(t *testing.T) { assert.Equal(t, message, string(buf[:n])) } +func getReadWriteCloser() (io.Reader, io.Writer, io.ReadWriteCloser) { + rOut, wIn := io.Pipe() + rIn, wOut := io.Pipe() + + type pipeReadWriteCloser struct { + io.Reader + io.Writer + io.Closer + } + pr := &pipeReadWriteCloser{ + Reader: rOut, + Writer: wOut, + Closer: io.NopCloser(nil), + } + return rIn, wIn, pr +} + func startEcoMonitor(t *testing.T) net.Addr { t.Helper() @@ -61,7 +65,11 @@ func startEcoMonitor(t *testing.T) net.Addr { for { conn, err := ln.Accept() assert.NoError(t, err) - go io.Copy(conn, conn) // Echo server + go func() { + defer conn.Close() + _, err = io.Copy(conn, conn) // Echo server + assert.NoError(t, err) + }() } }() From 403b922ce626aba741e02f5ae167aeb911d23edd Mon Sep 17 00:00:00 2001 From: lucarin91 Date: Fri, 5 Dec 2025 16:07:29 +0100 Subject: [PATCH 12/14] fixup! fixup! add monitor test --- internal/monitor/monitor_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/monitor/monitor_test.go b/internal/monitor/monitor_test.go index b8ce3d27..326db685 100644 --- a/internal/monitor/monitor_test.go +++ b/internal/monitor/monitor_test.go @@ -67,8 +67,7 @@ func startEcoMonitor(t *testing.T) net.Addr { assert.NoError(t, err) go func() { defer conn.Close() - _, err = io.Copy(conn, conn) // Echo server - assert.NoError(t, err) + _, _ = io.Copy(conn, conn) // Echo server }() } }() From c2274ecc1f240ec5c361db93718f23697c95e4fb Mon Sep 17 00:00:00 2001 From: lucarin91 Date: Fri, 5 Dec 2025 16:22:58 +0100 Subject: [PATCH 13/14] fixup! fixup! fixup! add monitor test --- internal/monitor/monitor_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/monitor/monitor_test.go b/internal/monitor/monitor_test.go index 326db685..effa9c90 100644 --- a/internal/monitor/monitor_test.go +++ b/internal/monitor/monitor_test.go @@ -63,8 +63,7 @@ func startEcoMonitor(t *testing.T) net.Addr { go func() { for { - conn, err := ln.Accept() - assert.NoError(t, err) + conn, _ := ln.Accept() go func() { defer conn.Close() _, _ = io.Copy(conn, conn) // Echo server From 84bdeb6ee524bc550b5215aeb851495f6ae9981c Mon Sep 17 00:00:00 2001 From: lucarin91 Date: Fri, 5 Dec 2025 17:43:40 +0100 Subject: [PATCH 14/14] fixup! fixup! fixup! fixup! add monitor test --- internal/monitor/monitor_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/internal/monitor/monitor_test.go b/internal/monitor/monitor_test.go index effa9c90..2906a347 100644 --- a/internal/monitor/monitor_test.go +++ b/internal/monitor/monitor_test.go @@ -63,7 +63,11 @@ func startEcoMonitor(t *testing.T) net.Addr { go func() { for { - conn, _ := ln.Accept() + conn, err := ln.Accept() + if err != nil { + return + } + go func() { defer conn.Close() _, _ = io.Copy(conn, conn) // Echo server