Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions examples/sign/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,19 @@ func main() {

txMsg := &types.SignTxMessage{
KeyType: types.KeyTypeEd25519,
WalletID: "ad24f678-b04b-4149-bcf6-bf9c90df8e63", // Use the generated wallet ID
WalletID: "88f1512b-2508-4992-a076-5416fb0aae28", // Use the generated wallet ID
NetworkInternalCode: "solana-devnet",
TxID: txID,
Tx: dummyTx,
}
// 3) Use SignTransaction (Async)
err = mpcClient.SignTransaction(txMsg)
if err != nil {
logger.Fatal("SignTransaction failed", err)
}
fmt.Printf("SignTransaction(%q) sent, awaiting result...\n", txID)

// 3) Listen for signing results
// 4) Listen for signing results
err = mpcClient.OnSignResult(func(evt event.SigningResultEvent) {
logger.Info("Signing result received",
"txID", evt.TxID,
Expand Down
99 changes: 99 additions & 0 deletions examples/sign_sync/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package main

import (
"context"
"fmt"
"os"
"os/signal"
"slices"
"syscall"

"github.com/fystack/mpcium/pkg/client"
"github.com/fystack/mpcium/pkg/config"
"github.com/fystack/mpcium/pkg/logger"
"github.com/fystack/mpcium/pkg/types"
"github.com/google/uuid"
"github.com/nats-io/nats.go"
"github.com/spf13/viper"
)

func main() {
const environment = "dev"
config.InitViperConfig("")
logger.Init(environment, true)

algorithm := viper.GetString("event_initiator_algorithm")
if algorithm == "" {
algorithm = string(types.EventInitiatorKeyTypeEd25519)
}

// Validate algorithm
if !slices.Contains(
[]string{
string(types.EventInitiatorKeyTypeEd25519),
string(types.EventInitiatorKeyTypeP256),
},
algorithm,
) {
logger.Fatal(
fmt.Sprintf(
"invalid algorithm: %s. Must be %s or %s",
algorithm,
types.EventInitiatorKeyTypeEd25519,
types.EventInitiatorKeyTypeP256,
),
nil,
)
}
natsURL := viper.GetString("nats.url")
natsConn, err := nats.Connect(natsURL)
if err != nil {
logger.Fatal("Failed to connect to NATS", err)
}
defer natsConn.Drain()
defer natsConn.Close()

localSigner, err := client.NewLocalSigner(types.EventInitiatorKeyType(algorithm), client.LocalSignerOptions{
KeyPath: "./event_initiator.key",
})
if err != nil {
logger.Fatal("Failed to create local signer", err)
}

mpcClient := client.NewMPCClient(client.Options{
NatsConn: natsConn,
Signer: localSigner,
})

// 2) Once wallet exists, immediately fire a SignTransaction
txID := uuid.New().String()
dummyTx := []byte("deadbeef") // replace with real transaction bytes

txMsg := &types.SignTxMessage{
KeyType: types.KeyTypeEd25519,
WalletID: "88f1512b-2508-4992-a076-5416fb0aae28", // Use the generated wallet ID
NetworkInternalCode: "solana-devnet",
TxID: txID,
Tx: dummyTx,
}
// 3) Use SignTransactionSync to get result directly
ctx := context.Background()
result, err := mpcClient.SignTransactionSync(ctx, txMsg)
if err != nil {
logger.Fatal("SignTransactionSync failed", err)
}

logger.Info("Signing result received",
"txID", result.TxID,
"signature", fmt.Sprintf("%x", result.Signature),
"resultType", result.ResultType,
"errorCode", result.ErrorCode,
"errorReason", result.ErrorReason,
)

stop := make(chan os.Signal, 1)
signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
<-stop

fmt.Println("Shutting down.")
}
58 changes: 58 additions & 0 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
OnWalletCreationResult(callback func(event event.KeygenResultEvent)) error

SignTransaction(msg *types.SignTxMessage) error
SignTransactionSync(ctx context.Context, msg *types.SignTxMessage) (*event.SigningResultEvent, error)
OnSignResult(callback func(event event.SigningResultEvent)) error

Resharing(msg *types.ResharingMessage) error
Expand Down Expand Up @@ -180,6 +181,63 @@
return nil
}

// SignTransactionSync builds a SignTxMessage, signs it, and publishes it using Request-Reply pattern.
func (c *mpcClient) SignTransactionSync(ctx context.Context, msg *types.SignTxMessage) (*event.SigningResultEvent, error) {
// compute the canonical raw bytes (omitting Signature field)
raw, err := msg.Raw()
if err != nil {
return nil, fmt.Errorf("SignTransactionSync: raw payload error: %w", err)
}
signature, err := c.signer.Sign(raw)
if err != nil {
return nil, fmt.Errorf("SignTransactionSync: failed to sign message: %w", err)
}
msg.Signature = signature

bytes, err := json.Marshal(msg)
if err != nil {
return nil, fmt.Errorf("SignTransactionSync: marshal error: %w", err)
}

// Create a unique reply subject (ephemeral inbox)
replySubject := nats.NewInbox()

// Subscribe to the reply subject
respChan := make(chan *nats.Msg, 1)
sub, err := c.pubsub.Subscribe(replySubject, func(msg *nats.Msg) {
respChan <- msg
})
if err != nil {
return nil, fmt.Errorf("SignTransactionSync: subscribe error: %w", err)
}
defer sub.Unsubscribe()

Check failure on line 213 in pkg/client/client.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `sub.Unsubscribe` is not checked (errcheck)

publishSubject := fmt.Sprintf("mpc.signing_request.%s", msg.TxID)

// Use Headers for Reply to avoid NATS JetStream PubAck on the reply subject
headers := map[string]string{
"Mpc-Reply-To": replySubject,
}

// Pass empty string for reply argument so NATS doesn't expect a transport-level reply (avoids PubAck)
err = c.pubsub.PublishWithReply(publishSubject, "", bytes, headers)
if err != nil {
return nil, fmt.Errorf("SignTransactionSync: publish error: %w", err)
}

// Wait for response
select {
case <-ctx.Done():
return nil, ctx.Err()
case m := <-respChan:
var result event.SigningResultEvent
if err := json.Unmarshal(m.Data, &result); err != nil {
return nil, fmt.Errorf("SignTransactionSync: unmarshal response error: %w", err)
}
return &result, nil
}
}

func (c *mpcClient) OnSignResult(callback func(event event.SigningResultEvent)) error {
err := c.signResultQueue.Dequeue(event.SigningResultCompleteTopic, func(msg []byte) error {
var event event.SigningResultEvent
Expand Down
37 changes: 15 additions & 22 deletions pkg/eventconsumer/event_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func (ec *eventConsumer) handleKeyGenEvent(natMsg *nats.Msg) {
ec.handleKeygenSessionError(walletID, err, "Failed to publish key generation success message", natMsg)
return
}
ec.sendReplyToRemoveMsg(natMsg)
ec.sendReply(natMsg, payload)
logger.Info("[COMPLETED KEY GEN] Key generation completed successfully", "walletID", walletID)
}

Expand Down Expand Up @@ -303,7 +303,7 @@ func (ec *eventConsumer) handleKeygenSessionError(walletID string, err error, co
"payload", string(keygenResultBytes),
)
}
ec.sendReplyToRemoveMsg(natMsg)
ec.sendReply(natMsg, keygenResultBytes)
}

func (ec *eventConsumer) startKeyGenEventWorker() {
Expand Down Expand Up @@ -401,7 +401,7 @@ func (ec *eventConsumer) handleSigningEvent(natMsg *nats.Msg) {
msg.WalletID,
msg.TxID,
msg.NetworkInternalCode,
ec.signingResultQueue,
nil,
msg.DerivationPath,
idempotentKey,
)
Expand All @@ -411,7 +411,7 @@ func (ec *eventConsumer) handleSigningEvent(natMsg *nats.Msg) {
msg.WalletID,
msg.TxID,
msg.NetworkInternalCode,
ec.signingResultQueue,
nil,
msg.DerivationPath,
idempotentKey,
)
Expand Down Expand Up @@ -502,7 +502,7 @@ func (ec *eventConsumer) handleSigningEvent(natMsg *nats.Msg) {

onSuccess := func(data []byte) {
done()
ec.sendReplyToRemoveMsg(natMsg)
ec.sendReply(natMsg, data)
}
go session.Sign(onSuccess)
}
Expand Down Expand Up @@ -549,30 +549,23 @@ func (ec *eventConsumer) handleSigningSessionError(walletID, txID, networkIntern
)
return
}
err = ec.signingResultQueue.Enqueue(event.SigningResultCompleteTopic, signingResultBytes, &messaging.EnqueueOptions{
IdempotententKey: composeSigningIdempotentKey(txID, natMsg),
})
if err != nil {
logger.Error("Failed to enqueue signing result event", err,
"walletID", walletID,
"txID", txID,
"payload", string(signingResultBytes),
)
}
ec.sendReplyToRemoveMsg(natMsg)
ec.sendReply(natMsg, signingResultBytes)
}

func (ec *eventConsumer) sendReplyToRemoveMsg(natMsg *nats.Msg) {
msg := natMsg.Data
func (ec *eventConsumer) sendReply(natMsg *nats.Msg, payload []byte) {
reply := natMsg.Header.Get("Mpc-Reply-To")
if reply == "" {
reply = natMsg.Reply
}

if natMsg.Reply == "" {
logger.Warn("No reply inbox specified for sign success message", "msg", string(msg))
if reply == "" {
logger.Warn("No reply inbox specified for sign success message", "msg", string(natMsg.Data))
return
}

err := ec.pubsub.Publish(natMsg.Reply, msg)
err := ec.pubsub.Publish(reply, payload)
if err != nil {
logger.Error("Failed to reply message", err, "reply", natMsg.Reply)
logger.Error("Failed to reply message", err, "reply", reply)
return
}
}
Expand Down
26 changes: 26 additions & 0 deletions pkg/eventconsumer/sign_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,32 @@ func (sc *signingConsumer) handleSigningEvent(msg jetstream.Msg) {
}
if replyMsg != nil {
logger.Info("SigningConsumer: Completed signing event; reply received")

err = sc.signingResultQueue.Enqueue(event.SigningResultCompleteTopic, replyMsg.Data, &messaging.EnqueueOptions{
IdempotententKey: buildIdempotentKey(signingMsg.TxID, sessionID, mpc.TypeSigningResultFmt),
})
if err != nil {
logger.Error("SigningConsumer: Failed to enqueue signing result", err,
"walletID", signingMsg.WalletID,
"txID", signingMsg.TxID,
)
}

// If the original message has a reply header (Mpc-Reply-To), forward the result there
clientReplySubject := msg.Headers().Get("Mpc-Reply-To")
if clientReplySubject != "" {
if err := sc.natsConn.Publish(clientReplySubject, replyMsg.Data); err != nil {
logger.Error("SigningConsumer: Failed to forward result to client", err, "replySubject", clientReplySubject)
} else {
logger.Debug("SigningConsumer: Forwarded result to client", "replySubject", clientReplySubject)
}
} else if msg.Reply() != "" {
// Fallback to msg.Reply() if header not present, but note that this might conflict with JS PubAck
if err := sc.natsConn.Publish(msg.Reply(), replyMsg.Data); err != nil {
logger.Warn("SigningConsumer: Failed to forward result to msg.Reply()", "error", err)
}
}

if ackErr := msg.Ack(); ackErr != nil {
logger.Error("SigningConsumer: ACK failed", ackErr)
}
Expand Down
15 changes: 8 additions & 7 deletions pkg/mpc/ecdsa_signing_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ func (s *ecdsaSigningSession) Init(tx *big.Int) error {
if err != nil {
return errors.Wrap(err, "Failed to unmarshal wallet data")
}


if len(s.derivationPath) > 0 {
il, extendedChildPk, errorDerivation := s.ckd.Derive(s.walletID, data.ECDSAPub, s.derivationPath, tss.S256())
Expand Down Expand Up @@ -204,12 +203,14 @@ func (s *ecdsaSigningSession) Sign(onSuccess func(data []byte)) {
return
}

err = s.resultQueue.Enqueue(event.SigningResultCompleteTopic, bytes, &messaging.EnqueueOptions{
IdempotententKey: s.idempotentKey,
})
if err != nil {
s.ErrCh <- errors.Wrap(err, "Failed to publish sign success message")
return
if s.resultQueue != nil {
err = s.resultQueue.Enqueue(event.SigningResultCompleteTopic, bytes, &messaging.EnqueueOptions{
IdempotententKey: s.idempotentKey,
})
if err != nil {
s.ErrCh <- errors.Wrap(err, "Failed to publish sign success message")
return
}
}

logger.Info("[SIGN] Sign successfully", "walletID", s.walletID)
Expand Down
16 changes: 9 additions & 7 deletions pkg/mpc/eddsa_signing_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ func (s *eddsaSigningSession) Init(tx *big.Int) error {
if err != nil {
return errors.Wrap(err, "Failed to unmarshal wallet data")
}


if len(s.derivationPath) > 0 {
il, extendedChildPk, errorDerivation := s.ckd.Derive(s.walletID, data.EDDSAPub, s.derivationPath, tss.Edwards())
Expand Down Expand Up @@ -192,12 +191,14 @@ func (s *eddsaSigningSession) Sign(onSuccess func(data []byte)) {
return
}

err = s.resultQueue.Enqueue(event.SigningResultCompleteTopic, bytes, &messaging.EnqueueOptions{
IdempotententKey: s.idempotentKey,
})
if err != nil {
s.ErrCh <- errors.Wrap(err, "Failed to publish sign success message")
return
if s.resultQueue != nil {
err = s.resultQueue.Enqueue(event.SigningResultCompleteTopic, bytes, &messaging.EnqueueOptions{
IdempotententKey: s.idempotentKey,
})
if err != nil {
s.ErrCh <- errors.Wrap(err, "Failed to publish sign success message")
return
}
}

logger.Info("[SIGN] Sign successfully", "walletID", s.walletID)
Expand All @@ -213,6 +214,7 @@ func (s *eddsaSigningSession) Sign(onSuccess func(data []byte)) {

}
}

// Close cleans up the EDDSA signing session by zeroing all sensitive data.
func (s *eddsaSigningSession) Close() error {
if s == nil {
Expand Down
Loading