Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions internal/api/grpc/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"io"
"log"
"sync"

"github.com/MINIOpenSource/CIMS-backend/internal/proto/Protobuf/Enum"
"github.com/MINIOpenSource/CIMS-backend/internal/proto/Protobuf/Server"
Expand Down Expand Up @@ -45,12 +46,19 @@ func (s *ClientCommandDeliverServer) ListenCommand(stream Service.ClientCommandD

cmdChan := make(chan *Server.ClientCommandDeliverScRsp, 10)
s.commandService.RegisterClientChannel(clientID, cmdChan)
// Ensure Unregister happens before Close to avoid race conditions in SendCommand
defer close(cmdChan)
defer s.commandService.UnregisterClientChannel(clientID)

var streamMu sync.Mutex

// Send commands
go func() {
for cmd := range cmdChan {
if err := stream.Send(cmd); err != nil {
streamMu.Lock()
err := stream.Send(cmd)
streamMu.Unlock()
if err != nil {
log.Printf("Error sending command to %s: %v", clientID, err)
return
}
Expand All @@ -75,7 +83,10 @@ func (s *ClientCommandDeliverServer) ListenCommand(stream Service.ClientCommandD
RetCode: Enum.Retcode_Success,
Type: Enum.CommandTypes_Pong,
}
if err := stream.Send(pong); err != nil {
streamMu.Lock()
err := stream.Send(pong)
streamMu.Unlock()
if err != nil {
log.Printf("Error sending Pong to %s: %v", clientID, err)
return err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/service/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func (s *CommandService) UnregisterClientChannel(clientID string) {
// SendCommand sends a command to a specific client
func (s *CommandService) SendCommand(clientID string, cmdType Enum.CommandTypes, payload []byte) bool {
s.mu.RLock()
defer s.mu.RUnlock()
ch, ok := s.clientChans[clientID]
s.mu.RUnlock()

if !ok {
return false
Expand Down
54 changes: 54 additions & 0 deletions internal/service/command_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package service

import (
"sync"
"testing"
"time"

"github.com/MINIOpenSource/CIMS-backend/internal/proto/Protobuf/Enum"
"github.com/MINIOpenSource/CIMS-backend/internal/proto/Protobuf/Server"
)

func TestCommandService_SendCommand_ThreadSafety(t *testing.T) {
s := &CommandService{
clientChans: make(map[string]chan *Server.ClientCommandDeliverScRsp),
}

clientID := "test-client"
ch := make(chan *Server.ClientCommandDeliverScRsp, 10)

s.RegisterClientChannel(clientID, ch)

// Simulate concurrent access: SendCommand vs Unregister/Close

var wg sync.WaitGroup
wg.Add(2)

// Sender routine
go func() {
defer wg.Done()
for i := 0; i < 1000; i++ {
s.SendCommand(clientID, Enum.CommandTypes_Ping, []byte{})
time.Sleep(100 * time.Microsecond)
}
}()

// Unregister/Closer routine
go func() {
defer wg.Done()
time.Sleep(50 * time.Millisecond) // Let some sends happen

// Simulate the sequence in ListenCommand
s.UnregisterClientChannel(clientID)
close(ch)
}()

wg.Wait()

// If the code is buggy (SendCommand does not hold lock), we might see a panic: "send on closed channel"
// Because SendCommand might get the channel, then Unregister/Close happens, then SendCommand sends.
// With the fix, SendCommand holds lock.
// If it gets lock before Unregister, it sends.
// If Unregister gets lock, it removes from map. SendCommand (later) will not find it.
// close(ch) happens after Unregister.
}
Loading