From 6e804447708676afab299b505f17ad87ea448eec Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Thu, 25 Dec 2025 05:19:35 +0000 Subject: [PATCH] fix(grpc): fix goroutine leak in ListenCommand and race conditions - Close `cmdChan` in `ListenCommand` when the client disconnects to allow the sender goroutine to exit. - Fix race condition in `SendCommand` by holding `RLock` during the channel send operation. - Fix race condition in `ListenCommand` by protecting `stream.Send` with a `sync.Mutex`. - Ensure `UnregisterClientChannel` runs before `close(cmdChan)` via correct defer order. - Add regression test for `SendCommand` thread safety. --- internal/api/grpc/command.go | 15 +++++++-- internal/service/command.go | 2 +- internal/service/command_test.go | 54 ++++++++++++++++++++++++++++++++ 3 files changed, 68 insertions(+), 3 deletions(-) create mode 100644 internal/service/command_test.go diff --git a/internal/api/grpc/command.go b/internal/api/grpc/command.go index 2323052..cc291a2 100644 --- a/internal/api/grpc/command.go +++ b/internal/api/grpc/command.go @@ -4,6 +4,7 @@ import ( "fmt" "io" "log" + "sync" "github.com/MINIOpenSource/CIMS-backend/internal/proto/Protobuf/Enum" "github.com/MINIOpenSource/CIMS-backend/internal/proto/Protobuf/Server" @@ -45,12 +46,19 @@ func (s *ClientCommandDeliverServer) ListenCommand(stream Service.ClientCommandD cmdChan := make(chan *Server.ClientCommandDeliverScRsp, 10) s.commandService.RegisterClientChannel(clientID, cmdChan) + // Ensure Unregister happens before Close to avoid race conditions in SendCommand + defer close(cmdChan) defer s.commandService.UnregisterClientChannel(clientID) + var streamMu sync.Mutex + // Send commands go func() { for cmd := range cmdChan { - if err := stream.Send(cmd); err != nil { + streamMu.Lock() + err := stream.Send(cmd) + streamMu.Unlock() + if err != nil { log.Printf("Error sending command to %s: %v", clientID, err) return } @@ 
-75,7 +83,10 @@ func (s *ClientCommandDeliverServer) ListenCommand(stream Service.ClientCommandD RetCode: Enum.Retcode_Success, Type: Enum.CommandTypes_Pong, } - if err := stream.Send(pong); err != nil { + streamMu.Lock() + err := stream.Send(pong) + streamMu.Unlock() + if err != nil { log.Printf("Error sending Pong to %s: %v", clientID, err) return err } diff --git a/internal/service/command.go b/internal/service/command.go index 7b69ac9..a56124b 100644 --- a/internal/service/command.go +++ b/internal/service/command.go @@ -41,8 +41,8 @@ func (s *CommandService) UnregisterClientChannel(clientID string) { // SendCommand sends a command to a specific client func (s *CommandService) SendCommand(clientID string, cmdType Enum.CommandTypes, payload []byte) bool { s.mu.RLock() + defer s.mu.RUnlock() ch, ok := s.clientChans[clientID] - s.mu.RUnlock() if !ok { return false diff --git a/internal/service/command_test.go b/internal/service/command_test.go new file mode 100644 index 0000000..dc04a45 --- /dev/null +++ b/internal/service/command_test.go @@ -0,0 +1,54 @@ +package service + +import ( + "sync" + "testing" + "time" + + "github.com/MINIOpenSource/CIMS-backend/internal/proto/Protobuf/Enum" + "github.com/MINIOpenSource/CIMS-backend/internal/proto/Protobuf/Server" +) + +func TestCommandService_SendCommand_ThreadSafety(t *testing.T) { + s := &CommandService{ + clientChans: make(map[string]chan *Server.ClientCommandDeliverScRsp), + } + + clientID := "test-client" + ch := make(chan *Server.ClientCommandDeliverScRsp, 10) + + s.RegisterClientChannel(clientID, ch) + + // Simulate concurrent access: SendCommand vs Unregister/Close + + var wg sync.WaitGroup + wg.Add(2) + + // Sender routine + go func() { + defer wg.Done() + for i := 0; i < 1000; i++ { + s.SendCommand(clientID, Enum.CommandTypes_Ping, []byte{}) + time.Sleep(100 * time.Microsecond) + } + }() + + // Unregister/Closer routine + go func() { + defer wg.Done() + time.Sleep(50 * time.Millisecond) // Let some sends 
happen + + // Simulate the sequence in ListenCommand + s.UnregisterClientChannel(clientID) + close(ch) + }() + + wg.Wait() + + // There are no explicit assertions: the test passes as long as nothing panics. + // Before the fix, SendCommand released the lock right after the map lookup, so Unregister and close(ch) could run before its send, causing a "send on closed channel" panic. + // With the fix, SendCommand holds the read lock for the whole lookup-and-send. + // If SendCommand acquires the lock first, its send completes before Unregister can proceed. + // If Unregister acquires the lock first, it removes the channel from the map, so a later SendCommand does not find it. + // close(ch) runs only after Unregister returns, so no send can reach the closed channel. +}