Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 32 additions & 47 deletions internal/claimer/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,24 +77,24 @@ type claimerBlockchain struct {
defaultBlock config.DefaultBlock
}

func (self *claimerBlockchain) submitClaimToBlockchain(
func (cb *claimerBlockchain) submitClaimToBlockchain(
ic *iconsensus.IConsensus,
application *model.Application,
epoch *model.Epoch,
) (common.Hash, error) {
txHash := common.Hash{}
lastBlockNumber := new(big.Int).SetUint64(epoch.LastBlock)
tx, err := ic.SubmitClaim(self.txOpts, application.IApplicationAddress,
tx, err := ic.SubmitClaim(cb.txOpts, application.IApplicationAddress,
lastBlockNumber, *epoch.OutputsMerkleRoot)
if err != nil {
self.logger.Error("submitClaimToBlockchain:failed",
cb.logger.Error("submitClaimToBlockchain:failed",
"appContractAddress", application.IApplicationAddress,
"claimHash", *epoch.OutputsMerkleRoot,
"last_block", epoch.LastBlock,
"error", err)
} else {
txHash = tx.Hash()
self.logger.Debug("submitClaimToBlockchain:success",
cb.logger.Debug("submitClaimToBlockchain:success",
"appContractAddress", application.IApplicationAddress,
"claimHash", *epoch.OutputsMerkleRoot,
"last_block", epoch.LastBlock,
Expand All @@ -103,25 +103,21 @@ func (self *claimerBlockchain) submitClaimToBlockchain(
return txHash, err
}

func unwrapClaimSubmitted(
ic *iconsensus.IConsensus,
func takeOneAndParse[T any](
pull func() (log *types.Log, err error, ok bool),
) (
*iconsensus.IConsensusClaimSubmitted,
bool,
error,
) {
parse func(log types.Log) (*T, error),
) (*T, bool, error) {
log, err, ok := pull()
if !ok || err != nil {
return nil, false, err
}
ev, err := ic.ParseClaimSubmitted(*log)
ev, err := parse(*log)
return ev, true, err
}

// scan the event stream for a claimSubmitted event that matches claim.
// return this event and its successor
func (self *claimerBlockchain) findClaimSubmittedEventAndSucc(
func (cb *claimerBlockchain) findClaimSubmittedEventAndSucc(
ctx context.Context,
application *model.Application,
epoch *model.Epoch,
Expand All @@ -132,7 +128,7 @@ func (self *claimerBlockchain) findClaimSubmittedEventAndSucc(
*iconsensus.IConsensusClaimSubmitted,
error,
) {
ic, err := iconsensus.NewIConsensus(application.IConsensusAddress, self.client)
ic, err := iconsensus.NewIConsensus(application.IConsensusAddress, cb.client)
if err != nil {
return nil, nil, nil, err
}
Expand All @@ -142,6 +138,10 @@ func (self *claimerBlockchain) findClaimSubmittedEventAndSucc(
// - submitter == nil (any)
// - appContract == claim.IApplicationAddress
c, err := iconsensus.IConsensusMetaData.GetAbi()
if err != nil {
return nil, nil, nil, err
}

topics, err := abi.MakeTopics(
[]any{c.Events[model.MonitoredEvent_ClaimSubmitted.String()].ID},
nil,
Expand All @@ -151,7 +151,7 @@ func (self *claimerBlockchain) findClaimSubmittedEventAndSucc(
return nil, nil, nil, err
}

it, err := self.filter.ChunkedFilterLogs(ctx, self.client, ethereum.FilterQuery{
it, err := cb.filter.ChunkedFilterLogs(ctx, cb.client, ethereum.FilterQuery{
FromBlock: new(big.Int).SetUint64(epoch.LastBlock),
ToBlock: endBlock,
Addresses: []common.Address{application.IConsensusAddress},
Expand All @@ -165,15 +165,15 @@ func (self *claimerBlockchain) findClaimSubmittedEventAndSucc(
next, stop := iter.Pull2(it)
defer stop()
for {
event, ok, err := unwrapClaimSubmitted(ic, next)
event, ok, err := takeOneAndParse(next, ic.ParseClaimSubmitted)
if !ok || err != nil {
return ic, event, nil, err
}
lastBlock := event.LastProcessedBlockNumber.Uint64()

if claimSubmittedEventMatches(application, epoch, event) {
// found the event, does it has a successor? try to fetch it
succ, ok, err := unwrapClaimSubmitted(ic, next)
succ, ok, err := takeOneAndParse(next, ic.ParseClaimSubmitted)
if !ok || err != nil {
return ic, event, nil, err
}
Expand All @@ -185,25 +185,9 @@ func (self *claimerBlockchain) findClaimSubmittedEventAndSucc(
}
}

func unwrapClaimAccepted(
ic *iconsensus.IConsensus,
pull func() (log *types.Log, err error, ok bool),
) (
*iconsensus.IConsensusClaimAccepted,
bool,
error,
) {
log, err, ok := pull()
if !ok || err != nil {
return nil, false, err
}
ev, err := ic.ParseClaimAccepted(*log)
return ev, true, err
}

// scan the event stream for a claimAccepted event that matches claim.
// return this event and its successor
func (self *claimerBlockchain) findClaimAcceptedEventAndSucc(
func (cb *claimerBlockchain) findClaimAcceptedEventAndSucc(
ctx context.Context,
application *model.Application,
epoch *model.Epoch,
Expand All @@ -214,7 +198,7 @@ func (self *claimerBlockchain) findClaimAcceptedEventAndSucc(
*iconsensus.IConsensusClaimAccepted,
error,
) {
ic, err := iconsensus.NewIConsensus(application.IConsensusAddress, self.client)
ic, err := iconsensus.NewIConsensus(application.IConsensusAddress, cb.client)
if err != nil {
return nil, nil, nil, err
}
Expand All @@ -231,7 +215,7 @@ func (self *claimerBlockchain) findClaimAcceptedEventAndSucc(
return nil, nil, nil, err
}

it, err := self.filter.ChunkedFilterLogs(ctx, self.client, ethereum.FilterQuery{
it, err := cb.filter.ChunkedFilterLogs(ctx, cb.client, ethereum.FilterQuery{
FromBlock: new(big.Int).SetUint64(epoch.LastBlock),
ToBlock: endBlock,
Addresses: []common.Address{application.IConsensusAddress},
Expand All @@ -245,15 +229,15 @@ func (self *claimerBlockchain) findClaimAcceptedEventAndSucc(
next, stop := iter.Pull2(it)
defer stop()
for {
event, ok, err := unwrapClaimAccepted(ic, next)
event, ok, err := takeOneAndParse(next, ic.ParseClaimAccepted)
if !ok || err != nil {
return ic, event, nil, err
}
lastBlock := event.LastProcessedBlockNumber.Uint64()

if claimAcceptedEventMatches(application, epoch, event) {
// found the event, does it has a successor? try to fetch it
succ, ok, err := unwrapClaimAccepted(ic, next)
succ, ok, err := takeOneAndParse(next, ic.ParseClaimAccepted)
if !ok || err != nil {
return ic, event, nil, err
}
Expand All @@ -265,29 +249,30 @@ func (self *claimerBlockchain) findClaimAcceptedEventAndSucc(
}
}

func (self *claimerBlockchain) getConsensusAddress(
func (cb *claimerBlockchain) getConsensusAddress(
ctx context.Context,
app *model.Application,
) (common.Address, error) {
return ethutil.GetConsensus(ctx, self.client, app.IApplicationAddress)
return ethutil.GetConsensus(ctx, cb.client, app.IApplicationAddress)
}

/* poll a transaction hash for its submission status and receipt */
func (self *claimerBlockchain) pollTransaction(
func (cb *claimerBlockchain) pollTransaction(
ctx context.Context,
txHash common.Hash,
endBlock *big.Int,
) (bool, *types.Receipt, error) {
_, isPending, err := self.client.TransactionByHash(ctx, txHash)
_, isPending, err := cb.client.TransactionByHash(ctx, txHash)
if err != nil || isPending {
return false, nil, err
}

receipt, err := self.client.TransactionReceipt(ctx, txHash)
receipt, err := cb.client.TransactionReceipt(ctx, txHash)
if err != nil {
return false, nil, err
}

// ensure that we respect the commitment
if receipt.BlockNumber.Cmp(endBlock) >= 0 {
return false, receipt, err
}
Expand All @@ -296,9 +281,9 @@ func (self *claimerBlockchain) pollTransaction(
}

/* Retrieve the block number of "DefaultBlock" */
func (self *claimerBlockchain) getBlockNumber(ctx context.Context) (*big.Int, error) {
func (cb *claimerBlockchain) getBlockNumber(ctx context.Context) (*big.Int, error) {
var nr int64
switch self.defaultBlock {
switch cb.defaultBlock {
case model.DefaultBlock_Pending:
nr = rpc.PendingBlockNumber.Int64()
case model.DefaultBlock_Latest:
Expand All @@ -308,10 +293,10 @@ func (self *claimerBlockchain) getBlockNumber(ctx context.Context) (*big.Int, er
case model.DefaultBlock_Safe:
nr = rpc.SafeBlockNumber.Int64()
default:
return nil, fmt.Errorf("default block '%v' not supported", self.defaultBlock)
return nil, fmt.Errorf("default block '%v' not supported", cb.defaultBlock)
}

hdr, err := self.client.HeaderByNumber(ctx, big.NewInt(nr))
hdr, err := cb.client.HeaderByNumber(ctx, big.NewInt(nr))
if err != nil {
return nil, err
}
Expand Down
Loading
Loading