Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 26 additions & 15 deletions cmd/mpcium-cli/register-peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/fystack/mpcium/pkg/config"
"github.com/fystack/mpcium/pkg/infra"
"github.com/fystack/mpcium/pkg/logger"
"github.com/hashicorp/consul/api"
"github.com/nats-io/nats.go/jetstream"
"github.com/urfave/cli/v3"
)

Expand All @@ -23,8 +23,7 @@ func registerPeers(ctx context.Context, c *cli.Command) error {
inputPath = "peers.json"
}

// Hardcoded prefix for MPC peers in Consul
prefix := "mpc_peers/"
bucketName := "mpc-peers"

// Validate the input file path for security
if err := pathutil.ValidateFilePath(inputPath); err != nil {
Expand Down Expand Up @@ -57,41 +56,53 @@ func registerPeers(ctx context.Context, c *cli.Command) error {

// Initialize config and logger
config.InitViperConfig(c.String("config"))
appConfig := config.LoadConfig()
logger.Init(environment, true)

// Connect to Consul
client := infra.GetConsulClient(environment)
kv := client.KV()
// Connect to NATS
nc, err := getNATSConnection(environment, appConfig)
if err != nil {
return fmt.Errorf("failed to connect to NATS: %w", err)
}
defer nc.Close()

js, err := jetstream.New(nc)
if err != nil {
return fmt.Errorf("failed to get JetStream context: %w", err)
}

// Register peers in Consul
peersKV, err := infra.NewNatsKVStore(js, "mpc-peers")
if err != nil {
return fmt.Errorf("failed to init mpc-peers KV bucket: %w", err)
}

// Register peers in NATS KV
for nodeName, nodeID := range peerMap {
key := prefix + nodeName
key := nodeName

// Check if the key already exists
existing, _, err := kv.Get(key, nil)
existing, err := peersKV.Get(key)
if err != nil {
return fmt.Errorf("failed to check existing key %s: %w", key, err)
}

if existing != nil {
existingID := string(existing.Value)
existingID := string(existing)
if existingID != nodeID {
return fmt.Errorf("conflict detected: peer %s already exists with ID %s, but trying to register with different ID %s", nodeName, existingID, nodeID)
}
fmt.Printf("Peer %s already registered with same ID %s, skipping\n", nodeName, nodeID)
continue
}

p := &api.KVPair{Key: key, Value: []byte(nodeID)}

// Store the key-value pair
_, err = kv.Put(p, nil)
err = peersKV.Put(key, []byte(nodeID))
if err != nil {
return fmt.Errorf("failed to store key %s: %w", key, err)
}
fmt.Printf("Registered peer %s with ID %s to Consul\n", nodeName, nodeID)
fmt.Printf("Registered peer %s with ID %s to NATS KV\n", nodeName, nodeID)
}

logger.Info("Successfully registered peers to Consul", "peers", peerMap, "prefix", prefix)
logger.Info("Successfully registered peers to NATS KV", "peers", peerMap, "bucket", bucketName)
return nil
}
62 changes: 34 additions & 28 deletions cmd/mpcium/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"github.com/fystack/mpcium/pkg/messaging"
"github.com/fystack/mpcium/pkg/mpc"
"github.com/fystack/mpcium/pkg/security"
"github.com/hashicorp/consul/api"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/spf13/viper"
"github.com/urfave/cli/v3"
"golang.org/x/term"
Expand Down Expand Up @@ -144,9 +144,33 @@ func runNode(ctx context.Context, c *cli.Command) error {
// Validate the config values
checkRequiredConfigValues(appConfig)

consulClient := infra.GetConsulClient(environment)
keyinfoStore := keyinfo.NewStore(consulClient.KV())
peers := LoadPeersFromConsul(consulClient)
natsConn, err := GetNATSConnection(environment, appConfig)
if err != nil {
logger.Fatal("Failed to connect to NATS", err)
}

js, err := jetstream.New(natsConn)
if err != nil {
logger.Fatal("Failed to get JetStream context", err)
}

readyKV, err := infra.NewNatsKVStore(js, "mpc-ready")
if err != nil {
logger.Fatal("Failed to init mpc-ready KV bucket", err)
}

keyinfoKV, err := infra.NewNatsKVStore(js, "mpc-keyinfo")
if err != nil {
logger.Fatal("Failed to init mpc-keyinfo KV bucket", err)
}

peersKV, err := infra.NewNatsKVStore(js, "mpc-peers")
if err != nil {
logger.Fatal("Failed to init mpc-peers KV bucket", err)
}

keyinfoStore := keyinfo.NewStore(keyinfoKV)
peers := LoadPeersFromNatsKV(peersKV)
nodeID := GetIDFromName(nodeName, peers)

badgerKV := NewBadgerKV(nodeName, nodeID, appConfig)
Expand All @@ -165,11 +189,6 @@ func runNode(ctx context.Context, c *cli.Command) error {
logger.Fatal("Failed to create identity store", err)
}

natsConn, err := GetNATSConnection(environment, appConfig)
if err != nil {
logger.Fatal("Failed to connect to NATS", err)
}

pubsub := messaging.NewNATSPubSub(natsConn)
keygenBroker, err := messaging.NewJetStreamBroker(ctx, natsConn, event.KeygenBrokerStream, []string{
event.KeygenRequestTopic,
Expand Down Expand Up @@ -201,7 +220,7 @@ func runNode(ctx context.Context, c *cli.Command) error {
logger.Info("Starting mpcium node", "version", Version, "ID", nodeID, "name", nodeName)

peerNodeIDs := GetPeerIDs(peers)
peerRegistry := mpc.NewRegistry(nodeID, peerNodeIDs, consulClient.KV(), directMessaging, pubsub, identityStore)
peerRegistry := mpc.NewRegistry(nodeID, peerNodeIDs, readyKV, directMessaging, pubsub, identityStore)

chainCodeHex := viper.GetString("chain_code")
ckd, err := mpc.NewCKDFromHex(chainCodeHex)
Expand Down Expand Up @@ -256,7 +275,7 @@ func runNode(ctx context.Context, c *cli.Command) error {
if healthAddr == "" {
healthAddr = ":8080"
}
healthServer = healthcheck.NewServer(healthAddr, peerRegistry, natsConn, consulClient)
healthServer = healthcheck.NewServer(healthAddr, peerRegistry, natsConn)
go func() {
if err := healthServer.Start(); err != nil {
logger.Error("Health check server error", err)
Expand Down Expand Up @@ -460,25 +479,12 @@ func checkRequiredConfigValues(appConfig *config.AppConfig) {
}
}

func NewConsulClient(addr string) *api.Client {
// Create a new Consul client
consulConfig := api.DefaultConfig()
consulConfig.Address = addr
consulClient, err := api.NewClient(consulConfig)
if err != nil {
logger.Fatal("Failed to create consul client", err)
}
logger.Info("Connected to consul!")
return consulClient
}

func LoadPeersFromConsul(consulClient *api.Client) []config.Peer { // Create a Consul Key-Value store client
kv := consulClient.KV()
peers, err := config.LoadPeersFromConsul(kv, "mpc_peers/")
func LoadPeersFromNatsKV(peersKV infra.NatsKV) []config.Peer {
peers, err := config.LoadPeersFromNatsKV(peersKV)
if err != nil {
logger.Fatal("Failed to load peers from Consul", err)
logger.Fatal("Failed to load peers from NATS KV", err)
}
logger.Info("Loaded peers from consul", "peers", peers)
logger.Info("Loaded peers from NATS KV", "peers", peers)

return peers
}
Expand Down
77 changes: 33 additions & 44 deletions e2e/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ import (
"github.com/dgraph-io/badger/v4/options"
"github.com/fystack/mpcium/pkg/client"
"github.com/fystack/mpcium/pkg/event"
"github.com/fystack/mpcium/pkg/infra"
"github.com/fystack/mpcium/pkg/kvstore"
"github.com/fystack/mpcium/pkg/types"
"github.com/hashicorp/consul/api"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"
)
Expand All @@ -35,9 +36,6 @@ type TestConfig struct {
Nats struct {
URL string `yaml:"url"`
} `yaml:"nats"`
Consul struct {
Address string `yaml:"address"`
} `yaml:"consul"`
MPCThreshold int `yaml:"mpc_threshold"`
Environment string `yaml:"environment"`
BadgerPassword string `yaml:"badger_password"`
Expand All @@ -49,7 +47,7 @@ type TestConfig struct {

type E2ETestSuite struct {
ctx context.Context
consulClient *api.Client
cancel context.CancelFunc
natsConn *nats.Conn
mpcClient client.MPCClient
testDir string
Expand All @@ -62,9 +60,10 @@ type E2ETestSuite struct {
}

func NewE2ETestSuite(testDir string) *E2ETestSuite {
ctx, _ := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.Background())
return &E2ETestSuite{
ctx: ctx,
cancel: cancel,
testDir: testDir,
walletIDs: make([]string, 0),
keygenResults: make(map[string]*event.KeygenResultEvent),
Expand Down Expand Up @@ -148,18 +147,7 @@ func (s *E2ETestSuite) setupClients(t *testing.T) {
var err error

// Use the fixed ports from docker-compose.test.yaml
consulPort := 8501 // consul-test service maps 8501:8500
natsPort := 4223 // nats-server-test service maps 4223:4222

// Setup Consul client
consulConfig := api.DefaultConfig()
consulConfig.Address = fmt.Sprintf("localhost:%d", consulPort)
s.consulClient, err = api.NewClient(consulConfig)
require.NoError(t, err, "Failed to create Consul client")

// Test Consul connection
_, err = s.consulClient.Agent().Self()
require.NoError(t, err, "Failed to connect to Consul")
natsPort := 4223 // nats-server-test service maps 4223:4222

// Setup NATS client
natsConn, err := nats.Connect(fmt.Sprintf("nats://localhost:%d", natsPort))
Expand Down Expand Up @@ -216,13 +204,7 @@ func (s *E2ETestSuite) SetupTestNodes(t *testing.T) {
}

func (s *E2ETestSuite) RegisterPeers(t *testing.T) {
t.Log("Registering peers in Consul...")

// Check Consul health before proceeding
t.Log("Checking Consul health...")
_, err := s.consulClient.Status().Leader()
require.NoError(t, err, "Consul is not healthy")
t.Log("Consul is healthy")
t.Log("Registering peers in NATS KV...")

// Use mpcium register-peers command instead of manual registration
t.Log("Running mpcium-cli register-peers...")
Expand All @@ -237,20 +219,24 @@ func (s *E2ETestSuite) RegisterPeers(t *testing.T) {
require.NoError(t, err, "Failed to register peers")
}

t.Log("Peers registered in Consul")
t.Log("Peers registered in NATS KV")

// List current peers to verify registration
t.Log("Listing current peers in Consul...")
kv := s.consulClient.KV()
t.Log("Listing current peers in NATS KV...")
js, err := jetstream.New(s.natsConn)
require.NoError(t, err, "Failed to get JetStream context")

peersKV, err := infra.NewNatsKVStore(js, "mpc-peers")
require.NoError(t, err, "Failed to init mpc-peers KV bucket")

// Get all keys under the mpc_peers/ prefix (matches register-peers command)
pairs, _, err := kv.List("mpc_peers/", nil)
// Get all keys (empty prefix)
pairs, err := peersKV.List("")
if err != nil {
t.Logf("Failed to list peers: %v", err)
} else {
t.Logf("Found %d peer entries in Consul under 'mpc_peers/':", len(pairs))
for _, pair := range pairs {
t.Logf(" - Key: %s, Value: %s", pair.Key, string(pair.Value))
t.Logf("Found %d peer entries in NATS KV under 'mpc_peers/':", len(pairs))
for k, v := range pairs {
t.Logf(" - Key: %s, Value: %s", k, string(v))
}
}

Expand All @@ -267,13 +253,12 @@ func (s *E2ETestSuite) RegisterPeers(t *testing.T) {
func (s *E2ETestSuite) StartNodes(t *testing.T) {
t.Log("Starting MPC nodes...")

// Double-check that Consul is still accessible before starting nodes
t.Log("Verifying Consul is still accessible...")
_, err := s.consulClient.Status().Leader()
if err != nil {
t.Logf("Consul connection test failed: %v", err)
// Double-check that NATS is still accessible before starting nodes
t.Log("Verifying NATS is still accessible...")
if !s.natsConn.IsConnected() {
t.Log("NATS connection lost")
} else {
t.Log("Consul is still accessible")
t.Log("NATS is still accessible")
}

s.mpciumProcesses = make([]*exec.Cmd, numNodes)
Expand Down Expand Up @@ -318,12 +303,11 @@ func (s *E2ETestSuite) StartNodes(t *testing.T) {
time.Sleep(5 * time.Second)

// Verify containers are still accessible
t.Log("Final verification that Consul is still accessible...")
_, err = s.consulClient.Status().Leader()
if err != nil {
t.Logf("Consul connection test failed after starting nodes: %v", err)
t.Log("Final verification that NATS is still accessible...")
if !s.natsConn.IsConnected() {
t.Log("NATS connection lost after starting nodes")
} else {
t.Log("Consul is still accessible after starting nodes")
t.Log("NATS is still accessible after starting nodes")
}

// Show recent logs from each node
Expand Down Expand Up @@ -562,6 +546,11 @@ func (s *E2ETestSuite) Cleanup(t *testing.T) {
s.natsConn.Close()
}

// Cancel the context to release resources
if s.cancel != nil {
s.cancel()
}

// Stop Docker Compose stack
t.Log("Stopping Docker Compose stack...")
cmd := exec.Command("docker", "compose", "-f", "docker-compose.test.yaml", "down", "-v")
Expand Down
9 changes: 0 additions & 9 deletions e2e/docker-compose.test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,3 @@ services:
- "6223:6222"
tty: true
restart: always

consul-test:
image: consul:1.15.4
container_name: consul-test
ports:
- "8501:8500"
- "8602:8600/udp"
command: "agent -server -ui -node=server-1 -bootstrap-expect=1 -client=0.0.0.0"
restart: always
Loading
Loading