diff --git a/examples/sign/main.go b/examples/sign/main.go index 3424610..6acb51c 100644 --- a/examples/sign/main.go +++ b/examples/sign/main.go @@ -71,18 +71,19 @@ func main() { txMsg := &types.SignTxMessage{ KeyType: types.KeyTypeEd25519, - WalletID: "ad24f678-b04b-4149-bcf6-bf9c90df8e63", // Use the generated wallet ID + WalletID: "88f1512b-2508-4992-a076-5416fb0aae28", // Use the generated wallet ID NetworkInternalCode: "solana-devnet", TxID: txID, Tx: dummyTx, } + // 3) Use SignTransaction (Async) err = mpcClient.SignTransaction(txMsg) if err != nil { logger.Fatal("SignTransaction failed", err) } fmt.Printf("SignTransaction(%q) sent, awaiting result...\n", txID) - // 3) Listen for signing results + // 4) Listen for signing results err = mpcClient.OnSignResult(func(evt event.SigningResultEvent) { logger.Info("Signing result received", "txID", evt.TxID, diff --git a/examples/sign_sync/main.go b/examples/sign_sync/main.go new file mode 100644 index 0000000..5556514 --- /dev/null +++ b/examples/sign_sync/main.go @@ -0,0 +1,99 @@ +package main + +import ( + "context" + "fmt" + "os" + "os/signal" + "slices" + "syscall" + + "github.com/fystack/mpcium/pkg/client" + "github.com/fystack/mpcium/pkg/config" + "github.com/fystack/mpcium/pkg/logger" + "github.com/fystack/mpcium/pkg/types" + "github.com/google/uuid" + "github.com/nats-io/nats.go" + "github.com/spf13/viper" +) + +func main() { + const environment = "dev" + config.InitViperConfig("") + logger.Init(environment, true) + + algorithm := viper.GetString("event_initiator_algorithm") + if algorithm == "" { + algorithm = string(types.EventInitiatorKeyTypeEd25519) + } + + // Validate algorithm + if !slices.Contains( + []string{ + string(types.EventInitiatorKeyTypeEd25519), + string(types.EventInitiatorKeyTypeP256), + }, + algorithm, + ) { + logger.Fatal( + fmt.Sprintf( + "invalid algorithm: %s. Must be %s or %s", + algorithm, + types.EventInitiatorKeyTypeEd25519, + types.EventInitiatorKeyTypeP256, + ), + nil, + ) + } + natsURL := viper.GetString("nats.url") + natsConn, err := nats.Connect(natsURL) + if err != nil { + logger.Fatal("Failed to connect to NATS", err) + } + defer natsConn.Drain() + defer natsConn.Close() + + localSigner, err := client.NewLocalSigner(types.EventInitiatorKeyType(algorithm), client.LocalSignerOptions{ + KeyPath: "./event_initiator.key", + }) + if err != nil { + logger.Fatal("Failed to create local signer", err) + } + + mpcClient := client.NewMPCClient(client.Options{ + NatsConn: natsConn, + Signer: localSigner, + }) + + // 2) Once wallet exists, immediately fire a SignTransaction + txID := uuid.New().String() + dummyTx := []byte("deadbeef") // replace with real transaction bytes + + txMsg := &types.SignTxMessage{ + KeyType: types.KeyTypeEd25519, + WalletID: "88f1512b-2508-4992-a076-5416fb0aae28", // Use the generated wallet ID + NetworkInternalCode: "solana-devnet", + TxID: txID, + Tx: dummyTx, + } + // 3) Use SignTransactionSync to get result directly + ctx := context.Background() + result, err := mpcClient.SignTransactionSync(ctx, txMsg) + if err != nil { + logger.Fatal("SignTransactionSync failed", err) + } + + logger.Info("Signing result received", + "txID", result.TxID, + "signature", fmt.Sprintf("%x", result.Signature), + "resultType", result.ResultType, + "errorCode", result.ErrorCode, + "errorReason", result.ErrorReason, + ) + + stop := make(chan os.Signal, 1) + signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM) + <-stop + + fmt.Println("Shutting down.") +} diff --git a/pkg/client/client.go b/pkg/client/client.go index 2ed4dc4..50d0296 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -24,6 +24,7 @@ type MPCClient interface { OnWalletCreationResult(callback func(event event.KeygenResultEvent)) error SignTransaction(msg *types.SignTxMessage) error + SignTransactionSync(ctx context.Context, msg *types.SignTxMessage) (*event.SigningResultEvent, error) OnSignResult(callback func(event event.SigningResultEvent)) error Resharing(msg *types.ResharingMessage) error @@ -180,6 +181,63 @@ func (c *mpcClient) SignTransaction(msg *types.SignTxMessage) error { return nil } +// SignTransactionSync builds a SignTxMessage, signs it, and publishes it using Request-Reply pattern. +func (c *mpcClient) SignTransactionSync(ctx context.Context, msg *types.SignTxMessage) (*event.SigningResultEvent, error) { + // compute the canonical raw bytes (omitting Signature field) + raw, err := msg.Raw() + if err != nil { + return nil, fmt.Errorf("SignTransactionSync: raw payload error: %w", err) + } + signature, err := c.signer.Sign(raw) + if err != nil { + return nil, fmt.Errorf("SignTransactionSync: failed to sign message: %w", err) + } + msg.Signature = signature + + bytes, err := json.Marshal(msg) + if err != nil { + return nil, fmt.Errorf("SignTransactionSync: marshal error: %w", err) + } + + // Create a unique reply subject (ephemeral inbox) + replySubject := nats.NewInbox() + + // Subscribe to the reply subject + respChan := make(chan *nats.Msg, 1) + sub, err := c.pubsub.Subscribe(replySubject, func(msg *nats.Msg) { + respChan <- msg + }) + if err != nil { + return nil, fmt.Errorf("SignTransactionSync: subscribe error: %w", err) + } + defer sub.Unsubscribe() + + publishSubject := fmt.Sprintf("mpc.signing_request.%s", msg.TxID) + + // Use Headers for Reply to avoid NATS JetStream PubAck on the reply subject + headers := map[string]string{ + "Mpc-Reply-To": replySubject, + } + + // Pass empty string for reply argument so NATS doesn't expect a transport-level reply (avoids PubAck) + err = c.pubsub.PublishWithReply(publishSubject, "", bytes, headers) + if err != nil { + return nil, fmt.Errorf("SignTransactionSync: publish error: %w", err) + } + + // Wait for response + select { + case <-ctx.Done(): + return nil, ctx.Err() + case m := <-respChan: + var result event.SigningResultEvent + if err := json.Unmarshal(m.Data, &result); err != nil { + return nil, fmt.Errorf("SignTransactionSync: unmarshal response error: %w", err) + } + return &result, nil + } +} + func (c *mpcClient) OnSignResult(callback func(event event.SigningResultEvent)) error { err := c.signResultQueue.Dequeue(event.SigningResultCompleteTopic, func(msg []byte) error { var event event.SigningResultEvent diff --git a/pkg/eventconsumer/event_consumer.go b/pkg/eventconsumer/event_consumer.go index 087ac67..cb6dd1d 100644 --- a/pkg/eventconsumer/event_consumer.go +++ b/pkg/eventconsumer/event_consumer.go @@ -270,7 +270,7 @@ func (ec *eventConsumer) handleKeyGenEvent(natMsg *nats.Msg) { ec.handleKeygenSessionError(walletID, err, "Failed to publish key generation success message", natMsg) return } - ec.sendReplyToRemoveMsg(natMsg) + ec.sendReply(natMsg, payload) logger.Info("[COMPLETED KEY GEN] Key generation completed successfully", "walletID", walletID) } @@ -303,7 +303,7 @@ func (ec *eventConsumer) handleKeygenSessionError(walletID string, err error, co "payload", string(keygenResultBytes), ) } - ec.sendReplyToRemoveMsg(natMsg) + ec.sendReply(natMsg, keygenResultBytes) } func (ec *eventConsumer) startKeyGenEventWorker() { @@ -401,7 +401,7 @@ func (ec *eventConsumer) handleSigningEvent(natMsg *nats.Msg) { msg.WalletID, msg.TxID, msg.NetworkInternalCode, - ec.signingResultQueue, + nil, msg.DerivationPath, idempotentKey, ) @@ -411,7 +411,7 @@ func (ec *eventConsumer) handleSigningEvent(natMsg *nats.Msg) { msg.WalletID, msg.TxID, msg.NetworkInternalCode, - ec.signingResultQueue, + nil, msg.DerivationPath, idempotentKey, ) @@ -502,7 +502,7 @@ func (ec *eventConsumer) handleSigningEvent(natMsg *nats.Msg) { onSuccess := func(data []byte) { done() - ec.sendReplyToRemoveMsg(natMsg) + ec.sendReply(natMsg, data) } go session.Sign(onSuccess) } @@ -549,30 +549,23 @@ func (ec *eventConsumer) handleSigningSessionError(walletID, txID, networkIntern ) return } - err = ec.signingResultQueue.Enqueue(event.SigningResultCompleteTopic, signingResultBytes, &messaging.EnqueueOptions{ - IdempotententKey: composeSigningIdempotentKey(txID, natMsg), - }) - if err != nil { - logger.Error("Failed to enqueue signing result event", err, - "walletID", walletID, - "txID", txID, - "payload", string(signingResultBytes), - ) - } - ec.sendReplyToRemoveMsg(natMsg) + ec.sendReply(natMsg, signingResultBytes) } -func (ec *eventConsumer) sendReplyToRemoveMsg(natMsg *nats.Msg) { - msg := natMsg.Data +func (ec *eventConsumer) sendReply(natMsg *nats.Msg, payload []byte) { + reply := natMsg.Header.Get("Mpc-Reply-To") + if reply == "" { + reply = natMsg.Reply + } - if natMsg.Reply == "" { - logger.Warn("No reply inbox specified for sign success message", "msg", string(msg)) + if reply == "" { + logger.Warn("No reply inbox specified for sign success message", "msg", string(natMsg.Data)) return } - err := ec.pubsub.Publish(natMsg.Reply, msg) + err := ec.pubsub.Publish(reply, payload) if err != nil { - logger.Error("Failed to reply message", err, "reply", natMsg.Reply) + logger.Error("Failed to reply message", err, "reply", reply) return } } diff --git a/pkg/eventconsumer/sign_consumer.go b/pkg/eventconsumer/sign_consumer.go index e0ea23e..93bfd66 100644 --- a/pkg/eventconsumer/sign_consumer.go +++ b/pkg/eventconsumer/sign_consumer.go @@ -205,6 +205,32 @@ func (sc *signingConsumer) handleSigningEvent(msg jetstream.Msg) { } if replyMsg != nil { logger.Info("SigningConsumer: Completed signing event; reply received") + + err = sc.signingResultQueue.Enqueue(event.SigningResultCompleteTopic, replyMsg.Data, &messaging.EnqueueOptions{ + IdempotententKey: buildIdempotentKey(signingMsg.TxID, sessionID, mpc.TypeSigningResultFmt), + }) + if err != nil { + logger.Error("SigningConsumer: Failed to enqueue signing result", err, + "walletID", signingMsg.WalletID, + "txID", signingMsg.TxID, + ) + } + + // If the original message has a reply header (Mpc-Reply-To), forward the result there + clientReplySubject := msg.Headers().Get("Mpc-Reply-To") + if clientReplySubject != "" { + if err := sc.natsConn.Publish(clientReplySubject, replyMsg.Data); err != nil { + logger.Error("SigningConsumer: Failed to forward result to client", err, "replySubject", clientReplySubject) + } else { + logger.Debug("SigningConsumer: Forwarded result to client", "replySubject", clientReplySubject) + } + } else if msg.Reply() != "" { + // Fallback to msg.Reply() if header not present, but note that this might conflict with JS PubAck + if err := sc.natsConn.Publish(msg.Reply(), replyMsg.Data); err != nil { + logger.Warn("SigningConsumer: Failed to forward result to msg.Reply()", "error", err) + } + } + if ackErr := msg.Ack(); ackErr != nil { logger.Error("SigningConsumer: ACK failed", ackErr) } diff --git a/pkg/mpc/ecdsa_signing_session.go b/pkg/mpc/ecdsa_signing_session.go index 4d5e0d5..0fe8163 100644 --- a/pkg/mpc/ecdsa_signing_session.go +++ b/pkg/mpc/ecdsa_signing_session.go @@ -137,7 +137,6 @@ func (s *ecdsaSigningSession) Init(tx *big.Int) error { if err != nil { return errors.Wrap(err, "Failed to unmarshal wallet data") } - if len(s.derivationPath) > 0 { il, extendedChildPk, errorDerivation := s.ckd.Derive(s.walletID, data.ECDSAPub, s.derivationPath, tss.S256()) @@ -204,12 +203,14 @@ func (s *ecdsaSigningSession) Sign(onSuccess func(data []byte)) { return } - err = s.resultQueue.Enqueue(event.SigningResultCompleteTopic, bytes, &messaging.EnqueueOptions{ - IdempotententKey: s.idempotentKey, - }) - if err != nil { - s.ErrCh <- errors.Wrap(err, "Failed to publish sign success message") - return + if s.resultQueue != nil { + err = s.resultQueue.Enqueue(event.SigningResultCompleteTopic, bytes, &messaging.EnqueueOptions{ + IdempotententKey: s.idempotentKey, + }) + if err != nil { + s.ErrCh <- errors.Wrap(err, "Failed to publish sign success message") + return + } } logger.Info("[SIGN] Sign successfully", "walletID", s.walletID) diff --git a/pkg/mpc/eddsa_signing_session.go b/pkg/mpc/eddsa_signing_session.go index 4f859e5..e38697d 100644 --- a/pkg/mpc/eddsa_signing_session.go +++ b/pkg/mpc/eddsa_signing_session.go @@ -126,7 +126,6 @@ func (s *eddsaSigningSession) Init(tx *big.Int) error { if err != nil { return errors.Wrap(err, "Failed to unmarshal wallet data") } - if len(s.derivationPath) > 0 { il, extendedChildPk, errorDerivation := s.ckd.Derive(s.walletID, data.EDDSAPub, s.derivationPath, tss.Edwards()) @@ -192,12 +191,14 @@ func (s *eddsaSigningSession) Sign(onSuccess func(data []byte)) { return } - err = s.resultQueue.Enqueue(event.SigningResultCompleteTopic, bytes, &messaging.EnqueueOptions{ - IdempotententKey: s.idempotentKey, - }) - if err != nil { - s.ErrCh <- errors.Wrap(err, "Failed to publish sign success message") - return + if s.resultQueue != nil { + err = s.resultQueue.Enqueue(event.SigningResultCompleteTopic, bytes, &messaging.EnqueueOptions{ + IdempotententKey: s.idempotentKey, + }) + if err != nil { + s.ErrCh <- errors.Wrap(err, "Failed to publish sign success message") + return + } } logger.Info("[SIGN] Sign successfully", "walletID", s.walletID) @@ -213,6 +214,7 @@ func (s *eddsaSigningSession) Sign(onSuccess func(data []byte)) { } } + // Close cleans up the EDDSA signing session by zeroing all sensitive data. func (s *eddsaSigningSession) Close() error { if s == nil {