diff --git a/internal/config/blockchain.go b/internal/config/blockchain.go index 0f49720..7b0dde8 100644 --- a/internal/config/blockchain.go +++ b/internal/config/blockchain.go @@ -1,8 +1,16 @@ package config -import "github.com/dv-net/dv-processing/pkg/walletsdk/wconstants" +import ( + "github.com/dv-net/dv-processing/pkg/walletsdk/wconstants" +) + +type EScanner struct { + BlocksInChunk int `yaml:"blocks_in_chunk" default:"20"` +} type Blockchain struct { + EScanner EScanner `yaml:"e_scanner"` + // Tron Tron TronBlockchain diff --git a/internal/config/config.go b/internal/config/config.go index dfec500..1275f57 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -85,6 +85,8 @@ func (o *ResourceManager) Validate() error { } func (c *Config) SetDefaults() { + c.Blockchain.EScanner.BlocksInChunk = 20 + c.ExplorerProxy.Addr = "https://explorer-proxy.dv.net" c.ExplorerProxy.Name = "explorer-proxy-client" diff --git a/internal/escanner/scanner.go b/internal/escanner/scanner.go index 138ebcc..baa8601 100644 --- a/internal/escanner/scanner.go +++ b/internal/escanner/scanner.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "strings" + "sync" "sync/atomic" "time" @@ -32,7 +33,6 @@ import ( "github.com/dv-net/dv-processing/pkg/walletsdk/tron" "github.com/dv-net/dv-processing/pkg/walletsdk/wconstants" blocksv2 "github.com/dv-net/dv-proto/gen/go/eproxy/blocks/v2" - incidentsv2 "github.com/dv-net/dv-proto/gen/go/eproxy/incidents/v2" transactionsv2 "github.com/dv-net/dv-proto/gen/go/eproxy/transactions/v2" ) @@ -129,7 +129,7 @@ func (s *scanner) start(ctx context.Context) error { return nil } -// handleBlock +// handleBlocks processes all new blocks using parallel fetching per chunk and sequential commits. func (s *scanner) handleBlocks(ctx context.Context) error { lpBlock := s.lastParsedBlockHeight.Load() lnBlock := s.lastNodeBlockHeight.Load() @@ -145,30 +145,75 @@ func (s *scanner) handleBlocks(ctx context.Context) error { existsLastBlockDB = false } - // Check for potential rollback before processing next block - if existsLastBlockDB && lpBlock >= 0 { - nextBlock := lpBlock + 1 - s.logger.Debugf("Checking for rollback for next block %d", nextBlock) - if err := s.checkForRollback(ctx, nextBlock); err != nil { - return fmt.Errorf("rollback check for block %d: %w", nextBlock, err) - } + blocksInChunk := int64(s.conf.EScanner.BlocksInChunk) + if blocksInChunk <= 0 { + blocksInChunk = 1 + } - // Reload lpBlock in case it was updated during rollback recovery - newLpBlock := s.lastParsedBlockHeight.Load() - if newLpBlock != lpBlock { - s.logger.Infof("Block height updated after rollback check: %d -> %d", lpBlock, newLpBlock) - lpBlock = newLpBlock + // Fetch last known hash once; updated in memory after each committed batch. + var lastKnownHash string + if existsLastBlockDB { + lastBlock, err := s.bs.ProcessedBlocks().LastBlock(ctx, s.blockchain) + if err == nil { + lastKnownHash = lastBlock.Hash } } - for i := lpBlock + 1; i <= lnBlock; i++ { + lag := lnBlock - lpBlock + var catchUpStart time.Time + var catchUpBlocksDone int64 + if lag > blocksInChunk { + catchUpStart = time.Now() + s.logger.Infof("catch-up started: lag=%d blocks (from=%d to=%d)", lag, lpBlock+1, lnBlock) + } + + for batchStart := lpBlock + 1; batchStart <= lnBlock; batchStart += blocksInChunk { if ctx.Err() != nil { return nil //nolint:nilerr } - if err := s.handleBlock(i, existsLastBlockDB); err != nil { //nolint:contextcheck - return fmt.Errorf("handle block %d: %w", i, err) + batchEnd := min(batchStart+blocksInChunk-1, lnBlock) + + results, err := s.loadBlocks(ctx, batchStart, batchEnd) + if err != nil { + return fmt.Errorf("load blocks [%d, %d]: %w", batchStart, batchEnd, err) + } + + // Validate chain integrity before committing — catches rollbacks within the batch too. + if existsLastBlockDB { + if err := s.validateChain(results, lastKnownHash); err != nil { + s.logger.Warnf("chain validation failed: %s", err) + return s.handleRollback(ctx) + } + } + + for _, r := range results { + if err := s.commitBlockResult(ctx, r, existsLastBlockDB); err != nil { + return fmt.Errorf("commit block %d: %w", r.height, err) + } + existsLastBlockDB = true + } + + if len(results) > 0 { + lastKnownHash = results[len(results)-1].blockHash } + + if !catchUpStart.IsZero() { + catchUpBlocksDone += int64(len(results)) + elapsed := time.Since(catchUpStart) + blocksPerSec := float64(catchUpBlocksDone) / elapsed.Seconds() + remaining := lag - catchUpBlocksDone + eta := time.Duration(float64(remaining)/blocksPerSec) * time.Second + s.logger.Infof("catch-up progress: %d/%d blocks done (%.0f blocks/s, ETA %s)", + catchUpBlocksDone, lag, blocksPerSec, eta.Round(time.Second)) + } + } + + if !catchUpStart.IsZero() { + elapsed := time.Since(catchUpStart) + blocksPerSec := float64(lag) / elapsed.Seconds() + s.logger.Infof("catch-up complete: %d blocks in %s (%.0f blocks/s)", + lag, elapsed.Round(time.Millisecond), blocksPerSec) } time.Sleep(100 * time.Millisecond) @@ -176,6 +221,54 @@ func (s *scanner) handleBlocks(ctx context.Context) error { return nil } +// blockFetchResult holds data fetched for a single block (RPC only, no DB). +type blockFetchResult struct { + height int64 + blockHash string + prevHash string + whParams []createWebhookParams +} + +// loadBlocks fetches a range of blocks in parallel (like evm LoadBlocks), preserving order. +func (s *scanner) loadBlocks(ctx context.Context, from, to int64) ([]blockFetchResult, error) { + size := int(to - from + 1) + results := make([]blockFetchResult, size) + errs := make([]error, size) + + var wg sync.WaitGroup + for i := range size { + wg.Add(1) + go func(idx int) { + defer wg.Done() + results[idx], errs[idx] = s.fetchBlockData(ctx, from+int64(idx)) + }(i) + } + wg.Wait() + + for _, err := range errs { + if err != nil { + return nil, err + } + } + + return results, nil +} + +// validateChain checks that prevHash of each block matches the previous block's hash. +// Catches rollbacks both at the batch boundary and within the batch. +func (s *scanner) validateChain(results []blockFetchResult, lastKnownHash string) error { + for i, r := range results { + expected := lastKnownHash + if i > 0 { + expected = results[i-1].blockHash + } + if r.prevHash != "" && expected != "" && r.prevHash != expected { + return fmt.Errorf("block %d: prevHash=%s expected=%s", r.height, r.prevHash, expected) + } + } + return nil +} + type createWebhookParams struct { tx *transactionsv2.Transaction event *transactionsv2.Event @@ -206,40 +299,39 @@ func (cwp *createWebhookParams) IsTrxHotWalletDeposit() bool { *cwp.event.AddressTo != "" } -// handleBlock -func (s *scanner) handleBlock(blockHeight int64, existsLastBlockDB bool) error { //nolint:funlen - ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) +// fetchBlockData fetches all data for a single block via RPC (no DB operations). +func (s *scanner) fetchBlockData(ctx context.Context, blockHeight int64) (blockFetchResult, error) { + ctx, cancel := context.WithTimeout(ctx, 120*time.Second) defer cancel() - s.logger.Debugf("start processing block %d", blockHeight) + s.logger.Debugf("start fetching block %d", blockHeight) now := time.Now() txs, err := s.bs.EProxy().FindTransactions(ctx, s.blockchain, eproxy.FindTransactionsParams{ BlockHeight: utils.Pointer(uint64(blockHeight)), //nolint:gosec }) if err != nil { - return fmt.Errorf("find transactions: %w", err) + return blockFetchResult{}, fmt.Errorf("find transactions: %w", err) } s.logger.Debugf("found %d transactions in block %d in %s", len(txs), blockHeight, time.Since(now)) - // Get block hash for rollback detection block, err := s.bs.EProxy().BlocksClient().Get(ctx, connect.NewRequest(&blocksv2.GetRequest{ Blockchain: eproxy.ConvertBlockchain(s.blockchain), Height: uint64(blockHeight), //nolint:gosec })) if err != nil { - return fmt.Errorf("get block for hash: %w", err) + return blockFetchResult{}, fmt.Errorf("get block for hash: %w", err) } blockHash := block.Msg.GetItem().GetHash() + prevHash := block.Msg.GetItem().GetPrevHash() createWhParams := utils.NewSlice[createWebhookParams]() eg, gCtx := errgroup.WithContext(ctx) eg.SetLimit(100) - now = time.Now() for _, tx := range txs { eg.Go(func() error { for _, event := range tx.Events { @@ -287,16 +379,28 @@ func (s *scanner) handleBlock(blockHeight int64, existsLastBlockDB bool) error { } if err := eg.Wait(); err != nil { - return fmt.Errorf("wait for all transactions: %w", err) + return blockFetchResult{}, fmt.Errorf("wait for all transactions: %w", err) } - // create webhooks + return blockFetchResult{ + height: blockHeight, + blockHash: blockHash, + prevHash: prevHash, + whParams: createWhParams.GetAll(), + }, nil +} + +// commitBlockResult writes a fetched block result to the database (must be called sequentially). +func (s *scanner) commitBlockResult(ctx context.Context, r blockFetchResult, existsLastBlockDB bool) error { + ctx, cancel := context.WithTimeout(ctx, 120*time.Second) + defer cancel() + + now := time.Now() - // handle block in postgres transaction if err := pgx.BeginTxFunc(ctx, s.store.PSQLConn(), pgx.TxOptions{}, func(dbTx pgx.Tx) error { - if len(createWhParams.GetAll()) != 0 { - batchParams := make([]webhooks.BatchCreateParams, 0, createWhParams.Len()) - for _, params := range createWhParams.GetAll() { + if len(r.whParams) != 0 { + batchParams := make([]webhooks.BatchCreateParams, 0, len(r.whParams)) + for _, params := range r.whParams { transactionData := webhooks.TransactionData{ Hash: params.tx.Hash, Confirmations: params.tx.Confirmations, @@ -384,16 +488,16 @@ func (s *scanner) handleBlock(blockHeight int64, existsLastBlockDB bool) error { } } - s.logger.Debugf("processed block %d in %s", blockHeight, time.Since(now)) + s.logger.Debugf("processed block %d in %s", r.height, time.Since(now)) if existsLastBlockDB { - if err := s.bs.ProcessedBlocks().UpdateNumberWithHash(ctx, s.blockchain, blockHeight, blockHash, repos.WithTx(dbTx)); err != nil { + if err := s.bs.ProcessedBlocks().UpdateNumberWithHash(ctx, s.blockchain, r.height, r.blockHash, repos.WithTx(dbTx)); err != nil { return fmt.Errorf("update block number: %w", err) } - s.logger.Debugf("updated last block from explorer %s is %d", s.blockchain.String(), blockHeight) + s.logger.Debugf("updated last block from explorer %s is %d", s.blockchain.String(), r.height) } else { - if err := s.bs.ProcessedBlocks().Create(ctx, s.blockchain, blockHeight, blockHash, repos.WithTx(dbTx)); err != nil { + if err := s.bs.ProcessedBlocks().Create(ctx, s.blockchain, r.height, r.blockHash, repos.WithTx(dbTx)); err != nil { return fmt.Errorf("create block number: %w", err) } } @@ -403,7 +507,7 @@ func (s *scanner) handleBlock(blockHeight int64, existsLastBlockDB bool) error { return err } - s.lastParsedBlockHeight.Store(blockHeight) + s.lastParsedBlockHeight.Store(r.height) return nil } @@ -483,56 +587,6 @@ func (s *scanner) checksForEvent(event *transactionsv2.Event) []eventCheck { return checks } -// checkForRollback checks if the next block's prevHash matches our stored hash -func (s *scanner) checkForRollback(ctx context.Context, nextBlockHeight int64) error { - s.logger.Debugf("Checking rollback for block %d", nextBlockHeight) - - // Get the next block to check prevHash consistency - block, err := s.bs.EProxy().BlocksClient().Get(ctx, connect.NewRequest(&blocksv2.GetRequest{ - Blockchain: eproxy.ConvertBlockchain(s.blockchain), - Height: uint64(nextBlockHeight), //nolint:gosec - })) - if err != nil { - // If we can't get the block, it might not exist yet OR there was a rollback - if strings.Contains(err.Error(), "not found") || strings.Contains(err.Error(), "does not exist") { - s.logger.Warnf("Block %d not found in explorer - checking for rollback incident", nextBlockHeight) - // Check incidents as fallback - the block might have been rolled back - return s.checkForIncidents(ctx) - } - return fmt.Errorf("get block for rollback check: %w", err) - } - - // Get the previous block hash from the next block - nextBlockPrevHash := block.Msg.GetItem().GetPrevHash() - if nextBlockPrevHash == "" { - s.logger.Debugf("block %d has no prevHash, skipping rollback check", nextBlockHeight) - return nil - } - - // Get stored block info from database - lastBlock, err := s.bs.ProcessedBlocks().LastBlock(ctx, s.blockchain) - if err != nil { - if errors.Is(err, storecmn.ErrNotFound) { - s.logger.Debugf("No stored block found, skipping rollback check") - return nil - } - return fmt.Errorf("get last processed block: %w", err) - } - - s.logger.Debugf("Comparing hashes: stored=%s, next_block_prevHash=%s", lastBlock.Hash, nextBlockPrevHash) - - // If we have a stored hash and the next block's prevHash doesn't match it, we have a rollback - if lastBlock.Hash != "" && lastBlock.Hash != nextBlockPrevHash { - s.logger.Warnf("Rollback detected! Stored hash %s != next block's prevHash %s for block %d", - lastBlock.Hash, nextBlockPrevHash, nextBlockHeight) - - return s.handleRollback(ctx) - } - - s.logger.Debugf("No rollback detected for block %d", nextBlockHeight) - return nil -} - // handleRollback handles blockchain rollback by getting the new starting point from incidents API func (s *scanner) handleRollback(ctx context.Context) error { s.logger.Infof("Handling rollback incident for blockchain %s", s.blockchain.String()) @@ -573,120 +627,3 @@ func (s *scanner) handleRollback(ctx context.Context) error { return nil }) } - -// checkForIncidents checks for new rollback incidents and processes them -func (s *scanner) checkForIncidents(ctx context.Context) error { - s.logger.Debugf("Checking for new rollback incidents") - - // Get incidents from explorer - incidents, err := s.bs.EProxy().GetIncidents(ctx, s.blockchain, 10) - if err != nil { - s.logger.Debugf("Failed to get incidents: %v", err) - return nil // Don't fail scanner if we can't get incidents - } - - if len(incidents) == 0 { - s.logger.Debugf("No incidents found") - return nil - } - - // Check each incident - for _, incident := range incidents { - if incident.GetType() != incidentsv2.IncidentType_INCIDENT_TYPE_ROLLBACK { - continue - } - - // Check if this incident was already processed - processed, err := s.bs.ProcessedIncidents().IsProcessed(ctx, s.blockchain, incident.GetId()) - if err != nil { - return fmt.Errorf("check if incident processed: %w", err) - } - - if processed { - s.logger.Debugf("Incident %s already processed, skipping", incident.GetId()) - continue - } - - // New incident found! - rollbackStartBlock := int64(incident.GetDataRollback().GetRevertToBlockHeight()) //nolint:gosec - currentBlock := s.lastParsedBlockHeight.Load() - - // Check if we need to rollback - if currentBlock >= rollbackStartBlock { //nolint:nestif - s.logger.Warnf("New rollback incident detected: id=%s, current_block=%d, rollback_to=%d", - incident.GetId(), currentBlock, rollbackStartBlock-1) - - // Mark incident as processing before handling - if err := s.bs.ProcessedIncidents().MarkAsProcessing(ctx, - s.blockchain, - incident.GetId(), - "rollback", - rollbackStartBlock, - currentBlock); err != nil { - s.logger.Errorf("Failed to mark incident as processing: %v", err) - } - - // Handle the rollback - if err := s.handleRollbackWithIncident(ctx, incident); err != nil { - // Mark as failed - if markErr := s.bs.ProcessedIncidents().MarkAsFailed(ctx, - s.blockchain, - incident.GetId(), - err.Error()); markErr != nil { - s.logger.Errorf("Failed to mark incident as failed: %v", markErr) - } - return fmt.Errorf("handle rollback for incident %s: %w", incident.GetId(), err) - } - - // Mark as completed - if err := s.bs.ProcessedIncidents().MarkAsCompleted(ctx, s.blockchain, incident.GetId()); err != nil { - s.logger.Errorf("Failed to mark incident as completed: %v", err) - } - - s.logger.Infof("Successfully processed rollback incident %s", incident.GetId()) - return nil // Process one incident at a time - } - - s.logger.Debugf("Rollback incident %s not applicable: current_block=%d < rollback_start=%d", - incident.GetId(), currentBlock, rollbackStartBlock) - } - - return nil -} - -// handleRollbackWithIncident handles rollback using incident information -func (s *scanner) handleRollbackWithIncident(ctx context.Context, incident *incidentsv2.Incident) error { - rollbackStartBlock := int64(incident.GetDataRollback().GetRevertToBlockHeight()) //nolint:gosec - rollbackBlockHeight := rollbackStartBlock - 1 - - s.logger.Infof("Processing rollback incident %s: rolling back to block %d", incident.GetId(), rollbackBlockHeight) - - // Get the hash of the rollback target block - block, err := s.bs.EProxy().BlocksClient().Get(ctx, connect.NewRequest(&blocksv2.GetRequest{ - Blockchain: eproxy.ConvertBlockchain(s.blockchain), - Height: uint64(rollbackBlockHeight), //nolint:gosec - })) - if err != nil { - return fmt.Errorf("get rollback block hash: %w", err) - } - - rollbackBlockHash := block.Msg.GetItem().GetHash() - - // Update lastParsedBlockHeight - s.lastParsedBlockHeight.Store(rollbackBlockHeight) - - // Update database - return pgx.BeginTxFunc(ctx, s.store.PSQLConn(), pgx.TxOptions{}, func(dbTx pgx.Tx) error { - if err := s.bs.ProcessedBlocks().UpdateNumberWithHash(ctx, - s.blockchain, - rollbackBlockHeight, - rollbackBlockHash, - repos.WithTx(dbTx)); err != nil { - return fmt.Errorf("update processed block: %w", err) - } - - s.logger.Infof("Rollback completed: stored block %d with hash %s, will resume from block %d", - rollbackBlockHeight, rollbackBlockHash, rollbackStartBlock) - return nil - }) -}