diff --git a/internal/claimer/blockchain.go b/internal/claimer/blockchain.go index e148460dd..6715b6ebf 100644 --- a/internal/claimer/blockchain.go +++ b/internal/claimer/blockchain.go @@ -77,24 +77,24 @@ type claimerBlockchain struct { defaultBlock config.DefaultBlock } -func (self *claimerBlockchain) submitClaimToBlockchain( +func (cb *claimerBlockchain) submitClaimToBlockchain( ic *iconsensus.IConsensus, application *model.Application, epoch *model.Epoch, ) (common.Hash, error) { txHash := common.Hash{} lastBlockNumber := new(big.Int).SetUint64(epoch.LastBlock) - tx, err := ic.SubmitClaim(self.txOpts, application.IApplicationAddress, + tx, err := ic.SubmitClaim(cb.txOpts, application.IApplicationAddress, lastBlockNumber, *epoch.OutputsMerkleRoot) if err != nil { - self.logger.Error("submitClaimToBlockchain:failed", + cb.logger.Error("submitClaimToBlockchain:failed", "appContractAddress", application.IApplicationAddress, "claimHash", *epoch.OutputsMerkleRoot, "last_block", epoch.LastBlock, "error", err) } else { txHash = tx.Hash() - self.logger.Debug("submitClaimToBlockchain:success", + cb.logger.Debug("submitClaimToBlockchain:success", "appContractAddress", application.IApplicationAddress, "claimHash", *epoch.OutputsMerkleRoot, "last_block", epoch.LastBlock, @@ -103,25 +103,21 @@ func (self *claimerBlockchain) submitClaimToBlockchain( return txHash, err } -func unwrapClaimSubmitted( - ic *iconsensus.IConsensus, +func takeOneAndParse[T any]( pull func() (log *types.Log, err error, ok bool), -) ( - *iconsensus.IConsensusClaimSubmitted, - bool, - error, -) { + parse func(log types.Log) (*T, error), +) (*T, bool, error) { log, err, ok := pull() if !ok || err != nil { return nil, false, err } - ev, err := ic.ParseClaimSubmitted(*log) + ev, err := parse(*log) return ev, true, err } // scan the event stream for a claimSubmitted event that matches claim. // return this event and its successor -func (self *claimerBlockchain) findClaimSubmittedEventAndSucc( +func (cb *claimerBlockchain) findClaimSubmittedEventAndSucc( ctx context.Context, application *model.Application, epoch *model.Epoch, @@ -132,7 +128,7 @@ func (self *claimerBlockchain) findClaimSubmittedEventAndSucc( *iconsensus.IConsensusClaimSubmitted, error, ) { - ic, err := iconsensus.NewIConsensus(application.IConsensusAddress, self.client) + ic, err := iconsensus.NewIConsensus(application.IConsensusAddress, cb.client) if err != nil { return nil, nil, nil, err } @@ -142,6 +138,10 @@ func (self *claimerBlockchain) findClaimSubmittedEventAndSucc( // - submitter == nil (any) // - appContract == claim.IApplicationAddress c, err := iconsensus.IConsensusMetaData.GetAbi() + if err != nil { + return nil, nil, nil, err + } + topics, err := abi.MakeTopics( []any{c.Events[model.MonitoredEvent_ClaimSubmitted.String()].ID}, nil, @@ -151,7 +151,7 @@ func (self *claimerBlockchain) findClaimSubmittedEventAndSucc( return nil, nil, nil, err } - it, err := self.filter.ChunkedFilterLogs(ctx, self.client, ethereum.FilterQuery{ + it, err := cb.filter.ChunkedFilterLogs(ctx, cb.client, ethereum.FilterQuery{ FromBlock: new(big.Int).SetUint64(epoch.LastBlock), ToBlock: endBlock, Addresses: []common.Address{application.IConsensusAddress}, @@ -165,7 +165,7 @@ func (self *claimerBlockchain) findClaimSubmittedEventAndSucc( next, stop := iter.Pull2(it) defer stop() for { - event, ok, err := unwrapClaimSubmitted(ic, next) + event, ok, err := takeOneAndParse(next, ic.ParseClaimSubmitted) if !ok || err != nil { return ic, event, nil, err } @@ -173,7 +173,7 @@ func (self *claimerBlockchain) findClaimSubmittedEventAndSucc( if claimSubmittedEventMatches(application, epoch, event) { // found the event, does it has a successor? try to fetch it - succ, ok, err := unwrapClaimSubmitted(ic, next) + succ, ok, err := takeOneAndParse(next, ic.ParseClaimSubmitted) if !ok || err != nil { return ic, event, nil, err } @@ -185,25 +185,9 @@ func (self *claimerBlockchain) findClaimSubmittedEventAndSucc( } } -func unwrapClaimAccepted( - ic *iconsensus.IConsensus, - pull func() (log *types.Log, err error, ok bool), -) ( - *iconsensus.IConsensusClaimAccepted, - bool, - error, -) { - log, err, ok := pull() - if !ok || err != nil { - return nil, false, err - } - ev, err := ic.ParseClaimAccepted(*log) - return ev, true, err -} - // scan the event stream for a claimAccepted event that matches claim. // return this event and its successor -func (self *claimerBlockchain) findClaimAcceptedEventAndSucc( +func (cb *claimerBlockchain) findClaimAcceptedEventAndSucc( ctx context.Context, application *model.Application, epoch *model.Epoch, @@ -214,7 +198,7 @@ func (self *claimerBlockchain) findClaimAcceptedEventAndSucc( *iconsensus.IConsensusClaimAccepted, error, ) { - ic, err := iconsensus.NewIConsensus(application.IConsensusAddress, self.client) + ic, err := iconsensus.NewIConsensus(application.IConsensusAddress, cb.client) if err != nil { return nil, nil, nil, err } @@ -231,7 +215,7 @@ func (self *claimerBlockchain) findClaimAcceptedEventAndSucc( return nil, nil, nil, err } - it, err := self.filter.ChunkedFilterLogs(ctx, self.client, ethereum.FilterQuery{ + it, err := cb.filter.ChunkedFilterLogs(ctx, cb.client, ethereum.FilterQuery{ FromBlock: new(big.Int).SetUint64(epoch.LastBlock), ToBlock: endBlock, Addresses: []common.Address{application.IConsensusAddress}, @@ -245,7 +229,7 @@ func (self *claimerBlockchain) findClaimAcceptedEventAndSucc( next, stop := iter.Pull2(it) defer stop() for { - event, ok, err := unwrapClaimAccepted(ic, next) + event, ok, err := takeOneAndParse(next, ic.ParseClaimAccepted) if !ok || err != nil { return ic, event, nil, err } @@ -253,7 +237,7 @@ func (self *claimerBlockchain) findClaimAcceptedEventAndSucc( if claimAcceptedEventMatches(application, epoch, event) { // found the event, does it has a successor? try to fetch it - succ, ok, err := unwrapClaimAccepted(ic, next) + succ, ok, err := takeOneAndParse(next, ic.ParseClaimAccepted) if !ok || err != nil { return ic, event, nil, err } @@ -265,29 +249,30 @@ func (self *claimerBlockchain) findClaimAcceptedEventAndSucc( } } -func (self *claimerBlockchain) getConsensusAddress( +func (cb *claimerBlockchain) getConsensusAddress( ctx context.Context, app *model.Application, ) (common.Address, error) { - return ethutil.GetConsensus(ctx, self.client, app.IApplicationAddress) + return ethutil.GetConsensus(ctx, cb.client, app.IApplicationAddress) } /* poll a transaction hash for its submission status and receipt */ -func (self *claimerBlockchain) pollTransaction( +func (cb *claimerBlockchain) pollTransaction( ctx context.Context, txHash common.Hash, endBlock *big.Int, ) (bool, *types.Receipt, error) { - _, isPending, err := self.client.TransactionByHash(ctx, txHash) + _, isPending, err := cb.client.TransactionByHash(ctx, txHash) if err != nil || isPending { return false, nil, err } - receipt, err := self.client.TransactionReceipt(ctx, txHash) + receipt, err := cb.client.TransactionReceipt(ctx, txHash) if err != nil { return false, nil, err } + // ensure that we respect the commitment if receipt.BlockNumber.Cmp(endBlock) >= 0 { return false, receipt, err } @@ -296,9 +281,9 @@ func (self *claimerBlockchain) pollTransaction( } /* Retrieve the block number of "DefaultBlock" */ -func (self *claimerBlockchain) getBlockNumber(ctx context.Context) (*big.Int, error) { +func (cb *claimerBlockchain) getBlockNumber(ctx context.Context) (*big.Int, error) { var nr int64 - switch self.defaultBlock { + switch cb.defaultBlock { case model.DefaultBlock_Pending: nr = rpc.PendingBlockNumber.Int64() case model.DefaultBlock_Latest: @@ -308,10 +293,10 @@ func (self *claimerBlockchain) getBlockNumber(ctx context.Context) (*big.Int, er case model.DefaultBlock_Safe: nr = rpc.SafeBlockNumber.Int64() default: - return nil, fmt.Errorf("default block '%v' not supported", self.defaultBlock) + return nil, fmt.Errorf("default block '%v' not supported", cb.defaultBlock) } - hdr, err := self.client.HeaderByNumber(ctx, big.NewInt(nr)) + hdr, err := cb.client.HeaderByNumber(ctx, big.NewInt(nr)) if err != nil { return nil, err } diff --git a/internal/claimer/claimer.go b/internal/claimer/claimer.go index 5c5f4b982..6ae0d0cb4 100644 --- a/internal/claimer/claimer.go +++ b/internal/claimer/claimer.go @@ -104,7 +104,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( apps map[int64]*model.Application, endBlock *big.Int, ) []error { - errs := []error{} + var errs []error var err error // check claims in flight @@ -130,7 +130,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( ) if err != nil { errs = append(errs, err) - return errs + continue } s.Logger.Info("Claim submitted", "app", apps[key].IApplicationAddress, @@ -147,13 +147,13 @@ func (s *Service) submitClaimsAndUpdateDatabase( } // check computed epochs - for key, currEpoch := range computedEpochs { + nextApp: for key, currEpoch := range computedEpochs { var ic *iconsensus.IConsensus var prevClaimSubmissionEvent *iconsensus.IConsensusClaimSubmitted var currClaimSubmissionEvent *iconsensus.IConsensusClaimSubmitted if _, isClaimInFlight := s.claimsInFlight[key]; isClaimInFlight { - continue + continue nextApp } app := apps[key] // guaranteed to exist because of the query and database constraints @@ -163,8 +163,10 @@ func (s *Service) submitClaimsAndUpdateDatabase( if err := s.checkConsensusForAddressChange(app); err != nil { delete(computedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } + + // are we dealing with cases {3, 4} or {1, 2}. Do the checks if previousEpochExists { err := checkEpochSequenceConstraint(prevEpoch, currEpoch) if err != nil { @@ -181,17 +183,17 @@ func (s *Service) submitClaimsAndUpdateDatabase( ) delete(computedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } - // the previous epoch must have a matching claim submission event. - // current epoch may or may not be present + // previous epoch must have a matching claim submission event. + // current epoch may or may not have one at this point. ic, prevClaimSubmissionEvent, currClaimSubmissionEvent, err = s.blockchain.findClaimSubmittedEventAndSucc(s.Context, app, prevEpoch, endBlock) if err != nil { delete(computedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } if prevClaimSubmissionEvent == nil { err = s.setApplicationInoperable( @@ -205,7 +207,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( ) delete(computedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } if !claimSubmittedEventMatches(app, prevEpoch, prevClaimSubmissionEvent) { s.Logger.Error("event mismatch", @@ -225,7 +227,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( ) delete(computedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } } else { // first claim @@ -234,10 +236,12 @@ func (s *Service) submitClaimsAndUpdateDatabase( if err != nil { delete(computedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } } + // claim submission must match the current epoch if we found it. + // otherwise we submit it if the submission flag is enabled. if currClaimSubmissionEvent != nil { s.Logger.Debug("Found ClaimSubmitted Event", "app", currClaimSubmissionEvent.AppContract, @@ -254,7 +258,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( ) delete(computedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } s.Logger.Debug("Updating claim status to submitted", "app", app.IApplicationAddress, @@ -271,7 +275,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( if err != nil { delete(computedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } delete(s.claimsInFlight, key) s.Logger.Info("Claim previously submitted", @@ -287,7 +291,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( "claim_hash", fmt.Sprintf("%x", prevEpoch.OutputsMerkleRoot), "last_block", prevEpoch.LastBlock, ) - goto nextApp + continue nextApp } s.Logger.Debug("Submitting claim to blockchain", "app", app.IApplicationAddress, @@ -298,7 +302,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( if err != nil { delete(computedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } s.claimsInFlight[key] = txHash } else { @@ -309,7 +313,6 @@ func (s *Service) submitClaimsAndUpdateDatabase( ) } - nextApp: } return errs } @@ -325,7 +328,7 @@ func (s *Service) acceptClaimsAndUpdateDatabase( var err error // check submitted claims - for key, submittedEpoch := range submittedEpochs { + nextApp: for key, submittedEpoch := range submittedEpochs { var prevEvent *iconsensus.IConsensusClaimAccepted var currEvent *iconsensus.IConsensusClaimAccepted @@ -335,7 +338,7 @@ func (s *Service) acceptClaimsAndUpdateDatabase( if err := s.checkConsensusForAddressChange(app); err != nil { delete(submittedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } if prevExists { err := checkEpochSequenceConstraint(acceptedEpoch, submittedEpoch) @@ -348,7 +351,7 @@ func (s *Service) acceptClaimsAndUpdateDatabase( ) delete(submittedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } // if prevClaimRow exists, there must be a matching event @@ -357,7 +360,7 @@ func (s *Service) acceptClaimsAndUpdateDatabase( if err != nil { delete(submittedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } if prevEvent == nil { s.Logger.Error("Missing event", @@ -367,7 +370,7 @@ func (s *Service) acceptClaimsAndUpdateDatabase( ) delete(submittedEpochs, key) errs = append(errs, ErrMissingEvent) - goto nextApp + continue nextApp } if !claimAcceptedEventMatches(app, acceptedEpoch, prevEvent) { s.Logger.Error("Event mismatch", @@ -378,7 +381,7 @@ func (s *Service) acceptClaimsAndUpdateDatabase( ) delete(submittedEpochs, key) errs = append(errs, ErrEventMismatch) - goto nextApp + continue nextApp } } else { // first claim @@ -387,7 +390,7 @@ func (s *Service) acceptClaimsAndUpdateDatabase( if err != nil { delete(submittedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } } @@ -405,7 +408,7 @@ func (s *Service) acceptClaimsAndUpdateDatabase( ) delete(submittedEpochs, key) errs = append(errs, ErrEventMismatch) - goto nextApp + continue nextApp } s.Logger.Debug("Updating claim status to accepted", "app", app.IApplicationAddress, @@ -417,7 +420,7 @@ func (s *Service) acceptClaimsAndUpdateDatabase( if err != nil { delete(submittedEpochs, key) errs = append(errs, err) - goto nextApp + continue nextApp } s.Logger.Info("Claim accepted", "app", currEvent.AppContract, @@ -427,7 +430,6 @@ func (s *Service) acceptClaimsAndUpdateDatabase( "tx", txHash, ) } - nextApp: } return errs } @@ -481,7 +483,7 @@ func checkEpochConstraint(c *model.Epoch) error { if c.FirstBlock > c.LastBlock { return fmt.Errorf("unexpected epoch state. first_block: %v > last_block: %v", c.FirstBlock, c.LastBlock) } - if c.Status == model.EpochStatus_ClaimSubmitted { + if c.Status == model.EpochStatus_ClaimSubmitted || c.Status == model.EpochStatus_ClaimComputed || c.Status == model.EpochStatus_ClaimAccepted { if c.OutputsMerkleRoot == nil { return fmt.Errorf("unexpected epoch state. missing claim_hash.") } @@ -520,12 +522,14 @@ func checkEpochSequenceConstraint(prevEpoch *model.Epoch, currEpoch *model.Epoch func claimSubmittedEventMatches(application *model.Application, epoch *model.Epoch, event *iconsensus.IConsensusClaimSubmitted) bool { return application.IApplicationAddress == event.AppContract && + epoch.OutputsMerkleRoot != nil && *epoch.OutputsMerkleRoot == event.OutputsMerkleRoot && epoch.LastBlock == event.LastProcessedBlockNumber.Uint64() } func claimAcceptedEventMatches(application *model.Application, epoch *model.Epoch, event *iconsensus.IConsensusClaimAccepted) bool { return application.IApplicationAddress == event.AppContract && + epoch.OutputsMerkleRoot != nil && *epoch.OutputsMerkleRoot == event.OutputsMerkleRoot && epoch.LastBlock == event.LastProcessedBlockNumber.Uint64() }