diff --git a/cmd/cartesi-rollups-advancer/root/root.go b/cmd/cartesi-rollups-advancer/root/root.go
index d8680f501..65a37964a 100644
--- a/cmd/cartesi-rollups-advancer/root/root.go
+++ b/cmd/cartesi-rollups-advancer/root/root.go
@@ -80,19 +80,21 @@ func run(cmd *cobra.Command, args []string) {
 	defer cancel()

 	createInfo := advancer.CreateInfo{
-		CreateInfo: service.CreateInfo{
-			Name: config.ServiceAdvancer,
-			LogLevel: config.ResolveServiceLogLevel(config.ServiceAdvancer, cfg.LogLevel),
-			LogColor: cfg.LogColor,
-			EnableSignalHandling: true,
-			TelemetryCreate: true,
-			TelemetryAddress: cfg.AdvancerTelemetryAddress,
-			PollInterval: cfg.AdvancerPollingInterval,
+		TickServiceConfigs: service.TickServiceConfigs{
+			PollInterval: cfg.AdvancerPollingInterval,
+			ServiceConfigs: service.ServiceConfigs{
+				Name: config.ServiceAdvancer,
+				LogLevel: config.ResolveServiceLogLevel(config.ServiceAdvancer, cfg.LogLevel),
+				LogColor: cfg.LogColor,
+				EnableSignalHandling: true,
+				TelemetryCreate: true,
+				TelemetryAddress: cfg.AdvancerTelemetryAddress,
+			},
 		},
 		Config: *cfg,
 	}
-	logger := service.NewServiceLogger(&createInfo.CreateInfo)
-	createInfo.CreateInfo.Logger = logger
+	logger := service.NewServiceLogger(&createInfo.ServiceConfigs)
+	createInfo.ServiceConfigs.Logger = logger

 	var err error
 	createInfo.Repository, err = factory.NewRepositoryFromConnectionString(ctx, cfg.DatabaseConnection.Raw())
@@ -101,7 +103,6 @@ func run(cmd *cobra.Command, args []string) {

 	advancerService, err := advancer.Create(ctx, &createInfo)
 	cli.CheckErr(logger, err)
-	advancerService.LogConfig(createInfo.Config)

 	cli.CheckErr(logger, advancerService.Serve())
 }
diff --git a/cmd/cartesi-rollups-claimer/root/root.go b/cmd/cartesi-rollups-claimer/root/root.go
index d0a98b39a..b1d3b967c 100644
--- a/cmd/cartesi-rollups-claimer/root/root.go
+++ b/cmd/cartesi-rollups-claimer/root/root.go
@@ -81,19 +81,21 @@ func run(cmd *cobra.Command, args []string) {
 	defer cancel()

 	createInfo := claimer.CreateInfo{
-		CreateInfo: service.CreateInfo{
-			Name: config.ServiceClaimer,
-			LogLevel: config.ResolveServiceLogLevel(config.ServiceClaimer, cfg.LogLevel),
-			LogColor: cfg.LogColor,
-			EnableSignalHandling: true,
-			TelemetryCreate: true,
-			TelemetryAddress: cfg.ClaimerTelemetryAddress,
-			PollInterval: cfg.ClaimerPollingInterval,
+		TickServiceConfigs: service.TickServiceConfigs{
+			PollInterval: cfg.ClaimerPollingInterval,
+			ServiceConfigs: service.ServiceConfigs{
+				Name: config.ServiceClaimer,
+				LogLevel: config.ResolveServiceLogLevel(config.ServiceClaimer, cfg.LogLevel),
+				LogColor: cfg.LogColor,
+				EnableSignalHandling: true,
+				TelemetryCreate: true,
+				TelemetryAddress: cfg.ClaimerTelemetryAddress,
+			},
 		},
 		Config: *cfg,
 	}
-	logger := service.NewServiceLogger(&createInfo.CreateInfo)
-	createInfo.CreateInfo.Logger = logger
+	logger := service.NewServiceLogger(&createInfo.ServiceConfigs)
+	createInfo.ServiceConfigs.Logger = logger

 	authOpt, err := config.HTTPAuthorizationOption()
 	cli.CheckErr(logger, err)
@@ -112,7 +114,6 @@ func run(cmd *cobra.Command, args []string) {

 	claimerService, err := claimer.Create(ctx, &createInfo)
 	cli.CheckErr(logger, err)
-	claimerService.LogConfig(createInfo.Config)

 	err = claimerService.Serve()
 	cli.CheckErr(logger, err)
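For reference, a sketch of the nested configuration types as they can be inferred from the call sites in these entrypoints. The shapes below are assumptions (including the field types); the real definitions live in internal/service and may differ.

package service

import (
	"context"
	"log/slog"
	"time"
)

// ServiceConfigs carries the per-service settings shared by every
// entrypoint (assumed types; field names match the diff above).
type ServiceConfigs struct {
	Name                 string
	LogLevel             slog.Level
	LogColor             bool
	EnableSignalHandling bool
	TelemetryCreate      bool
	TelemetryAddress     string
	Context              context.Context
	Cancel               context.CancelFunc
	Logger               *slog.Logger
}

// TickServiceConfigs wraps ServiceConfigs for polling (tick) services,
// which is why PollInterval moved out of the flat CreateInfo struct.
type TickServiceConfigs struct {
	PollInterval time.Duration
	ServiceConfigs
}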
diff --git a/cmd/cartesi-rollups-evm-reader/root/root.go b/cmd/cartesi-rollups-evm-reader/root/root.go
index cae77132f..d19dbffe8 100644
--- a/cmd/cartesi-rollups-evm-reader/root/root.go
+++ b/cmd/cartesi-rollups-evm-reader/root/root.go
@@ -82,7 +82,7 @@ func run(cmd *cobra.Command, args []string) {
 	defer cancel()

 	createInfo := evmreader.CreateInfo{
-		CreateInfo: service.CreateInfo{
+		ServiceConfigs: service.ServiceConfigs{
 			Name: config.ServiceEvmReader,
 			LogLevel: config.ResolveServiceLogLevel(config.ServiceEvmReader, cfg.LogLevel),
 			LogColor: cfg.LogColor,
@@ -92,8 +92,8 @@ func run(cmd *cobra.Command, args []string) {
 		},
 		Config: *cfg,
 	}
-	logger := service.NewServiceLogger(&createInfo.CreateInfo)
-	createInfo.CreateInfo.Logger = logger
+	logger := service.NewServiceLogger(&createInfo.ServiceConfigs)
+	createInfo.ServiceConfigs.Logger = logger

 	var err error
 	authOpt, err := config.HTTPAuthorizationOption()
@@ -117,7 +117,6 @@ func run(cmd *cobra.Command, args []string) {

 	readerService, err := evmreader.Create(ctx, &createInfo)
 	cli.CheckErr(logger, err)
-	readerService.LogConfig(createInfo.Config)

 	cli.CheckErr(logger, readerService.Serve())
 }
diff --git a/cmd/cartesi-rollups-jsonrpc-api/root/root.go b/cmd/cartesi-rollups-jsonrpc-api/root/root.go
index 1da348eff..a18b984bf 100644
--- a/cmd/cartesi-rollups-jsonrpc-api/root/root.go
+++ b/cmd/cartesi-rollups-jsonrpc-api/root/root.go
@@ -68,7 +68,7 @@ func run(cmd *cobra.Command, args []string) {
 	defer cancel()

 	createInfo := jsonrpc.CreateInfo{
-		CreateInfo: service.CreateInfo{
+		ServiceConfigs: service.ServiceConfigs{
 			Name: config.ServiceJsonrpc,
 			LogLevel: config.ResolveServiceLogLevel(config.ServiceJsonrpc, cfg.LogLevel),
 			LogColor: cfg.LogColor,
@@ -78,8 +78,8 @@ func run(cmd *cobra.Command, args []string) {
 		},
 		Config: *cfg,
 	}
-	logger := service.NewServiceLogger(&createInfo.CreateInfo)
-	createInfo.CreateInfo.Logger = logger
+	logger := service.NewServiceLogger(&createInfo.ServiceConfigs)
+	createInfo.ServiceConfigs.Logger = logger

 	var err error
 	createInfo.Repository, err = factory.NewRepositoryFromConnectionString(ctx, cfg.DatabaseConnection.Raw())
@@ -88,7 +88,6 @@ func run(cmd *cobra.Command, args []string) {

 	jsonrpcService, err := jsonrpc.Create(ctx, &createInfo)
 	cli.CheckErr(logger, err)
-	jsonrpcService.LogConfig(createInfo.Config)

 	cli.CheckErr(logger, jsonrpcService.Serve())
 }
diff --git a/cmd/cartesi-rollups-node/root/root.go b/cmd/cartesi-rollups-node/root/root.go
index f534f1d83..0a2d8f9fe 100644
--- a/cmd/cartesi-rollups-node/root/root.go
+++ b/cmd/cartesi-rollups-node/root/root.go
@@ -150,7 +150,7 @@ func run(cmd *cobra.Command, args []string) {
 	defer cancel()

 	createInfo := node.CreateInfo{
-		CreateInfo: service.CreateInfo{
+		ServiceConfigs: service.ServiceConfigs{
 			Name: config.ServiceNode,
 			LogLevel: cfg.LogLevel,
 			LogColor: cfg.LogColor,
@@ -160,8 +160,8 @@ func run(cmd *cobra.Command, args []string) {
 		},
 		Config: *cfg,
 	}
-	logger := service.NewServiceLogger(&createInfo.CreateInfo)
-	createInfo.CreateInfo.Logger = logger
+	logger := service.NewServiceLogger(&createInfo.ServiceConfigs)
+	createInfo.ServiceConfigs.Logger = logger

 	var err error
 	createInfo.ReaderClient, err = newEthClient(ctx, config.ServiceEvmReader)
@@ -183,7 +183,6 @@ func run(cmd *cobra.Command, args []string) {

 	nodeService, err := node.Create(ctx, &createInfo)
 	cli.CheckErr(logger, err)
-	nodeService.LogConfig(createInfo.Config)

 	cli.CheckErr(logger, nodeService.Serve())
 }
diff --git a/cmd/cartesi-rollups-prt/root/root.go b/cmd/cartesi-rollups-prt/root/root.go
index e7f13673d..d2d60f0c2 100644
--- a/cmd/cartesi-rollups-prt/root/root.go
+++ b/cmd/cartesi-rollups-prt/root/root.go
@@ -69,19 +69,21 @@ func run(cmd *cobra.Command, args []string) {
 	defer cancel()

 	createInfo := prt.CreateInfo{
-		CreateInfo: service.CreateInfo{
-			Name: config.ServicePrt,
-			LogLevel: config.ResolveServiceLogLevel(config.ServicePrt, cfg.LogLevel),
-			LogColor: cfg.LogColor,
-			EnableSignalHandling: true,
-			TelemetryCreate: true,
-			TelemetryAddress: cfg.PrtTelemetryAddress,
-			PollInterval: cfg.PrtPollingInterval,
+		TickServiceConfigs: service.TickServiceConfigs{
+			PollInterval: cfg.PrtPollingInterval,
+			ServiceConfigs: service.ServiceConfigs{
+				Name: config.ServicePrt,
+				LogLevel: config.ResolveServiceLogLevel(config.ServicePrt, cfg.LogLevel),
+				LogColor: cfg.LogColor,
+				EnableSignalHandling: true,
+				TelemetryCreate: true,
+				TelemetryAddress: cfg.PrtTelemetryAddress,
+			},
 		},
 		Config: *cfg,
 	}
-	logger := service.NewServiceLogger(&createInfo.CreateInfo)
-	createInfo.CreateInfo.Logger = logger
+	logger := service.NewServiceLogger(&createInfo.ServiceConfigs)
+	createInfo.ServiceConfigs.Logger = logger

 	var err error
 	authOpt, err := config.HTTPAuthorizationOption()
@@ -101,7 +103,6 @@ func run(cmd *cobra.Command, args []string) {

 	prtService, err := prt.Create(ctx, &createInfo)
 	cli.CheckErr(logger, err)
-	prtService.LogConfig(createInfo.Config)

 	cli.CheckErr(logger, prtService.Serve())
 }
diff --git a/cmd/cartesi-rollups-validator/root/root.go b/cmd/cartesi-rollups-validator/root/root.go
index 1fa08a336..7c0347529 100644
--- a/cmd/cartesi-rollups-validator/root/root.go
+++ b/cmd/cartesi-rollups-validator/root/root.go
@@ -68,19 +68,21 @@ func run(cmd *cobra.Command, args []string) {
 	defer cancel()

 	createInfo := validator.CreateInfo{
-		CreateInfo: service.CreateInfo{
-			Name: config.ServiceValidator,
-			LogLevel: config.ResolveServiceLogLevel(config.ServiceValidator, cfg.LogLevel),
-			LogColor: cfg.LogColor,
-			EnableSignalHandling: true,
-			TelemetryCreate: true,
-			TelemetryAddress: cfg.ValidatorTelemetryAddress,
+		TickServiceConfigs: service.TickServiceConfigs{
 			PollInterval: cfg.ValidatorPollingInterval,
+			ServiceConfigs: service.ServiceConfigs{
+				Name: config.ServiceValidator,
+				LogLevel: config.ResolveServiceLogLevel(config.ServiceValidator, cfg.LogLevel),
+				LogColor: cfg.LogColor,
+				EnableSignalHandling: true,
+				TelemetryCreate: true,
+				TelemetryAddress: cfg.ValidatorTelemetryAddress,
+			},
 		},
 		Config: *cfg,
 	}
-	logger := service.NewServiceLogger(&createInfo.CreateInfo)
-	createInfo.CreateInfo.Logger = logger
+	logger := service.NewServiceLogger(&createInfo.ServiceConfigs)
+	createInfo.ServiceConfigs.Logger = logger

 	var err error
 	createInfo.Repository, err = factory.NewRepositoryFromConnectionString(ctx, cfg.DatabaseConnection.Raw())
@@ -89,7 +91,6 @@ func run(cmd *cobra.Command, args []string) {

 	validatorService, err := validator.Create(ctx, &createInfo)
 	cli.CheckErr(logger, err)
-	validatorService.LogConfig(createInfo.Config)

 	cli.CheckErr(logger, validatorService.Serve())
 }
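All seven entrypoints above now share the same wiring shape. A compile-safe sketch with stand-ins (checkErr and the server interface are hypothetical, standing in for cli.CheckErr and the concrete services):

package sketch

import (
	"context"
	"log/slog"
	"os"
)

// server is a stand-in for the concrete services' Serve method.
type server interface{ Serve() error }

// checkErr is a stand-in for cli.CheckErr: log and exit on error.
func checkErr(logger *slog.Logger, err error) {
	if err != nil {
		logger.Error("fatal", "error", err)
		os.Exit(1)
	}
}

// runService mirrors the entrypoints above: build the service, then serve.
// LogConfig now happens inside each Create, so entrypoints no longer call it.
func runService(ctx context.Context, logger *slog.Logger, create func(context.Context) (server, error)) {
	svc, err := create(ctx)
	checkErr(logger, err)
	checkErr(logger, svc.Serve())
}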
diff --git a/internal/advancer/advancer.go b/internal/advancer/advancer.go
index 68a6b639c..0c2f3abaa 100644
--- a/internal/advancer/advancer.go
+++ b/internal/advancer/advancer.go
@@ -70,15 +70,11 @@ func getUnprocessedInputs(
 // potentially has more work. Callers use this to decide whether to re-tick immediately
 // (via the Reschedule channel) or wait for the next timer/event.
 func (s *Service) Step(ctx context.Context) (bool, error) {
-	// Check for context cancellation or shutdown in progress.
-	// The framework sets Stopping before calling Impl.Stop(), so this
+	// Check for context cancellation or shutdown in progress. This
 	// prevents starting new work while the machine manager is being torn down.
 	if err := ctx.Err(); err != nil {
 		return false, err
 	}
-	if s.IsStopping() {
-		return false, nil
-	}

 	// Update the machine manager with any new or disabled applications
 	err := s.machineManager.UpdateMachines(ctx)
@@ -306,7 +302,7 @@ func (s *Service) processInputs(ctx context.Context, app *Application, inputs []
 				"epoch", input.EpochIndex,
 				"index", input.Index,
 				"error", err)
-			s.Cancel() // triggers graceful shutdown of all services
+			s.Stop(true) // triggers graceful shutdown of all services
 			return err
 		}
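A sketch (assumption, not part of the diff) of how a caller can drive the new Step contract: hadWork=true means the service processed something and may have more pending, so the caller re-ticks immediately instead of waiting a full poll interval.

package sketch

import "context"

// stepper abstracts the advancer's Step method (hypothetical interface).
type stepper interface {
	Step(ctx context.Context) (bool, error)
}

// drain re-ticks while Step reports more work, mirroring what the
// reschedule mechanism achieves between poll intervals.
func drain(ctx context.Context, s stepper) error {
	for {
		hadWork, err := s.Step(ctx)
		if err != nil {
			return err
		}
		if !hadWork {
			return nil // idle: wait for the next timer or event
		}
	}
}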
diff --git a/internal/advancer/advancer_test.go b/internal/advancer/advancer_test.go
index 229825d29..454f82502 100644
--- a/internal/advancer/advancer_test.go
+++ b/internal/advancer/advancer_test.go
@@ -33,22 +33,47 @@ func TestAdvancer(t *testing.T) {

 type AdvancerSuite struct{ suite.Suite }

+const defaultBatchSize = 500
+
 func newMockAdvancerService(machineManager *MockMachineManager, repo *MockRepository) (*Service, error) {
-	return newMockAdvancerServiceWithBatchSize(machineManager, repo, 500)
+	return newMockAdvancerServiceWithBatchSize(machineManager, repo, defaultBatchSize)
 }

 func newMockAdvancerServiceWithBatchSize(
 	machineManager *MockMachineManager,
 	repo *MockRepository,
 	batchSize uint64,
+) (*Service, error) {
+	ctx, cf := context.WithCancel(context.Background())
+	return newMockAdvancerServiceWithContextAndBatchSize(
+		ctx,
+		cf,
+		machineManager,
+		repo,
+		batchSize,
+	)
+}
+
+func newMockAdvancerServiceWithContextAndBatchSize(
+	ctx context.Context,
+	cancelCtx context.CancelFunc,
+	machineManager *MockMachineManager,
+	repo *MockRepository,
+	batchSize uint64,
 ) (*Service, error) {
 	s := &Service{
 		inputBatchSize: batchSize,
 		machineManager: machineManager,
 		repository: repo,
 	}
-	serviceArgs := &service.CreateInfo{Name: "advancer", Impl: s, EnableReschedule: true}
-	err := service.Create(context.Background(), serviceArgs, &s.Service)
+	serviceArgs := &service.TickServiceConfigs{
+		ServiceConfigs: service.ServiceConfigs{
+			Name: "advancer",
+			Context: ctx,
+			Cancel: cancelCtx,
+		},
+	}
+	err := service.InitTickServiceTemplate(serviceArgs, &s.TickServiceTemplate, s, s)
 	if err != nil {
 		return nil, err
 	}
@@ -59,6 +84,7 @@ func newMockAdvancerServiceWithBatchSize(
 // the mock machine manager, and the mock repository.
 type testEnv struct {
 	service *Service
+	ctx context.Context
 	app *MockMachineImpl
 	mm *MockMachineManager
 	repo *MockRepository
@@ -67,13 +93,14 @@ type testEnv struct {
 // setupOneApp creates a standard test environment with one application.
 // The repository is empty; callers can configure it after the call.
 func (s *AdvancerSuite) setupOneApp() testEnv {
+	ctx, cf := context.WithCancel(context.Background())
 	mm := newMockMachineManager()
 	app := newMockMachine(1)
 	mm.Map[1] = newMockInstance(app)
 	repo := &MockRepository{}
-	svc, err := newMockAdvancerService(mm, repo)
+	svc, err := newMockAdvancerServiceWithContextAndBatchSize(ctx, cf, mm, repo, defaultBatchSize)
 	s.Require().NoError(err)
-	return testEnv{service: svc, app: app, mm: mm, repo: repo}
+	return testEnv{service: svc, ctx: ctx, app: app, mm: mm, repo: repo}
 }

 func (s *AdvancerSuite) TestServiceInterface() {
@@ -97,12 +124,12 @@ func (s *AdvancerSuite) TestServiceInterface() {
 	repository.GetEpochsReturn = map[common.Address][]*Epoch{
 		machineManager.Map[1].application.IApplicationAddress: {},
 	}
-	tickErrors := advancer.Tick()
+	_, tickErrors := advancer.Tick(context.Background())
 	require.Empty(tickErrors)

 	// Test Tick with error
 	repository.GetEpochsError = errors.New("list epochs error")
-	tickErrors = advancer.Tick()
+	_, tickErrors = advancer.Tick(context.Background())
 	require.NotEmpty(tickErrors)
 	require.Contains(tickErrors[0].Error(), "list epochs error")

@@ -405,7 +432,7 @@ func (s *AdvancerSuite) TestProcess() {
 		require.Len(env.repo.StoredResults, 1)

 		// Verify that the node shutdown was triggered (context cancelled)
-		require.Error(env.service.Context.Err(), "shared context should be cancelled")
+		require.Error(env.ctx.Err(), "shared context should be cancelled")
 		})
 	})
 }
@@ -503,7 +530,7 @@ func (s *AdvancerSuite) TestErrorRecovery() {
 		err := env.service.processInputs(context.Background(), env.app.Application, inputs)
 		require.Error(err)
 		require.Contains(err.Error(), "temporary failure")
-		require.Error(env.service.Context.Err(), "shared context should be cancelled")
+		require.Error(env.ctx.Err(), "shared context should be cancelled")
 	})
 }
@@ -1003,8 +1030,8 @@ func (s *AdvancerSuite) TestRemoveSnapshot() {
 		tmpDir := s.T().TempDir()
 		advancer := &Service{snapshotsDir: tmpDir}
-		serviceArgs := &service.CreateInfo{Name: "advancer", Impl: advancer}
-		require.Nil(service.Create(context.Background(), serviceArgs, &advancer.Service))
+		serviceArgs := &service.TickServiceConfigs{ServiceConfigs: service.ServiceConfigs{Name: "advancer"}}
+		require.Nil(service.InitTickServiceTemplate(serviceArgs, &advancer.TickServiceTemplate, advancer, advancer))

 		// Create a snapshot directory
 		snapshotPath := filepath.Join(tmpDir, "myapp_epoch0_input0")
@@ -1022,8 +1049,8 @@ func (s *AdvancerSuite) TestRemoveSnapshot() {
 		tmpDir := s.T().TempDir()
 		advancer := &Service{snapshotsDir: tmpDir}
-		serviceArgs := &service.CreateInfo{Name: "advancer", Impl: advancer}
-		require.Nil(service.Create(context.Background(), serviceArgs, &advancer.Service))
+		serviceArgs := &service.TickServiceConfigs{ServiceConfigs: service.ServiceConfigs{Name: "advancer"}}
+		require.Nil(service.InitTickServiceTemplate(serviceArgs, &advancer.TickServiceTemplate, advancer, advancer))

 		snapshotPath := filepath.Join(tmpDir, "myapp_epoch0_input0")
 		err := advancer.removeSnapshot(snapshotPath, "myapp")
@@ -1035,8 +1062,8 @@ func (s *AdvancerSuite) TestRemoveSnapshot() {
 		tmpDir := s.T().TempDir()
 		advancer := &Service{snapshotsDir: tmpDir}
-		serviceArgs := &service.CreateInfo{Name: "advancer", Impl: advancer}
-		require.Nil(service.Create(context.Background(), serviceArgs, &advancer.Service))
+		serviceArgs := &service.TickServiceConfigs{ServiceConfigs: service.ServiceConfigs{Name: "advancer"}}
+		require.Nil(service.InitTickServiceTemplate(serviceArgs, &advancer.TickServiceTemplate, advancer, advancer))

 		// Try to traverse outside snapshotsDir
 		maliciousPath := filepath.Join(tmpDir, "..", "outside", "myapp_evil")
@@ -1050,8 +1077,8 @@ func (s *AdvancerSuite) TestRemoveSnapshot() {
 		tmpDir := s.T().TempDir()
 		advancer := &Service{snapshotsDir: tmpDir}
-		serviceArgs := &service.CreateInfo{Name: "advancer", Impl: advancer}
-		require.Nil(service.Create(context.Background(), serviceArgs, &advancer.Service))
+		serviceArgs := &service.TickServiceConfigs{ServiceConfigs: service.ServiceConfigs{Name: "advancer"}}
+		require.Nil(service.InitTickServiceTemplate(serviceArgs, &advancer.TickServiceTemplate, advancer, advancer))

 		snapshotPath := filepath.Join(tmpDir, "otherapp_epoch0_input0")
 		err := advancer.removeSnapshot(snapshotPath, "myapp")
@@ -1547,10 +1574,10 @@ func (s *AdvancerSuite) TestSelfWakeOnSuccess() {
 	require.NoError(err)

 	// Call Tick() which internally calls Step() and signals reschedule.
-	svc.Tick()
+	reschedule, _ := svc.Tick(context.Background())

 	// The reschedule channel should have a pending signal.
-	require.True(svc.DrainReschedule(),
+	require.True(reschedule,
 		"reschedule channel should have a pending signal after Tick with work")
 }
@@ -1571,9 +1598,9 @@ func (s *AdvancerSuite) TestNoSelfWakeWhenIdle() {
 	svc, err := newMockAdvancerService(mm, repo)
 	require.NoError(err)

-	svc.Tick()
+	reschedule, _ := svc.Tick(context.Background())

-	require.False(svc.DrainReschedule(),
+	require.False(reschedule,
 		"reschedule channel should be empty when no work exists")
 }
@@ -1590,10 +1617,10 @@ func (s *AdvancerSuite) TestNoSelfWakeOnError() {
 	svc, err := newMockAdvancerService(mm, repo)
 	require.NoError(err)

-	errs := svc.Tick()
+	reschedule, errs := svc.Tick(context.Background())
 	require.NotEmpty(errs)

-	require.False(svc.DrainReschedule(),
+	require.False(reschedule,
 		"reschedule should NOT be signaled on error")
 }
@@ -1631,12 +1658,12 @@
 	// Call Tick — app1 fails, app2 succeeds with more work remaining (batch limit hit).
 	// Tick should surface the error AND signal reschedule for app2's pending work.
-	errs := svc.Tick()
+	reschedule, errs := svc.Tick(context.Background())
 	require.NotEmpty(errs, "Tick should surface app1's error")

 	// Reschedule SHOULD fire: app2 had work, and one failing app must not
 	// delay healthy apps by suppressing the reschedule signal.
-	require.True(svc.DrainReschedule(),
+	require.True(reschedule,
 		"reschedule should be signaled when hadWork is true, even with errors")
 }
diff --git a/internal/advancer/service.go b/internal/advancer/service.go
index 3c6d4d7c2..1154f7073 100644
--- a/internal/advancer/service.go
+++ b/internal/advancer/service.go
@@ -24,7 +24,7 @@ const httpShutdownTimeout = 10 * time.Second

 // Service is the main advancer service that processes inputs through Cartesi machines
 type Service struct {
-	service.Service
+	service.TickServiceTemplate
 	inputBatchSize uint64
 	snapshotsDir string
 	repository AdvancerRepository
@@ -39,23 +39,21 @@ type Service struct {

 // CreateInfo contains the configuration for creating an advancer service
 type CreateInfo struct {
-	service.CreateInfo
+	service.TickServiceConfigs

 	Config config.AdvancerConfig
 	Repository repository.Repository
 }

 // Create initializes a new advancer service
-func Create(ctx context.Context, c *CreateInfo) (*Service, error) {
+func Create(ctx context.Context, c *CreateInfo) (service.IService, error) {
 	var err error

 	if err = ctx.Err(); err != nil {
 		return nil, err // This returns context.Canceled or context.DeadlineExceeded.
 	}

 	s := &Service{}
-	c.Impl = s
-	c.EnableReschedule = true
-	err = service.Create(ctx, &c.CreateInfo, &s.Service)
+	err = service.InitTickServiceTemplate(&c.TickServiceConfigs, &s.TickServiceTemplate, s, s)
 	if err != nil {
 		return nil, err
 	}
@@ -102,48 +100,32 @@ func Create(ctx context.Context, c *CreateInfo) (*Service, error) {

 	s.snapshotsDir = c.Config.SnapshotsDir

+	s.LogConfig(c.Config)
+
 	return s, nil
 }

 // Service interface implementation
-func (s *Service) Alive() bool { return true }
-func (s *Service) Ready() bool { return true }
-func (s *Service) Reload() []error { return nil }
-func (s *Service) Tick() []error {
-	hadWork, err := s.Step(s.Context)
-
+func (s *Service) Tick(ctx context.Context) (bool, []error) {
 	// Signal reschedule whenever work was done, even if some apps errored.
 	// Failed apps are marked Failed and removed by the machine manager,
 	// so they won't cause amplified retries on the next tick.
 	// Without this, one failing app delays all healthy apps by a full poll interval.
-	if hadWork {
-		s.SignalReschedule()
-	}
+	hadWork, err := s.Step(ctx)

 	if err == nil {
-		return nil
+		return hadWork, nil
 	}

 	// During shutdown, the machine manager is closed and GetMachine() may
 	// return ErrNoApp. Suppress this to avoid spurious ERR log entries.
-	if errors.Is(err, ErrNoApp) && s.IsStopping() {
+	if errors.Is(err, ErrNoApp) && ctx.Err() != nil {
 		s.Logger.Warn("Tick interrupted by shutdown", "error", err)
-		return nil
+		return hadWork, nil
 	}
-	return []error{err}
+	return hadWork, []error{err}
 }

-func (s *Service) Stop(b bool) []error {
-	// CAS achieves once-semantics: the second caller returns immediately
-	// (fire-and-forget) rather than blocking like sync.Once. This is safe
-	// because the orchestrator calls Cancel() after Stop() and waits for
-	// the Serve goroutine to exit.
-	if !s.cleanedUp.CompareAndSwap(false, true) {
-		return nil // already stopped
-	}
-	// This method shadows service.Service.Stop(), so set the stopping flag
-	// explicitly. Without this, a concurrent Tick that observes closed
-	// resources would not see IsStopping() == true.
-	s.SetStopping()
+func (s *Service) OnStop(b bool) []error {
 	var errs []error
 	if s.inspector != nil {
 		s.Logger.Info("Shutting down inspect HTTP server")
@@ -161,17 +143,14 @@ func (s *Service) Stop(b bool) []error {
 	}
 	return errs
 }
-func (s *Service) Serve() error {
+func (s *Service) OnServe(ctx context.Context) error {
 	if s.inspector != nil {
 		go func() {
 			if err := s.inspector.Serve(); err != nil && !errors.Is(err, http.ErrServerClosed) {
 				s.Logger.Error("Inspect HTTP server failed — shutting down", "error", err)
-				s.Cancel()
+				s.Stop(true)
 			}
 		}()
 	}
-	return s.Service.Serve()
-}
-func (s *Service) String() string {
-	return s.Name
+	return s.TickServiceTemplate.OnServe(ctx)
 }
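A sketch (assumption) of roughly how TickServiceTemplate's serve loop is expected to consume the new Tick signature. Exact names in internal/service may differ; this only illustrates why Tick now returns a reschedule bool instead of signaling a channel itself.

package sketch

import (
	"context"
	"time"
)

// tickLoop is a stand-in for the template's internal state.
type tickLoop struct {
	pollInterval time.Duration
	tick         func(ctx context.Context) (bool, []error)
}

// serve polls tick; a true reschedule result skips the timer wait so
// pending work is handled immediately.
func (l *tickLoop) serve(ctx context.Context) error {
	timer := time.NewTimer(l.pollInterval)
	defer timer.Stop()
	for {
		if ctx.Err() != nil {
			return nil
		}
		reschedule, errs := l.tick(ctx)
		for range errs {
			// a real template would log each error here
		}
		if reschedule {
			continue // pending work: re-tick immediately
		}
		select {
		case <-ctx.Done():
			return nil
		case <-timer.C:
			timer.Reset(l.pollInterval)
		}
	}
}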
diff --git a/internal/claimer/claimer.go b/internal/claimer/claimer.go
index 57eaecec7..b2b8e5a08 100644
--- a/internal/claimer/claimer.go
+++ b/internal/claimer/claimer.go
@@ -110,6 +110,7 @@ func hashToHex(h *common.Hash) string {
 // database. The epoch is now "submitted" and no longer "computed".
 // Returns the number of confirmed transitions and any error.
 func (s *Service) checkClaimsInFlight(
+	ctx context.Context,
 	computedEpochs map[int64]*model.Epoch,
 	apps map[int64]*model.Application,
 	endBlock *big.Int,
@@ -117,7 +118,7 @@ func (s *Service) checkClaimsInFlight(
 	confirmed := 0
 	// check claims in flight. NOTE: deleting map entries during iteration is safe in Go
 	for key, txHash := range s.claimsInFlight {
-		ready, receipt, err := s.blockchain.pollTransaction(s.Context, txHash, endBlock)
+		ready, receipt, err := s.blockchain.pollTransaction(ctx, txHash, endBlock)
 		if err != nil {
 			s.Logger.Warn("Claim submission failed, retrying.",
 				"txHash", txHash,
@@ -139,7 +140,7 @@ func (s *Service) checkClaimsInFlight(
 		}
 		if computedEpoch, ok := computedEpochs[key]; ok {
 			err = s.repository.UpdateEpochWithSubmittedClaim(
-				s.Context,
+				ctx,
 				computedEpoch.ApplicationID,
 				computedEpoch.Index,
 				receipt.TxHash,
@@ -168,7 +169,7 @@ func (s *Service) checkClaimsInFlight(
 			// Parse the receipt to transition directly to accepted, saving a
 			// full tick round-trip. Quorum waits for a separate acceptance scan.
 			if app != nil && app.ConsensusType == model.Consensus_Authority {
-				if accepted := s.tryAcceptFromReceipt(receipt, app, computedEpoch); accepted {
+				if accepted := s.tryAcceptFromReceipt(ctx, receipt, app, computedEpoch); accepted {
 					confirmed++
 				}
 			}
@@ -194,6 +195,7 @@ func (s *Service) checkClaimsInFlight(
 // Errors are logged but not propagated — the normal acceptance scan on the
 // next tick will handle the transition if this fast path fails.
 func (s *Service) tryAcceptFromReceipt(
+	ctx context.Context,
 	receipt *types.Receipt,
 	app *model.Application,
 	epoch *model.Epoch,
@@ -213,7 +215,7 @@ func (s *Service) tryAcceptFromReceipt(
 			continue
 		}
 		err = s.repository.UpdateEpochWithAcceptedClaim(
-			s.Context, epoch.ApplicationID, epoch.Index)
+			ctx, epoch.ApplicationID, epoch.Index)
 		if err != nil {
 			s.Logger.Warn("Authority fast-accept: DB update failed, "+
 				"will retry via normal acceptance scan",
@@ -252,7 +254,7 @@ func (s *Service) findClaimSubmittedEventAndSucc(
 	err := checkEpochSequenceConstraint(prevEpoch, currEpoch)
 	if err != nil {
 		err = s.setApplicationInoperable(
-			s.Context,
+			ctx,
 			app,
 			"%v. epoch: %v (%v).",
 			err,
@@ -270,7 +272,7 @@ func (s *Service) findClaimSubmittedEventAndSucc(

 	if prevClaimSubmissionEvent == nil {
 		err = s.setApplicationInoperable(
-			s.Context,
+			ctx,
 			app,
 			"application has an invalid epoch: %v (%v). No claim submission event to match.",
 			prevEpoch.Index,
@@ -281,7 +283,7 @@ func (s *Service) findClaimSubmittedEventAndSucc(

 	if !claimSubmittedEventMatches(app, prevEpoch, prevClaimSubmissionEvent) {
 		err = s.setApplicationInoperable(
-			s.Context,
+			ctx,
 			app,
 			"application has an invalid epoch: %v (%v), missing claim submitted event (%v).",
 			prevEpoch.Index,
@@ -296,12 +298,13 @@ func (s *Service) findClaimSubmittedEventAndSucc(
 // transition epoch claims from computed to submitted.
 // Returns the number of successful transitions and any errors.
 func (s *Service) submitClaimsAndUpdateDatabase(
+	ctx context.Context,
 	acceptedOrSubmittedEpochs map[int64]*model.Epoch,
 	computedEpochs map[int64]*model.Epoch,
 	apps map[int64]*model.Application,
 	defaultBlockNumber *big.Int,
 ) (int, []error) {
-	confirmed, err := s.checkClaimsInFlight(computedEpochs, apps, defaultBlockNumber)
+	confirmed, err := s.checkClaimsInFlight(ctx, computedEpochs, apps, defaultBlockNumber)
 	if err != nil {
 		return confirmed, []error{err}
 	}
@@ -321,18 +324,18 @@ func (s *Service) submitClaimsAndUpdateDatabase(
 		prevEpoch, prevEpochExists := acceptedOrSubmittedEpochs[key]

 		// check address for changes
-		if err := s.checkConsensusForAddressChange(app); err != nil {
+		if err := s.checkConsensusForAddressChange(ctx, app); err != nil {
 			delete(computedEpochs, key)
 			errs = append(errs, err)
 			continue
 		}

 		if prevEpochExists {
 			ic, _, currEvent, err = s.findClaimSubmittedEventAndSucc(
-				s.Context, app, prevEpoch, currEpoch, prevEpoch.LastBlock+1, defaultBlockNumber.Uint64(),
+				ctx, app, prevEpoch, currEpoch, prevEpoch.LastBlock+1, defaultBlockNumber.Uint64(),
 			)
 		} else {
 			ic, currEvent, _, err = s.blockchain.findClaimSubmittedEventAndSucc(
-				s.Context, app, currEpoch, currEpoch.LastBlock+1, defaultBlockNumber.Uint64(),
+				ctx, app, currEpoch, currEpoch.LastBlock+1, defaultBlockNumber.Uint64(),
 			)
 		}
 		if err != nil {
@@ -349,7 +352,7 @@ func (s *Service) submitClaimsAndUpdateDatabase(
 			)
 			if !claimSubmittedEventMatches(app, currEpoch, currEvent) {
 				err = s.setApplicationInoperable(
-					s.Context,
+					ctx,
 					app,
 					"computed claim does not match event. computed_claim=%v, current_event=%v",
 					currEpoch, currEvent,
@@ -365,7 +368,7 @@ func (s *Service) submitClaimsAndUpdateDatabase(
 			)
 			txHash := currEvent.Raw.TxHash
 			err = s.repository.UpdateEpochWithSubmittedClaim(
-				s.Context,
+				ctx,
 				currEpoch.ApplicationID,
 				currEpoch.Index,
 				txHash,
@@ -439,7 +442,7 @@ func (s *Service) submitClaimsAndUpdateDatabase(
 				// to be deterministic, so recomputing
 				// will produce the same divergent hash.
 				err = s.setApplicationInoperable(
-					s.Context,
+					ctx,
 					app,
 					"NotFirstClaim from Quorum consensus: "+
 						"computed claim hash %s differs from "+
@@ -535,6 +538,7 @@ func (s *Service) findClaimAcceptedEventAndSucc(
 // transition claims from submitted to accepted.
 // Returns the number of successful transitions and any errors.
 func (s *Service) acceptClaimsAndUpdateDatabase(
+	ctx context.Context,
 	acceptedEpochs map[int64]*model.Epoch,
 	submittedEpochs map[int64]*model.Epoch,
 	apps map[int64]*model.Application,
@@ -551,7 +555,7 @@ func (s *Service) acceptClaimsAndUpdateDatabase(
 		app := apps[key]
 		prevEpoch, prevEpochExists := acceptedEpochs[key]
 		// check address for changes
-		if err := s.checkConsensusForAddressChange(app); err != nil {
+		if err := s.checkConsensusForAddressChange(ctx, app); err != nil {
 			delete(submittedEpochs, key)
 			errs = append(errs, err)
 			continue
@@ -559,11 +563,11 @@ func (s *Service) acceptClaimsAndUpdateDatabase(

 		if prevEpochExists {
 			_, _, currEvent, err = s.findClaimAcceptedEventAndSucc(
-				s.Context, app, prevEpoch, currEpoch, prevEpoch.LastBlock+1, defaultBlockNumber.Uint64(),
+				ctx, app, prevEpoch, currEpoch, prevEpoch.LastBlock+1, defaultBlockNumber.Uint64(),
 			)
 		} else {
 			_, currEvent, _, err = s.blockchain.findClaimAcceptedEventAndSucc(
-				s.Context, app, currEpoch, currEpoch.LastBlock+1, defaultBlockNumber.Uint64(),
+				ctx, app, currEpoch, currEpoch.LastBlock+1, defaultBlockNumber.Uint64(),
 			)
 		}
 		if err != nil {
@@ -585,7 +589,7 @@ func (s *Service) acceptClaimsAndUpdateDatabase(
 				"err", ErrEventMismatch,
 			)
 			err := s.setApplicationInoperable(
-				s.Context,
+				ctx,
 				app,
 				"event mismatch for epoch %v, event tx_hash: %v",
 				currEpoch.Index,
@@ -601,7 +605,7 @@ func (s *Service) acceptClaimsAndUpdateDatabase(
 			"last_block", currEpoch.LastBlock,
 		)
 		txHash := currEvent.Raw.TxHash
-		err = s.repository.UpdateEpochWithAcceptedClaim(s.Context, currEpoch.ApplicationID, currEpoch.Index)
+		err = s.repository.UpdateEpochWithAcceptedClaim(ctx, currEpoch.ApplicationID, currEpoch.Index)
 		if err != nil {
 			delete(submittedEpochs, key)
 			errs = append(errs, err)
@@ -630,15 +634,16 @@ func (s *Service) setApplicationInoperable(
 }

 func (s *Service) checkConsensusForAddressChange(
+	ctx context.Context,
 	app *model.Application,
 ) error {
-	newConsensusAddress, err := s.blockchain.getConsensusAddress(s.Context, app)
+	newConsensusAddress, err := s.blockchain.getConsensusAddress(ctx, app)
 	if err != nil {
 		return fmt.Errorf("getting consensus address for app %v: %w", app.IApplicationAddress, err)
 	}
 	if app.IConsensusAddress != newConsensusAddress {
 		err = s.setApplicationInoperable(
-			s.Context,
+			ctx,
 			app,
 			"consensus change detected. application: %v.",
 			app.IApplicationAddress,
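A sketch (illustrative, hypothetical names) of the context-threading pattern this file adopts: helpers take the caller's ctx instead of reading a context stored on the struct, so cancellation from the tick loop reaches every blockchain and repository call, and tests can simply pass context.Background().

package sketch

import "context"

// epochRepo abstracts one of the repository calls seen above.
type epochRepo interface {
	UpdateEpochWithAcceptedClaim(ctx context.Context, appID int64, index uint64) error
}

// Before: the helper read s.Context from the struct.
// After: ctx flows from Tick(ctx) down through every call.
func accept(ctx context.Context, r epochRepo, appID int64, index uint64) error {
	return r.UpdateEpochWithAcceptedClaim(ctx, appID, index)
}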
diff --git a/internal/claimer/claimer_test.go b/internal/claimer/claimer_test.go
index aece7f9c1..23cbc98f7 100644
--- a/internal/claimer/claimer_test.go
+++ b/internal/claimer/claimer_test.go
@@ -188,8 +188,10 @@ func newServiceMock() (*Service, *claimerRepositoryMock, *claimerBlockchainMock) {
 	blockchain := &claimerBlockchainMock{}

 	claimer := &Service{
-		Service: service.Service{
-			Logger: slog.New(handler),
+		TickServiceTemplate: service.TickServiceTemplate{
+			ServiceTemplate: service.ServiceTemplate{
+				Logger: slog.New(handler),
+			},
 		},
 		submissionEnabled: true,
 		claimsInFlight: map[int64]common.Hash{},
@@ -331,7 +333,7 @@ func TestDoNothing(t *testing.T) {
 	prevEpochs := makeEpochMap()
 	currEpochs := makeEpochMap()

-	transitions, errs := m.submitClaimsAndUpdateDatabase(prevEpochs, currEpochs, makeApplicationMap(), big.NewInt(0))
+	transitions, errs := m.submitClaimsAndUpdateDatabase(context.Background(), prevEpochs, currEpochs, makeApplicationMap(), big.NewInt(0))
 	assert.Equal(t, 0, len(errs))
 	assert.Equal(t, 0, transitions, "no transitions when no epochs to process")
 }
@@ -354,7 +356,7 @@ func TestSubmitFirstClaim(t *testing.T) {
 	b.On("submitClaimToBlockchain", mock.Anything, app, currEpoch).
 		Return(common.HexToHash("0x10"), nil).Once()

-	transitions, errs := m.submitClaimsAndUpdateDatabase(makeEpochMap(), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
+	transitions, errs := m.submitClaimsAndUpdateDatabase(context.Background(), makeEpochMap(), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
 	assert.Equal(t, 0, len(errs))
 	assert.Equal(t, 1, len(m.claimsInFlight))
 	assert.Equal(t, 1, transitions, "submitting a claim counts as a transition")
@@ -379,7 +381,7 @@ func TestSubmitClaimWithAntecessor(t *testing.T) {
 	b.On("submitClaimToBlockchain", mock.Anything, app, currEpoch).
 		Return(common.HexToHash("0x10"), nil).Once()

-	transitions, errs := m.submitClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
+	transitions, errs := m.submitClaimsAndUpdateDatabase(context.Background(), makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
 	assert.Equal(t, 0, len(errs))
 	assert.Equal(t, 1, len(m.claimsInFlight))
 	assert.Equal(t, 1, transitions, "submitting a claim counts as a transition")
@@ -402,7 +404,7 @@ func TestSkipSubmitFirstClaim(t *testing.T) {
 	b.On("findClaimSubmittedEventAndSucc", mock.Anything, app, currEpoch, currEpoch.LastBlock+1, endBlock.Uint64()).
 		Return(&iconsensus.IConsensus{}, prevEvent, currEvent, nil).Once()

-	transitions, errs := m.submitClaimsAndUpdateDatabase(makeEpochMap(), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
+	transitions, errs := m.submitClaimsAndUpdateDatabase(context.Background(), makeEpochMap(), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
 	assert.Equal(t, 0, len(errs))
 	assert.Equal(t, 0, len(m.claimsInFlight))
 	assert.Equal(t, 0, transitions, "no transition when submission is disabled")
@@ -426,7 +428,7 @@ func TestSkipSubmitClaimWithAntecessor(t *testing.T) {
 	b.On("findClaimSubmittedEventAndSucc", mock.Anything, app, prevEpoch, prevEpoch.LastBlock+1, endBlock.Uint64()).
 		Return(&iconsensus.IConsensus{}, prevEvent, currEvent, nil).Once()

-	_, errs := m.submitClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
+	_, errs := m.submitClaimsAndUpdateDatabase(context.Background(), makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
 	assert.Equal(t, len(errs), 0)
 	assert.Equal(t, len(m.claimsInFlight), 0)
 }
@@ -460,7 +462,7 @@ func TestInFlightCompleted(t *testing.T) {
 	r.On("UpdateEpochWithAcceptedClaim", mock.Anything, app.ID, currEpoch.Index).
 		Return(nil).Once()

-	transitions, errs := m.submitClaimsAndUpdateDatabase(makeEpochMap(), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
+	transitions, errs := m.submitClaimsAndUpdateDatabase(context.Background(), makeEpochMap(), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
 	assert.Equal(t, 0, len(errs))
 	assert.Equal(t, 0, len(m.claimsInFlight))
 	// Authority fast path: submitted (1) + accepted (1) = 2 transitions.
@@ -497,7 +499,7 @@ func TestInFlightReverted(t *testing.T) {
 	b.On("submitClaimToBlockchain", mock.Anything, app, currEpoch).
 		Return(common.HexToHash("0x10"), nil).Once()

-	_, errs := m.submitClaimsAndUpdateDatabase(makeEpochMap(), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
+	_, errs := m.submitClaimsAndUpdateDatabase(context.Background(), makeEpochMap(), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
 	assert.Equal(t, len(errs), 0)
 	assert.Equal(t, len(m.claimsInFlight), 1)
 }
@@ -520,7 +522,7 @@ func TestUpdateFirstClaim(t *testing.T) {
 	r.On("UpdateEpochWithSubmittedClaim", mock.Anything, app.ID, currEpoch.Index, currEvent.Raw.TxHash).
 		Return(nil).Once()

-	transitions, errs := m.submitClaimsAndUpdateDatabase(makeEpochMap(), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
+	transitions, errs := m.submitClaimsAndUpdateDatabase(context.Background(), makeEpochMap(), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
 	assert.Equal(t, 0, len(errs))
 	assert.Equal(t, 0, len(m.claimsInFlight))
 	assert.Equal(t, 1, transitions, "finding on-chain event counts as a transition")
@@ -545,7 +547,7 @@ func TestUpdateClaimWithAntecessor(t *testing.T) {
 	r.On("UpdateEpochWithSubmittedClaim", mock.Anything, app.ID, currEpoch.Index, currEvent.Raw.TxHash).
 		Return(nil).Once()

-	_, errs := m.submitClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
+	_, errs := m.submitClaimsAndUpdateDatabase(context.Background(), makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
 	assert.Equal(t, len(errs), 0)
 	assert.Equal(t, len(m.claimsInFlight), 0)
 }
@@ -566,7 +568,7 @@ func TestAcceptFirstClaim(t *testing.T) {
 	b.On("getConsensusAddress", mock.Anything, app).
 		Return(app.IConsensusAddress, nil).Once()

-	_, errs := m.acceptClaimsAndUpdateDatabase(makeEpochMap(), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
+	_, errs := m.acceptClaimsAndUpdateDatabase(context.Background(), makeEpochMap(), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
 	assert.Equal(t, len(errs), 0)
 }
@@ -589,7 +591,7 @@ func TestAcceptClaimWithAntecessor(t *testing.T) {
 	r.On("UpdateEpochWithAcceptedClaim", mock.Anything, app.ID, currEpoch.Index).
 		Return(nil).Once()

-	transitions, errs := m.acceptClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
+	transitions, errs := m.acceptClaimsAndUpdateDatabase(context.Background(), makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
 	assert.Equal(t, 0, len(errs))
 	assert.Equal(t, 1, transitions, "accepting a claim counts as a transition")
 }
@@ -613,7 +615,7 @@ func TestClaimInFlightMissingFromCurrClaims(t *testing.T) {
 	b.On("pollTransaction", mock.Anything, reqHash, endBlock).
 		Return(true, receipt, nil).Once()

-	_, errs := m.submitClaimsAndUpdateDatabase(makeEpochMap(), makeEpochMap(), makeApplicationMap(app), endBlock)
+	_, errs := m.submitClaimsAndUpdateDatabase(context.Background(), makeEpochMap(), makeEpochMap(), makeApplicationMap(app), endBlock)
 	assert.Equal(t, len(errs), 0)
 }
@@ -645,7 +647,7 @@ func TestSubmitFailedClaim(t *testing.T) {
 	b.On("submitClaimToBlockchain", mock.Anything, app, currEpoch).
 		Return(common.HexToHash("0x10"), nil).Once()

-	_, errs := m.submitClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
+	_, errs := m.submitClaimsAndUpdateDatabase(context.Background(), makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
 	assert.Equal(t, 0, len(errs))
 }
@@ -673,7 +675,7 @@ func TestNotFirstClaimHandledGracefully(t *testing.T) {
 		Return(common.Hash{}, notFirstClaimError()).Once()

 	_, errs := m.submitClaimsAndUpdateDatabase(
-		makeEpochMap(), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
+		context.Background(), makeEpochMap(), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
 	assert.Equal(t, 0, len(errs))
 	assert.Equal(t, 0, len(m.claimsInFlight))
 }
@@ -704,7 +706,7 @@ func TestNotFirstClaimQuorumSetsInoperable(t *testing.T) {
 		Return(nil).Once()

 	_, errs := m.submitClaimsAndUpdateDatabase(
-		makeEpochMap(), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
+		context.Background(), makeEpochMap(), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
 	assert.Equal(t, 1, len(errs))
 	assert.Equal(t, 0, len(m.claimsInFlight))
 }
@@ -737,7 +739,7 @@ func TestSubmitClaimWithAntecessorMismatch(t *testing.T) {
 	r.On("UpdateApplicationState", mock.Anything, int64(0), model.ApplicationState_Inoperable, mock.Anything).
 		Return(nil).Once()

-	_, errs := m.submitClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
+	_, errs := m.submitClaimsAndUpdateDatabase(context.Background(), makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
 	assert.Equal(t, 1, len(errs))
 }
@@ -761,7 +763,7 @@ func TestSubmitClaimWithEventMismatch(t *testing.T) {
 	r.On("UpdateApplicationState", mock.Anything, int64(0), model.ApplicationState_Inoperable, mock.Anything).
 		Return(nil).Once()

-	_, errs := m.submitClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
+	_, errs := m.submitClaimsAndUpdateDatabase(context.Background(), makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
 	assert.Equal(t, 1, len(errs))
 }
@@ -780,7 +782,7 @@ func TestSubmitClaimWithAntecessorOutOfOrder(t *testing.T) {
 	r.On("UpdateApplicationState", mock.Anything, int64(0), model.ApplicationState_Inoperable, mock.Anything).
 		Return(nil).Once()

-	_, errs := m.submitClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), big.NewInt(0))
+	_, errs := m.submitClaimsAndUpdateDatabase(context.Background(), makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), big.NewInt(0))
 	assert.Equal(t, 1, len(errs))
 }
@@ -803,7 +805,7 @@ func TestErrSubmittedMissingEvent(t *testing.T) {
 	r.On("UpdateApplicationState", mock.Anything, int64(0), model.ApplicationState_Inoperable, mock.Anything).
 		Return(nil).Once()

-	_, errs := m.submitClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
+	_, errs := m.submitClaimsAndUpdateDatabase(context.Background(), makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
 	assert.Equal(t, 1, len(errs))
 }
@@ -824,7 +826,7 @@ func TestConsensusAddressChangedOnSubmittedClaims(t *testing.T) {
 	r.On("UpdateApplicationState", mock.Anything, int64(0), model.ApplicationState_Inoperable, mock.Anything).
 		Return(nil).Once()

-	_, errs := m.submitClaimsAndUpdateDatabase(makeEpochMap(), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
+	_, errs := m.submitClaimsAndUpdateDatabase(context.Background(), makeEpochMap(), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
 	assert.Equal(t, len(errs), 1)
 }
@@ -848,7 +850,7 @@ func TestFindClaimAcceptedEventAndSuccFailure0(t *testing.T) {
 	b.On("findClaimAcceptedEventAndSucc", mock.Anything, app, currEpoch, currEpoch.LastBlock+1, endBlock.Uint64()).
 		Return(&iconsensus.IConsensus{}, prevEvent, currEvent, expectedErr).Once()

-	_, errs := m.acceptClaimsAndUpdateDatabase(makeEpochMap(), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
+	_, errs := m.acceptClaimsAndUpdateDatabase(context.Background(), makeEpochMap(), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
 	assert.Equal(t, 1, len(errs))
 }
@@ -871,7 +873,7 @@ func TestFindClaimAcceptedEventAndSuccFailure1(t *testing.T) {
 	b.On("findClaimAcceptedEventAndSucc", mock.Anything, app, prevEpoch, prevEpoch.LastBlock+1, endBlock.Uint64()).
 		Return(&iconsensus.IConsensus{}, prevEvent, currEvent, expectedErr).Once()

-	_, errs := m.acceptClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
+	_, errs := m.acceptClaimsAndUpdateDatabase(context.Background(), makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
 	assert.Equal(t, 1, len(errs))
 }
@@ -900,7 +902,7 @@ func TestAcceptClaimWithAntecessorMismatch(t *testing.T) {
 	r.On("UpdateApplicationState", mock.Anything, mock.Anything, model.ApplicationState_Inoperable, mock.Anything).
 		Return(nil).Once()

-	_, errs := m.acceptClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
+	_, errs := m.acceptClaimsAndUpdateDatabase(context.Background(), makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
 	assert.Equal(t, 1, len(errs))
 }
@@ -925,7 +927,7 @@ func TestAcceptClaimWithEventMismatch(t *testing.T) {
 	r.On("UpdateApplicationState", mock.Anything, mock.Anything, model.ApplicationState_Inoperable, mock.Anything).
 		Return(nil).Once()

-	_, errs := m.acceptClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
+	_, errs := m.acceptClaimsAndUpdateDatabase(context.Background(), makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
 	assert.Equal(t, 1, len(errs))
 }
@@ -945,7 +947,7 @@ func TestAcceptClaimWithAntecessorOutOfOrder(t *testing.T) {
 		Return(nil).
 		Once()

-	_, errs := m.acceptClaimsAndUpdateDatabase(makeEpochMap(wrongEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), big.NewInt(0))
+	_, errs := m.acceptClaimsAndUpdateDatabase(context.Background(), makeEpochMap(wrongEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), big.NewInt(0))
 	assert.Equal(t, 1, len(errs))
 }
@@ -968,7 +970,7 @@ func TestErrAcceptedMissingEvent(t *testing.T) {
 	r.On("UpdateApplicationState", mock.Anything, mock.Anything, model.ApplicationState_Inoperable, mock.Anything).
 		Return(nil).Once()

-	_, errs := m.acceptClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
+	_, errs := m.acceptClaimsAndUpdateDatabase(context.Background(), makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
 	assert.Equal(t, 1, len(errs))
 }
@@ -993,7 +995,7 @@ func TestUpdateEpochWithAcceptedClaimFailed(t *testing.T) {
 	r.On("UpdateEpochWithAcceptedClaim", mock.Anything, app.ID, currEpoch.Index).
 		Return(expectedErr).Once()

-	_, errs := m.acceptClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
+	_, errs := m.acceptClaimsAndUpdateDatabase(context.Background(), makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
 	assert.Equal(t, 1, len(errs))
 }
@@ -1015,6 +1017,6 @@ func TestConsensusAddressChangedOnAcceptedClaims(t *testing.T) {
 		Return(nil).
 		Once()

-	_, errs := m.acceptClaimsAndUpdateDatabase(makeEpochMap(), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
+	_, errs := m.acceptClaimsAndUpdateDatabase(context.Background(), makeEpochMap(), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock)
 	assert.Equal(t, len(errs), 1)
 }
diff --git a/internal/claimer/service.go b/internal/claimer/service.go
index 41039cca1..950501e13 100644
--- a/internal/claimer/service.go
+++ b/internal/claimer/service.go
@@ -21,7 +21,7 @@ import (
 )

 type CreateInfo struct {
-	service.CreateInfo
+	service.TickServiceConfigs

 	Config config.ClaimerConfig

@@ -30,7 +30,7 @@ type CreateInfo struct {
 }

 type Service struct {
-	service.Service
+	service.TickServiceTemplate

 	repository iclaimerRepository
 	blockchain iclaimerBlockchain
@@ -51,7 +51,7 @@ type PersistentConfig struct {
 	ChainID uint64
 }

-func Create(ctx context.Context, c *CreateInfo) (*Service, error) {
+func Create(ctx context.Context, c *CreateInfo) (service.IService, error) {
 	var err error

 	if c == nil {
@@ -68,10 +68,8 @@ func Create(ctx context.Context, c *CreateInfo) (*Service, error) {
 	}

 	s := &Service{}
-	c.Impl = s
-	c.EnableReschedule = true

-	err = service.Create(ctx, &c.CreateInfo, &s.Service)
+	err = service.InitTickServiceTemplate(&c.TickServiceConfigs, &s.TickServiceTemplate, s, s)
 	if err != nil {
 		return nil, fmt.Errorf("creating base service: %w", err)
 	}
@@ -112,43 +110,28 @@ func Create(ctx context.Context, c *CreateInfo) (*Service, error) {
 		defaultBlock: c.Config.BlockchainDefaultBlock,
 	}

-	return s, nil
-}
-
-func (s *Service) Alive() bool {
-	return true
-}
-
-func (s *Service) Ready() bool {
-	return true
-}
-
-func (s *Service) Reload() []error {
-	return nil
-}
+	s.LogConfig(c.Config)

-func (s *Service) Stop(bool) []error {
-	s.SetStopping()
-	return nil
+	return s, nil
 }

 // NOTE: tick is not re-entrant!
-func (s *Service) Tick() []error {
+func (s *Service) Tick(ctx context.Context) (bool, []error) {
 	errs := []error{}

 	// gather epochs pairs with open claims, either:
 	// - computed but not yet submitted
-	acceptedOrSubmittedEpochs, computedEpochs, computedApps, errSubmitted := s.repository.SelectSubmittedClaimPairsPerApp(s.Context)
+	acceptedOrSubmittedEpochs, computedEpochs, computedApps, errSubmitted := s.repository.SelectSubmittedClaimPairsPerApp(ctx)
 	if errSubmitted != nil {
 		errs = append(errs, errSubmitted)
-		return errs
+		return false, errs
 	}

 	// - submitted but not yet accepted.
-	acceptedEpochs, submittedEpochs, submittedApps, errAccepted := s.repository.SelectAcceptedClaimPairsPerApp(s.Context)
+	acceptedEpochs, submittedEpochs, submittedApps, errAccepted := s.repository.SelectAcceptedClaimPairsPerApp(ctx)
 	if errAccepted != nil {
 		errs = append(errs, errAccepted)
-		return errs
+		return false, errs
 	}

 	s.Logger.Debug("Processing claims for epochs",
@@ -158,18 +141,18 @@ func (s *Service) Tick() []error {

 	// return early if there is nothing to do
 	if len(computedEpochs) == 0 && len(submittedEpochs) == 0 {
-		return nil
+		return false, nil
 	}

 	// we have claims to check. Get the latest/safe/finalized, etc. block
-	defaultBlockNumber, err := s.blockchain.getDefaultBlockNumber(s.Context)
+	defaultBlockNumber, err := s.blockchain.getDefaultBlockNumber(ctx)
 	if err != nil {
 		errs = append(errs, err)
-		return errs
+		return false, errs
 	}

-	submitted, submitErrs := s.submitClaimsAndUpdateDatabase(acceptedOrSubmittedEpochs, computedEpochs, computedApps, defaultBlockNumber)
-	accepted, acceptErrs := s.acceptClaimsAndUpdateDatabase(acceptedEpochs, submittedEpochs, submittedApps, defaultBlockNumber)
+	submitted, submitErrs := s.submitClaimsAndUpdateDatabase(ctx, acceptedOrSubmittedEpochs, computedEpochs, computedApps, defaultBlockNumber)
+	accepted, acceptErrs := s.acceptClaimsAndUpdateDatabase(ctx, acceptedEpochs, submittedEpochs, submittedApps, defaultBlockNumber)
 	errs = append(errs, submitErrs...)
 	errs = append(errs, acceptErrs...)

@@ -178,10 +161,7 @@ func (s *Service) Tick() []error {
 	// Confirming a submission enables the acceptance scan on the next tick.
 	// Erring apps are retried on the next tick regardless; suppressing
 	// reschedule would delay healthy apps by a full poll interval.
-	if submitted > 0 || accepted > 0 {
-		s.SignalReschedule()
-	}
-	return errs
+	return submitted > 0 || accepted > 0, errs
 }

 func setupPersistentConfig(
diff --git a/internal/evmreader/evmreader_test.go b/internal/evmreader/evmreader_test.go
index 8413878b0..ac93a61ac 100644
--- a/internal/evmreader/evmreader_test.go
+++ b/internal/evmreader/evmreader_test.go
@@ -73,8 +73,8 @@ func (s *EvmReaderSuite) SetupTest() {
 	logLevel, err := config.GetLogLevel()
 	s.Require().NoError(err)

-	serviceArgs := &service.CreateInfo{Name: "evm-reader", Impl: s.evmReader, LogLevel: logLevel}
-	err = service.Create(context.Background(), serviceArgs, &s.evmReader.Service)
+	serviceArgs := &service.ServiceConfigs{Name: "evm-reader", LogLevel: logLevel}
+	err = service.InitServiceTemplate(serviceArgs, &s.evmReader.ServiceTemplate, s.evmReader)
 	s.Require().NoError(err)
 }
diff --git a/internal/evmreader/output_test.go b/internal/evmreader/output_test.go
index 43fd597cc..978e72d1a 100644
--- a/internal/evmreader/output_test.go
+++ b/internal/evmreader/output_test.go
@@ -4,7 +4,6 @@
 package evmreader

 import (
-	"context"
 	"errors"
 	"math/big"
 	"time"
@@ -475,8 +474,8 @@ func (s *EvmReaderSuite) setupOutputMismatchTest() {
 	logLevel, err := config.GetLogLevel()
 	s.Require().NoError(err)

-	serviceArgs := &service.CreateInfo{Name: "evm-reader", Impl: s.evmReader, LogLevel: logLevel}
-	err = service.Create(context.Background(), serviceArgs, &s.evmReader.Service)
+	serviceArgs := &service.ServiceConfigs{Name: "evm-reader", LogLevel: logLevel}
+	err = service.InitServiceTemplate(serviceArgs, &s.evmReader.ServiceTemplate, s.evmReader)
 	s.Require().NoError(err)

 	apps := copyApplications(applications)
diff --git a/internal/evmreader/sealedepochs_test.go b/internal/evmreader/sealedepochs_test.go
index 8c94e9a7b..28bb6ee90 100644
--- a/internal/evmreader/sealedepochs_test.go
+++ b/internal/evmreader/sealedepochs_test.go
@@ -56,8 +56,8 @@ func (s *SealedEpochsSuite) SetupTest() {
 	logLevel, err := config.GetLogLevel()
 	s.Require().NoError(err)

-	serviceArgs := &service.CreateInfo{Name: "evm-reader", Impl: s.evmReader, LogLevel: logLevel}
-	err = service.Create(context.Background(), serviceArgs, &s.evmReader.Service)
+	serviceArgs := &service.ServiceConfigs{Name: "evm-reader", LogLevel: logLevel}
+	err = service.InitServiceTemplate(serviceArgs, &s.evmReader.ServiceTemplate, s.evmReader)
 	s.Require().NoError(err)
 }
diff --git a/internal/evmreader/service.go b/internal/evmreader/service.go
index 5c00ff7e9..b8a292e8d 100644
--- a/internal/evmreader/service.go
+++ b/internal/evmreader/service.go
@@ -20,7 +20,7 @@ import (
 )

 type CreateInfo struct {
-	service.CreateInfo
+	service.ServiceConfigs

 	Config config.EvmreaderConfig

@@ -31,7 +31,7 @@ type CreateInfo struct {
 }

 type Service struct {
-	service.Service
+	service.ServiceTemplate

 	client EthClientInterface
 	wsClient EthClientInterface
@@ -56,16 +56,15 @@ type PersistentConfig struct {
 	ChainID uint64
 }

-func Create(ctx context.Context, c *CreateInfo) (*Service, error) {
+func Create(ctx context.Context, c *CreateInfo) (service.IService, error) {
 	var err error

 	if err = ctx.Err(); err != nil {
 		return nil, err // This returns context.Canceled or context.DeadlineExceeded.
 	}

 	s := &Service{}
-	c.Impl = s

-	err = service.Create(ctx, &c.CreateInfo, &s.Service)
+	err = service.InitServiceTemplate(&c.ServiceConfigs, &s.ServiceTemplate, s)
 	if err != nil {
 		return nil, err
 	}
@@ -127,6 +126,8 @@ func Create(ctx context.Context, c *CreateInfo) (*Service, error) {
 		},
 	}

+	s.LogConfig(c.Config)
+
 	return s, nil
 }

@@ -138,43 +139,20 @@ func (s *Service) Ready() bool {
 	return s.ready.Load()
 }

-func (s *Service) Reload() []error {
-	return nil
-}
-
-func (s *Service) Stop(bool) []error {
-	s.SetStopping()
-	return nil
-}
-
-func (s *Service) Tick() []error {
-	return []error{}
-}
-
-func (s *Service) Serve() error {
+func (s *Service) OnServe(ctx context.Context) error {
 	s.alive.Store(true)
 	ready := make(chan struct{}, 1)
-	go func() {
-		defer s.alive.Store(false)
-		defer s.ready.Store(false)
-		err := s.Run(s.Context, ready)
-		if err != nil && s.Context.Err() == nil {
-			s.Logger.Error("Run exited with error", "error", err)
-		}
-		s.Cancel()
-	}()
 	go func() {
 		select {
 		case <-ready:
 			s.ready.Store(true)
-		case <-s.Context.Done():
+		case <-ctx.Done():
 		}
 	}()
-	return s.Service.Serve()
-}
-func (s *Service) String() string {
-	return s.Name
+	defer s.alive.Store(false)
+	defer s.ready.Store(false)
+	return s.Run(ctx, ready)
 }

 func (s *Service) setupPersistentConfig(
diff --git a/internal/evmreader/service_config_test.go b/internal/evmreader/service_config_test.go
index d8e251e87..997778f42 100644
--- a/internal/evmreader/service_config_test.go
+++ b/internal/evmreader/service_config_test.go
@@ -35,7 +35,7 @@ func TestCreateWithNilEthClient(t *testing.T) {
 	require.NoError(t, err)

 	_, err = Create(context.Background(), &CreateInfo{
-		CreateInfo: service.CreateInfo{Name: "evm-reader", LogLevel: logLevel},
+		ServiceConfigs: service.ServiceConfigs{Name: "evm-reader", LogLevel: logLevel},
 	})
 	require.ErrorContains(t, err, "EthClient on evmreader service Create is nil")
 }
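A sketch (assumption) of the lifecycle contract the new template hooks appear to define: a long-running service implements OnServe, which blocks until its work loop returns or ctx is cancelled, and the template owns signal handling and calls OnStop during shutdown. Names here are inferred from the diff, not taken from internal/service.

package sketch

import "context"

// hooks models the assumed ServiceTemplate extension points.
type hooks interface {
	// OnServe blocks; returning ends the service.
	OnServe(ctx context.Context) error
	// OnStop releases resources; force requests immediate teardown.
	OnStop(force bool) []error
}

// serve shows the assumed call order: OnServe runs until done, then
// OnStop cleans up exactly once.
func serve(ctx context.Context, h hooks) error {
	err := h.OnServe(ctx)
	_ = h.OnStop(false)
	return err
}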
diff --git a/internal/jsonrpc/service.go b/internal/jsonrpc/service.go
index f39ad2157..d4124f906 100644
--- a/internal/jsonrpc/service.go
+++ b/internal/jsonrpc/service.go
@@ -28,7 +28,7 @@ const jsonrpcShutdownTimeout = 5 * time.Second

 // Service implements the IService interface.
 type Service struct {
-	service.Service
+	service.ServiceTemplate
 	repository repository.Repository
 	server *http.Server
 	admission *service.SemaphoreAdmission
@@ -40,23 +40,22 @@ type Service struct {
 }

 type CreateInfo struct {
-	service.CreateInfo
+	service.ServiceConfigs

 	Config config.JsonrpcConfig

 	Repository repository.Repository
 }

-func Create(ctx context.Context, c *CreateInfo) (*Service, error) {
+func Create(ctx context.Context, c *CreateInfo) (service.IService, error) {
 	var err error

 	if err = ctx.Err(); err != nil {
 		return nil, err // This returns context.Canceled or context.DeadlineExceeded.
 	}

 	s := &Service{}
-	c.Impl = s

-	err = service.Create(ctx, &c.CreateInfo, &s.Service)
+	err = service.InitServiceTemplate(&c.ServiceConfigs, &s.ServiceTemplate, s)
 	if err != nil {
 		return nil, err
 	}
@@ -97,28 +96,12 @@ func Create(ctx context.Context, c *CreateInfo) (*Service, error) {
 		s.listen = net.Listen
 	}

-	return s, nil
-}
-
-func (s *Service) Alive() bool {
-	return true
-}
-
-func (s *Service) Ready() bool {
-	return true
-}
-
-func (s *Service) Reload() []error {
-	return nil
-}
+	s.LogConfig(c.Config)

-func (s *Service) Tick() []error {
-	// No periodic tasks.
-	return nil
+	return s, nil
 }

-func (s *Service) Stop(_ bool) []error {
-	s.SetStopping()
+func (s *Service) OnStop(_ bool) []error {
 	var errs []error
 	s.Logger.Info("Shutting down JSON-RPC HTTP server", "addr", s.server.Addr)
 	ctx, cancel := context.WithTimeout(context.Background(), jsonrpcShutdownTimeout)
@@ -129,11 +112,7 @@ func (s *Service) Stop(_ bool) []error {
 	return errs
 }

-func (s *Service) String() string {
-	return s.Name
-}
-
-func (s *Service) Serve() error {
+func (s *Service) OnServe(ctx context.Context) error {
 	listener, err := s.listen("tcp", s.server.Addr)
 	if err != nil {
 		return err
@@ -155,26 +134,18 @@ func (s *Service) Serve() error {
 		serverDone <- err
 	}()

-	serviceDone := make(chan error, 1)
-	go func() {
-		// Run the shared service loop concurrently because it blocks waiting
-		// for signals/context cancellation while the HTTP server blocks
-		// waiting for connections.
-		serviceDone <- s.Service.Serve()
-	}()
-
 	select {
 	case err := <-serverDone:
 		// The HTTP loop exited first. This is unexpected unless the listener
 		// failed or the server was already closed, so cancel the framework
 		// loop and wait for it to observe the cancellation before returning.
-		s.Cancel()
-		serviceErr := <-serviceDone
+		s.Stop(true)
+		<-ctx.Done()
 		if err != nil {
 			return err
 		}
-		return serviceErr
-	case err := <-serviceDone:
+		return nil
+	case <-ctx.Done():
 		// The framework loop exited first because it handled a shutdown signal
 		// or context cancellation and called Stop(), which should trigger
 		// s.server.Shutdown(). Wait for the HTTP loop to finish so Serve()
@@ -183,6 +154,6 @@ func (s *Service) Serve() error {
 		if serverErr != nil {
 			return serverErr
 		}
-		return err
+		return nil
 	}
 }
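A sketch (illustrative) of the coordination pattern the jsonrpc OnServe implements: one goroutine owns http.Server.Serve, a select decides who initiated shutdown, and Shutdown with a timeout bounds the cleanup. Standard library only; the service types above are not used here.

package sketch

import (
	"context"
	"errors"
	"net"
	"net/http"
	"time"
)

func serveHTTP(ctx context.Context, srv *http.Server, ln net.Listener) error {
	serverDone := make(chan error, 1)
	go func() { serverDone <- srv.Serve(ln) }()

	select {
	case err := <-serverDone:
		// Listener failure or external Close; a normal close is not an error.
		if errors.Is(err, http.ErrServerClosed) {
			return nil
		}
		return err
	case <-ctx.Done():
		// Graceful shutdown with a bounded deadline, then wait for the
		// Serve goroutine to drain so nothing leaks.
		sctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		err := srv.Shutdown(sctx)
		<-serverDone
		return err
	}
}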
@@ -183,6 +154,6 @@ func (s *Service) Serve() error {
 		if serverErr != nil {
 			return serverErr
 		}
-		return err
+		return nil
 	}
 }
diff --git a/internal/jsonrpc/util_test.go b/internal/jsonrpc/util_test.go
index e68660282..9e55f88b3 100644
--- a/internal/jsonrpc/util_test.go
+++ b/internal/jsonrpc/util_test.go
@@ -101,7 +101,7 @@ func newTestServiceFull(t *testing.T, name string, maxInflight uint64, corsOrigi
 	require.NoError(t, err)
 
 	ci := CreateInfo{
-		CreateInfo: service.CreateInfo{
+		ServiceConfigs: service.ServiceConfigs{
 			Name:     name,
 			LogLevel: logLevel,
 			LogColor: true,
@@ -115,7 +115,7 @@ func newTestServiceFull(t *testing.T, name string, maxInflight uint64, corsOrigi
 
 	s, err := Create(ctx, &ci)
 	require.NoError(t, err, "on new test service")
-	return s
+	return s.(*Service)
 }
 
 func nameToNumber(in string) uint64 {
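The jsonrpc rewrite is the blocking-server variant of the same contract: OnStop triggers http.Server.Shutdown, and OnServe arbitrates between the server dying on its own and the framework shutting it down. A hedged standalone sketch of that select pattern; webService is hypothetical, and the cancel field stands in for the template's Stop(true):

package main

import (
	"context"
	"errors"
	"net/http"
	"time"
)

type webService struct {
	server *http.Server
	cancel context.CancelFunc // stands in for the template's Stop(true)
}

// OnStop unblocks ListenAndServe; it returns http.ErrServerClosed there.
func (s *webService) OnStop() error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	return s.server.Shutdown(ctx)
}

func (s *webService) OnServe(ctx context.Context) error {
	serverDone := make(chan error, 1)
	go func() { serverDone <- s.server.ListenAndServe() }()

	select {
	case err := <-serverDone:
		// Server exited on its own (e.g. bad listener): tear down the rest.
		s.cancel()
		<-ctx.Done()
		return err
	case <-ctx.Done():
		// Framework shutdown: OnStop already called Shutdown; drain the loop.
		if err := <-serverDone; !errors.Is(err, http.ErrServerClosed) {
			return err
		}
		return nil
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	s := &webService{server: &http.Server{Addr: "127.0.0.1:0"}, cancel: cancel}
	go func() {
		time.Sleep(50 * time.Millisecond)
		_ = s.OnStop() // simulate the template's Stop path
		cancel()
	}()
	_ = s.OnServe(ctx)
}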
diff --git a/internal/node/node.go b/internal/node/node.go
index a351ae224..6121e3462 100644
--- a/internal/node/node.go
+++ b/internal/node/node.go
@@ -29,7 +29,7 @@ type serviceResult struct {
 }
 
 type CreateInfo struct {
-	service.CreateInfo
+	service.ServiceConfigs
 
 	Config config.NodeConfig
@@ -41,13 +41,13 @@ type CreateInfo struct {
 }
 
 type Service struct {
-	service.Service
+	service.ServiceTemplate
 
 	Children   []service.IService
 	Repository repository.Repository
 }
 
-func Create(ctx context.Context, c *CreateInfo) (*Service, error) {
+func Create(ctx context.Context, c *CreateInfo) (service.IService, error) {
 	var err error
 
 	if err = ctx.Err(); err != nil {
@@ -55,9 +55,8 @@ func Create(ctx context.Context, c *CreateInfo) (*Service, error) {
 	}
 
 	s := &Service{}
-	c.Impl = s
 
-	err = service.Create(ctx, &c.CreateInfo, &s.Service)
+	err = service.InitServiceTemplate(&c.ServiceConfigs, &s.ServiceTemplate, s)
 	if err != nil {
 		return nil, err
 	}
@@ -67,6 +66,9 @@ func Create(ctx context.Context, c *CreateInfo) (*Service, error) {
 		s.Logger.Error(fmt.Sprint(err))
 		return nil, err
 	}
+
+	s.LogConfig(c.Config)
+
 	return s, nil
 }
@@ -139,10 +141,7 @@ func (me *Service) Ready() bool {
 	return allReady
 }
 
-func (s *Service) Reload() []error { return nil }
-func (s *Service) Tick() []error   { return nil }
-func (me *Service) Stop(force bool) []error {
-	me.SetStopping()
+func (me *Service) OnStop(force bool) []error {
 	errs := []error{}
 	for _, s := range me.Children {
 		errs = append(errs, s.Stop(force)...)
@@ -150,21 +149,22 @@ func (me *Service) Stop(force bool) []error {
 	return errs
 }
 
-func (me *Service) Serve() error {
+func (me *Service) OnServe(ctx context.Context) error {
 	for _, s := range me.Children {
 		go s.Serve()
 	}
-	return me.Service.Serve()
+	<-ctx.Done()
+	return nil
 }
 
 // services creation
 func newEVMReader(ctx context.Context, c *CreateInfo, s *Service) (service.IService, error) {
 	readerArgs := evmreader.CreateInfo{
-		CreateInfo: service.CreateInfo{
+		ServiceConfigs: service.ServiceConfigs{
 			Name:                 config.ServiceEvmReader,
-			Context:              s.Context,
-			Cancel:               s.Cancel,
+			Context:              c.ServiceConfigs.Context,
+			Cancel:               c.ServiceConfigs.Cancel,
 			LogLevel:             config.ResolveServiceLogLevel(config.ServiceEvmReader, c.Config.LogLevel),
 			LogColor:             c.Config.LogColor,
 			EnableSignalHandling: false,
@@ -186,16 +186,17 @@ func newEVMReader(ctx context.Context, c *CreateInfo, s *Service) (service.IServ
 
 func newAdvancer(ctx context.Context, c *CreateInfo, s *Service) (service.IService, error) {
 	advancerArgs := advancer.CreateInfo{
-		CreateInfo: service.CreateInfo{
-			Name:                 config.ServiceAdvancer,
-			Context:              s.Context,
-			Cancel:               s.Cancel,
-			LogLevel:             config.ResolveServiceLogLevel(config.ServiceAdvancer, c.Config.LogLevel),
-			LogColor:             c.Config.LogColor,
-			EnableSignalHandling: false,
-			TelemetryCreate:      false,
-			PollInterval:         c.Config.AdvancerPollingInterval,
-			ServeMux:             s.ServeMux,
+		TickServiceConfigs: service.TickServiceConfigs{
+			PollInterval: c.Config.AdvancerPollingInterval,
+			ServiceConfigs: service.ServiceConfigs{
+				Name:                 config.ServiceAdvancer,
+				Context:              c.ServiceConfigs.Context,
+				Cancel:               c.ServiceConfigs.Cancel,
+				LogLevel:             config.ResolveServiceLogLevel(config.ServiceAdvancer, c.Config.LogLevel),
+				LogColor:             c.Config.LogColor,
+				EnableSignalHandling: false,
+				TelemetryCreate:      false,
+			},
 		},
 		Repository: c.Repository,
 		Config:     *c.Config.ToAdvancerConfig(),
@@ -210,16 +211,17 @@ func newValidator(ctx context.Context, c *CreateInfo, s *Service) (service.IServi
 	validatorArgs := validator.CreateInfo{
-		CreateInfo: service.CreateInfo{
-			Name:                 config.ServiceValidator,
-			Context:              s.Context,
-			Cancel:               s.Cancel,
-			LogLevel:             config.ResolveServiceLogLevel(config.ServiceValidator, c.Config.LogLevel),
-			LogColor:             c.Config.LogColor,
-			EnableSignalHandling: false,
-			TelemetryCreate:      false,
-			PollInterval:         c.Config.ValidatorPollingInterval,
-			ServeMux:             s.ServeMux,
+		TickServiceConfigs: service.TickServiceConfigs{
+			PollInterval: c.Config.ValidatorPollingInterval,
+			ServiceConfigs: service.ServiceConfigs{
+				Name:                 config.ServiceValidator,
+				Context:              c.ServiceConfigs.Context,
+				Cancel:               c.ServiceConfigs.Cancel,
+				LogLevel:             config.ResolveServiceLogLevel(config.ServiceValidator, c.Config.LogLevel),
+				LogColor:             c.Config.LogColor,
+				EnableSignalHandling: false,
+				TelemetryCreate:      false,
+			},
 		},
 		Repository: c.Repository,
 		Config:     *c.Config.ToValidatorConfig(),
@@ -234,16 +236,17 @@ func newClaimer(ctx context.Context, c *CreateInfo, s *Service) (service.IServ
 	claimerArgs := claimer.CreateInfo{
-		CreateInfo: service.CreateInfo{
-			Name:                 config.ServiceClaimer,
-			Context:              s.Context,
-			Cancel:               s.Cancel,
-			LogLevel:             config.ResolveServiceLogLevel(config.ServiceClaimer, c.Config.LogLevel),
-			LogColor:             c.Config.LogColor,
-			EnableSignalHandling: false,
-			TelemetryCreate:      false,
-			PollInterval:         c.Config.ClaimerPollingInterval,
-			ServeMux:             s.ServeMux,
+		TickServiceConfigs: service.TickServiceConfigs{
+			PollInterval: c.Config.ClaimerPollingInterval,
+			ServiceConfigs: service.ServiceConfigs{
+				Name:                 config.ServiceClaimer,
+				Context:              c.ServiceConfigs.Context,
+				Cancel:               c.ServiceConfigs.Cancel,
+				LogLevel:             config.ResolveServiceLogLevel(config.ServiceClaimer, c.Config.LogLevel),
+				LogColor:             c.Config.LogColor,
+				EnableSignalHandling: false,
+				TelemetryCreate:      false,
+			},
 		},
 		EthConn:    c.ClaimerClient,
 		Repository: c.Repository,
@@ -259,10 +262,10 @@ func newClaimer(ctx context.Context, c *CreateInfo, s *Service) (service.IServic
 
 func newJsonrpc(ctx context.Context, c *CreateInfo, s *Service) (service.IService, error) {
 	jsonrpcArgs := jsonrpc.CreateInfo{
-		CreateInfo: service.CreateInfo{
+		ServiceConfigs: service.ServiceConfigs{
 			Name:                 config.ServiceJsonrpc,
-			Context:              s.Context,
-			Cancel:               s.Cancel,
+			Context:              c.ServiceConfigs.Context,
+			Cancel:               c.ServiceConfigs.Cancel,
 			LogLevel:             config.ResolveServiceLogLevel(config.ServiceJsonrpc, c.Config.LogLevel),
 			LogColor:             c.Config.LogColor,
 			EnableSignalHandling: false,
@@ -282,16 +285,17 @@ func newJsonrpc(ctx context.Context, c *CreateInfo, s *Service) (service.IServic
 
 func newPrt(ctx context.Context, c *CreateInfo, s *Service) (service.IService, error) {
 	prtArgs := prt.CreateInfo{
-		CreateInfo: service.CreateInfo{
-			Name:                 config.ServicePrt,
-			Context:              s.Context,
-			Cancel:               s.Cancel,
-			LogLevel:             config.ResolveServiceLogLevel(config.ServicePrt, c.Config.LogLevel),
-			LogColor:             c.Config.LogColor,
-			EnableSignalHandling: false,
-			TelemetryCreate:      false,
-			PollInterval:         c.Config.PrtPollingInterval,
-			ServeMux:             s.ServeMux,
+		TickServiceConfigs: service.TickServiceConfigs{
+			PollInterval: c.Config.PrtPollingInterval,
+			ServiceConfigs: service.ServiceConfigs{
+				Name:                 config.ServicePrt,
+				Context:              c.ServiceConfigs.Context,
+				Cancel:               c.ServiceConfigs.Cancel,
+				LogLevel:             config.ResolveServiceLogLevel(config.ServicePrt, c.Config.LogLevel),
+				LogColor:             c.Config.LogColor,
+				EnableSignalHandling: false,
+				TelemetryCreate:      false,
+			},
 		},
 		EthClient:  c.PrtClient,
 		Repository: c.Repository,
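Two things worth pinning down from the node changes: the orchestrator's OnServe reduces to "start children, park on ctx", and tick-driven children now receive a TickServiceConfigs that wraps ServiceConfigs. A hedged, self-contained sketch with local stand-in types (not the real pkg/service ones):

package main

import (
	"context"
	"fmt"
	"time"
)

// Local stand-ins mirroring the shapes in the patch.
type ServiceConfigs struct {
	Name    string
	Context context.Context
	Cancel  context.CancelFunc
}

type TickServiceConfigs struct {
	ServiceConfigs
	PollInterval time.Duration
}

type child struct{ cfg TickServiceConfigs }

func (c *child) Serve() { <-c.cfg.Context.Done() }

// onServe in orchestrator style: fan out children, then park on ctx.
func onServe(ctx context.Context, children []*child) error {
	for _, c := range children {
		go c.Serve()
	}
	<-ctx.Done()
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	adv := &child{cfg: TickServiceConfigs{
		PollInterval: 30 * time.Second, // moves out of the flat config
		ServiceConfigs: ServiceConfigs{
			Name:    "advancer",
			Context: ctx, // children share the parent's context
			Cancel:  cancel,
		},
	}}
	fmt.Println(adv.cfg.Name, adv.cfg.PollInterval) // promoted fields still read flat
	go func() { time.Sleep(10 * time.Millisecond); cancel() }()
	_ = onServe(ctx, []*child{adv})
}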
diff --git a/internal/prt/service.go b/internal/prt/service.go
index b60c4971b..c5e7a7748 100644
--- a/internal/prt/service.go
+++ b/internal/prt/service.go
@@ -21,7 +21,7 @@ import (
 )
 
 type CreateInfo struct {
-	service.CreateInfo
+	service.TickServiceConfigs
 	Config         config.PrtConfig
 	Repository     repository.Repository
 	EthClient      EthClientInterface
@@ -29,7 +29,7 @@ type CreateInfo struct {
 }
 
 type Service struct {
-	service.Service
+	service.TickServiceTemplate
 	repository     prtRepository
 	client         EthClientInterface
 	adapterFactory AdapterFactory
@@ -49,16 +49,15 @@ type PersistentConfig struct {
 	ChainID uint64
 }
 
-func Create(ctx context.Context, c *CreateInfo) (*Service, error) {
+func Create(ctx context.Context, c *CreateInfo) (service.IService, error) {
 	var err error
 
 	if err = ctx.Err(); err != nil {
 		return nil, err // This returns context.Canceled or context.DeadlineExceeded.
 	}
 
 	s := &Service{}
-	c.Impl = s
-	err = service.Create(ctx, &c.CreateInfo, &s.Service)
+	err = service.InitTickServiceTemplate(&c.TickServiceConfigs, &s.TickServiceTemplate, s, s)
 	if err != nil {
 		return nil, err
 	}
@@ -118,41 +117,39 @@ func Create(ctx context.Context, c *CreateInfo) (*Service, error) {
 		}
 	}
 
+	s.LogConfig(c.Config)
+
 	return s, nil
 }
 
-func (s *Service) Alive() bool     { return true }
-func (s *Service) Ready() bool     { return true }
-func (s *Service) Reload() []error { return nil }
-
 // Tick executes the Validator main logic of producing claims and/or proofs
 // for processed epochs of all running applications.
-func (s *Service) Tick() []error {
+func (s *Service) Tick(ctx context.Context) (bool, []error) {
 	// Check for shutdown before starting work, consistent with the advancer.
-	if s.IsStopping() {
-		return nil
+	if ctx.Err() != nil {
+		return false, nil
 	}
 
-	apps, _, err := getAllRunningApplications(s.Context, s.repository)
+	apps, _, err := getAllRunningApplications(ctx, s.repository)
 	if err != nil {
 		// Only suppress context errors during shutdown; surface real DB errors.
-		if s.IsStopping() && errors.Is(err, context.Canceled) {
+		if errors.Is(err, context.Canceled) {
 			s.Logger.Warn("Tick interrupted by shutdown", "error", err)
-			return nil
+			return false, nil
 		}
-		return []error{fmt.Errorf("failed to get running applications. %w", err)}
+		return false, []error{fmt.Errorf("failed to get running applications. %w", err)}
 	}
 
 	// validate each application
 	errs := []error{}
 	for idx := range apps {
-		if s.Context.Err() != nil {
-			return errs
+		if ctx.Err() != nil {
+			return false, errs
 		}
-		if err := s.validateApplication(s.Context, apps[idx]); err != nil {
+		if err := s.validateApplication(ctx, apps[idx]); err != nil {
 			// During shutdown, in-flight L1 requests see context cancellation.
 			// Suppress these to avoid spurious ERR log entries.
-			if s.IsStopping() && errors.Is(err, context.Canceled) {
+			if errors.Is(err, context.Canceled) {
 				s.Logger.Warn("Tick interrupted by shutdown",
 					"application", apps[idx].IApplicationAddress,
 					"error", err)
 				continue
@@ -160,16 +157,7 @@ func (s *Service) Tick() []error {
 			errs = append(errs, err)
 		}
 	}
-	return errs
-}
-
-func (s *Service) Stop(_ bool) []error {
-	s.SetStopping()
-	return nil
-}
-
-func (s *Service) String() string {
-	return s.Name
+	return false, errs
 }
 
 func (s *Service) setupPersistentConfig(
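The prt Tick above replaces the IsStopping() flag with plain context checks: once ctx is the only shutdown signal, any context.Canceled seen mid-tick is, by construction, a shutdown. A minimal sketch of that error-classification idea (doWork is hypothetical):

package main

import (
	"context"
	"errors"
	"fmt"
)

// doWork stands in for a repository or L1 call that honors ctx.
func doWork(ctx context.Context) error {
	return ctx.Err()
}

// tick follows the new TickImpl contract, returning (reschedule, errs):
// shutdown cancellations are logged and suppressed, real failures surfaced.
func tick(ctx context.Context) (bool, []error) {
	if ctx.Err() != nil {
		return false, nil // shutdown already in progress; skip the work
	}
	if err := doWork(ctx); err != nil {
		if errors.Is(err, context.Canceled) {
			fmt.Println("tick interrupted by shutdown:", err)
			return false, nil
		}
		return false, []error{err}
	}
	return false, nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	_, errs := tick(ctx)
	fmt.Println("suppressed errors:", len(errs)) // 0
}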
diff --git a/internal/validator/validator.go b/internal/validator/validator.go
index fcb214865..c6764e9d1 100644
--- a/internal/validator/validator.go
+++ b/internal/validator/validator.go
@@ -22,7 +22,8 @@ import (
 )
 
 type Service struct {
-	service.Service
+	service.TickServiceTemplate
+
 	repository ValidatorRepository
 
 	// cached constants
@@ -31,23 +32,22 @@ type Service struct {
 }
 
 type CreateInfo struct {
-	service.CreateInfo
+	service.TickServiceConfigs
 
 	Config     config.ValidatorConfig
 	Repository repository.Repository
 }
 
-func Create(ctx context.Context, c *CreateInfo) (*Service, error) {
+func Create(ctx context.Context, c *CreateInfo) (service.IService, error) {
 	var err error
 
 	if err = ctx.Err(); err != nil {
 		return nil, err // This returns context.Canceled or context.DeadlineExceeded.
 	}
 
 	s := &Service{}
-	c.Impl = s
-	err = service.Create(ctx, &c.CreateInfo, &s.Service)
+	err = service.InitTickServiceTemplate(&c.TickServiceConfigs, &s.TickServiceTemplate, s, s)
 	if err != nil {
 		return nil, err
 	}
@@ -60,37 +60,27 @@ func Create(ctx context.Context, c *CreateInfo) (*Service, error) {
 	s.pristinePostContext = merkle.CreatePostContext()
 	s.pristineRootHash = s.pristinePostContext[merkle.TREE_DEPTH]
 
+	s.LogConfig(c.Config)
+
 	return s, nil
 }
 
-func (s *Service) Alive() bool     { return true }
-func (s *Service) Ready() bool     { return true }
-func (s *Service) Reload() []error { return nil }
-
 // Tick executes the Validator main logic of producing claims and/or proofs
 // for processed epochs of all running applications.
-func (s *Service) Tick() []error {
-	apps, _, err := getAllRunningApplications(s.Context, s.repository)
+func (s *Service) Tick(ctx context.Context) (bool, []error) {
+	apps, _, err := getAllRunningApplications(ctx, s.repository)
 	if err != nil {
-		return []error{fmt.Errorf("failed to get running applications. %w", err)}
+		return false, []error{fmt.Errorf("failed to get running applications. %w", err)}
 	}
 
 	// validate each application
 	errs := []error{}
 	for idx := range apps {
-		if err := s.validateApplication(s.Context, apps[idx]); err != nil {
+		if err := s.validateApplication(ctx, apps[idx]); err != nil {
 			errs = append(errs, err)
 		}
 	}
-	return errs
-}
 
-func (s *Service) Stop(_ bool) []error {
-	s.SetStopping()
-	return nil
-}
-
-func (s *Service) String() string {
-	return s.Name
+	return false, errs
 }
 
 // The maximum height for the Merkle tree of all outputs produced
diff --git a/internal/validator/validator_test.go b/internal/validator/validator_test.go
index fb5cc866e..a5d279705 100644
--- a/internal/validator/validator_test.go
+++ b/internal/validator/validator_test.go
@@ -40,8 +40,8 @@ func (s *ValidatorSuite) SetupSubTest() {
 		pristinePostContext: postContext,
 		pristineRootHash:    postContext[merkle.TREE_DEPTH],
 	}
-	serviceArgs := &service.CreateInfo{Name: "validator", Impl: validator}
-	err := service.Create(context.Background(), serviceArgs, &validator.Service)
+	serviceArgs := &service.ServiceConfigs{Name: "validator"}
+	err := service.InitServiceTemplate(serviceArgs, &validator.ServiceTemplate, validator)
 	s.Require().Nil(err)
 
 	dummyOutputsMerkleRoot := common.HexToHash("0x0a162946e56158bac0673e6dd3bdfdc1e4a0e7744a120fdb640050c8d7abe1c6")
 	dummyEpochs = []Epoch{
diff --git a/pkg/service/service.go b/pkg/service/service.go
index 90af6fa1f..282e04111 100644
--- a/pkg/service/service.go
+++ b/pkg/service/service.go
@@ -6,12 +6,10 @@
 // The runtime information is then stored in the Service.
 //
 // The recommended way to implement a new service is to:
-//   - embed a [CreateInfo] struct into a new CreateInfo struct.
-//   - embed a [Service] struct into a new Service struct.
+//   - embed a [ServiceConfigs] struct into a new CreateInfo struct.
+//   - embed a [ServiceTemplate] struct into a new Service struct.
 //   - embed a [Create] call into a new Create function.
 //
-// Check DummyService, SlowService and ListService source code for examples of how to do it.
-//
 // To use a service, call its corresponding Create function with a matching CreateInfo and Service,
 // then fill in the appropriate CreateInfo fields.
 // Here are a few of the available options:
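Since the hunk above drops the pointer to DummyService, SlowService, and ListService as worked examples, here is a hedged sketch of the recommended embedding, written against the new names introduced below (ServiceConfigs, ServiceTemplate, InitServiceTemplate). Treat it as illustrative, not as code from the repository; the package name and import path are assumptions:

package myservice

import (
	"context"

	"github.com/cartesi/rollups-node/pkg/service" // assumed import path
)

type CreateInfo struct {
	service.ServiceConfigs // step 1: embed ServiceConfigs in the CreateInfo
}

type Service struct {
	service.ServiceTemplate // step 2: embed ServiceTemplate in the Service
}

// OnServe is the only lifecycle method without a default implementation:
// block here until the framework cancels ctx.
func (s *Service) OnServe(ctx context.Context) error {
	<-ctx.Done()
	return nil
}

// Create wires the template to the concrete type (step 3).
func Create(ctx context.Context, c *CreateInfo) (service.IService, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	s := &Service{}
	if err := service.InitServiceTemplate(&c.ServiceConfigs, &s.ServiceTemplate, s); err != nil {
		return nil, err
	}
	return s, nil
}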
@@ -74,110 +72,74 @@ const telemetryShutdownTimeout = 5 * time.Second
 
 var (
 	ErrInvalid = fmt.Errorf("Invalid Argument") // invalid argument
+	ErrServiceStopped = fmt.Errorf("Service was stopped")
 )
 
-// ServiceImpl is the interface that concrete services must implement.
-//
-// IMPORTANT: Stop() implementations that shadow Service.Stop() MUST call
-// s.SetStopping() as their first action. This sets the stopping flag so that
-// a concurrent Tick() can detect shutdown-in-progress via IsStopping() and
-// suppress expected teardown errors (e.g., context.Canceled from in-flight
-// RPCs). Without this call, the race window between Stop() tearing down
-// resources and Tick() observing the cancellation produces spurious errors.
-//
-// When Stop() is called through the framework's Service.Stop() dispatch,
-// the flag is set automatically before Impl.Stop() runs. But the node
-// orchestrator calls child.Stop() directly (Go method resolution picks the
-// concrete type's Stop, bypassing Service.Stop), so the impl's SetStopping()
-// is the only thing that sets the flag on that path.
-type ServiceImpl interface {
-	Alive() bool
-	Ready() bool
-	Reload() []error
-	Tick() []error
-	Stop(bool) []error
-}
-
+// Public interface with methods to manipulate the service.
 type IService interface {
 	Alive() bool
 	Ready() bool
 	Reload() []error
-	Tick() []error
 	Stop(bool) []error
 	Serve() error
 	String() string
 }
 
-// CreateInfo stores initialization data for the Create function
-type CreateInfo struct {
-	Name                 string
-	LogLevel             slog.Level
-	LogColor             bool
-	EnableSignalHandling bool
-	TelemetryCreate      bool
-	TelemetryAddress     string
-	PollInterval         time.Duration
-	Impl                 ServiceImpl
-	Logger               *slog.Logger
-	ServeMux             *http.ServeMux
-	Context              context.Context
-	Cancel               context.CancelFunc
+/*
+ * Service template for services that do continuous processing.
+ */
 
-	// EnableReschedule, when true, creates a self-continuation channel.
-	// Services that discover remaining work after a Tick() call
-	// SignalReschedule() to re-tick immediately without waiting for the
-	// timer interval.
-	//
-	// Migration: When the events library (feature/events-library-research)
-	// ships, Serve() will gain an additional EventChannel case for external
-	// cross-service notifications. Reschedule remains complementary:
-	// Reschedule = internal self-continuation ("I have more work"),
-	// EventChannel = external stimulus ("another service produced work").
-	// Both coexist in the select loop alongside the Ticker safety-net.
-	EnableReschedule bool
+// Internal interface with abstract methods called by ServiceTemplate.
+// These methods are not part of the public service interface.
+type LifecycleImpl interface {
+	OnReload() []error
+	OnStop(bool) []error
+	OnServe(ctx context.Context) error
 }
 
-// Service stores runtime information.
-type Service struct {
-	Running      atomic.Bool
+// ServiceTemplate stores runtime information.
+type ServiceTemplate struct {
 	Name   string
-	Impl   ServiceImpl
 	Logger *slog.Logger
-	Ticker       *time.Ticker
-	PollInterval time.Duration
-	Context      context.Context
-	Cancel       context.CancelFunc
-	Sighup       chan os.Signal // SIGHUP to reload
-	SigShutdown  chan os.Signal // SIGINT/SIGTERM to exit gracefully
+	lifecycleImpl LifecycleImpl
+	context       context.Context
+	cancelContext context.CancelFunc
+	sigHangUp     chan os.Signal // SIGHUP to reload
+	sigShutdown   chan os.Signal // SIGINT/SIGTERM to exit gracefully
 	ServeMux *http.ServeMux
-	Telemetry     *http.Server
-	TelemetryFunc func() error
-	reschedule    chan struct{} // self-continuation signal; see CreateInfo.EnableReschedule
-
-	// stopping is set to true at the beginning of Stop(), before Impl.Stop()
-	// is called. Services can check this via IsStopping() from Tick() to
-	// detect that shutdown is in progress and suppress errors that are
-	// expected during teardown (e.g., context.Canceled from in-flight RPC
-	// calls). This covers the race window between Stop() being called and
-	// ctx.Cancel() propagating.
-	stopping atomic.Bool
+	telemetry     *http.Server
+	telemetryFunc func() error
+
+	// stopped ensures Stop() runs exactly once, even when Stop() is called
+	// multiple times (by the child's Serve() loop and by the parent orchestrator).
+	stopped atomic.Bool
+}
+
+// ServiceConfigs stores configuration for the InitServiceTemplate function.
+type ServiceConfigs struct {
+	Name                 string
+	Logger               *slog.Logger
+	LogLevel             slog.Level
+	LogColor             bool
+	Context              context.Context
+	Cancel               context.CancelFunc
+	EnableSignalHandling bool
+	TelemetryCreate      bool
+	TelemetryAddress     string
+	ServeMux             *http.ServeMux // used only for unit testing
 }
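The stopped flag above replaces the old stopping flag with stronger once-semantics. The mechanism in isolation, as a hedged, simplified sketch of what the Stop implementation further down does:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type closer struct {
	stopped atomic.Bool
	runs    atomic.Int32
}

// Stop runs teardown at most once; losers of the CAS return immediately
// (fire-and-forget) instead of blocking the way sync.Once would.
func (c *closer) Stop() {
	if !c.stopped.CompareAndSwap(false, true) {
		return // already stopped
	}
	c.runs.Add(1) // real teardown goes here
}

func main() {
	var wg sync.WaitGroup
	c := &closer{}
	for i := 0; i < 10; i++ { // e.g. child Serve loop and parent orchestrator racing
		wg.Add(1)
		go func() { defer wg.Done(); c.Stop() }()
	}
	wg.Wait()
	fmt.Println("teardown ran", c.runs.Load(), "time(s)") // always 1
}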
 
-// Create a service by:
-//   - using values from s if non zero,
-//   - using values from c,
-//   - using default values when applicable
-func Create(ctx context.Context, c *CreateInfo, s *Service) error {
-	if c == nil || c.Impl == nil || c.Impl == s || s == nil {
+// Initialize the 'ServiceTemplate' structure using values from 'ServiceConfigs'.
+// 'impl' must be a reference to the concrete service implementation that
+// embeds 'ServiceTemplate'.
+func InitServiceTemplate(c *ServiceConfigs, s *ServiceTemplate, impl LifecycleImpl) error {
+	if c == nil || s == nil || impl == nil {
 		return ErrInvalid
 	}
-	if err := ctx.Err(); err != nil {
-		return err // This returns context.Canceled or context.DeadlineExceeded.
-	}
 
-	s.Running.Store(false)
+	s.lifecycleImpl = impl
+
 	s.Name = c.Name
-	s.Impl = c.Impl
 	s.Logger = c.Logger
 
 	// log
@@ -186,42 +148,24 @@ func Create(ctx context.Context, c *CreateInfo, s *Service) error {
 	}
 
 	// context and cancelation
-	if s.Context == nil {
-		if c.Context == nil {
-			c.Context = context.Background()
-		}
-		s.Context = c.Context
-	}
-	if s.Cancel == nil {
-		if c.Cancel == nil {
-			s.Context, c.Cancel = context.WithCancel(c.Context)
-		}
-		s.Cancel = c.Cancel
+	if c.Context == nil {
+		c.Context = context.Background()
 	}
-
-	// ticker
-	if s.Ticker == nil {
-		if c.PollInterval == 0 {
-			c.PollInterval = time.Minute
-		}
-		s.PollInterval = c.PollInterval
-		s.Ticker = time.NewTicker(s.PollInterval)
-	}
-
-	// self-rescheduling
-	if c.EnableReschedule {
-		s.reschedule = make(chan struct{}, 1)
+	s.context = c.Context
+	if c.Cancel == nil {
+		s.context, c.Cancel = context.WithCancel(c.Context)
 	}
+	s.cancelContext = c.Cancel
 
 	// signal handling
 	if c.EnableSignalHandling {
-		if s.Sighup == nil {
-			s.Sighup = make(chan os.Signal, 1)
-			signal.Notify(s.Sighup, syscall.SIGHUP)
+		if s.sigHangUp == nil {
+			s.sigHangUp = make(chan os.Signal, 1)
+			signal.Notify(s.sigHangUp, syscall.SIGHUP)
 		}
-		if s.SigShutdown == nil {
-			s.SigShutdown = make(chan os.Signal, 1)
-			signal.Notify(s.SigShutdown, syscall.SIGINT, syscall.SIGTERM)
+		if s.sigShutdown == nil {
+			s.sigShutdown = make(chan os.Signal, 1)
+			signal.Notify(s.sigShutdown, syscall.SIGINT, syscall.SIGTERM)
 		}
 	}
@@ -236,32 +180,36 @@
 		if c.TelemetryAddress == "" {
 			c.TelemetryAddress = ":8080"
 		}
-		s.Telemetry, s.TelemetryFunc = s.CreateDefaultTelemetry(c.TelemetryAddress)
+		s.telemetry, s.telemetryFunc = s.CreateDefaultTelemetry(c.TelemetryAddress)
 		go func() {
-			if err := s.TelemetryFunc(); err != nil {
+			if err := s.telemetryFunc(); err != nil {
 				s.Logger.Error("Telemetry HTTP server failed", "error", err)
 			}
 		}()
 	}
 
 	s.Logger.Info("Create", "version", version.BuildVersion, "log_level", c.LogLevel, "pid", os.Getpid())
-	if s.Telemetry != nil {
-		s.Logger.Info("Telemetry", "address", s.Telemetry.Addr)
+	if s.telemetry != nil {
+		s.Logger.Info("Telemetry", "address", s.telemetry.Addr)
 	}
 	return nil
 }
 
-func (s *Service) Alive() bool {
-	return s.Impl.Alive()
-}
-
-func (s *Service) Ready() bool {
-	return s.Impl.Ready()
-}
+// Default implementations of some abstract methods (except `OnServe`).
+// Remove them to force concrete services to provide their own implementations.
+func (s *ServiceTemplate) OnReload() []error   { return nil }
+func (s *ServiceTemplate) OnStop(bool) []error { return nil }
+func (s *ServiceTemplate) Alive() bool         { return true }
+func (s *ServiceTemplate) Ready() bool         { return true }
+func (s *ServiceTemplate) String() string      { return s.Name }
+
+func (s *ServiceTemplate) Reload() []error {
+	if s.stopped.Load() {
+		return []error{ErrServiceStopped}
+	}
 
-func (s *Service) Reload() []error {
 	start := time.Now()
-	errs := s.Impl.Reload()
+	errs := s.lifecycleImpl.OnReload()
 	elapsed := time.Since(start)
 
 	if len(errs) > 0 {
@@ -275,57 +223,34 @@ func (s *Service) Reload() []error {
 	return errs
 }
 
-func (s *Service) Tick() []error {
-	start := time.Now()
-	errs := s.Impl.Tick()
-	elapsed := time.Since(start)
-
-	if len(errs) > 0 {
-		s.Logger.Error("Tick",
-			"duration", elapsed,
-			"error", errs)
-	} else {
-		s.Logger.Debug("Tick",
-			"duration", elapsed)
+func (s *ServiceTemplate) Stop(force bool) []error {
+	// CAS achieves once-semantics: the second caller returns immediately
+	// (fire-and-forget) rather than blocking like sync.Once. This is safe
+	// because the orchestrator calls Cancel() after Stop() and waits for
+	// the Serve goroutine to exit.
+	if !s.stopped.CompareAndSwap(false, true) {
+		return nil // already stopped
 	}
-	return errs
-}
 
-// IsStopping reports whether Stop() has been called. Services use this in
-// Tick() to detect shutdown-in-progress and suppress expected teardown errors.
-func (s *Service) IsStopping() bool {
-	return s.stopping.Load()
-}
+	s.cancelContext()
 
-// SetStopping sets the stopping flag. Services whose Stop() method shadows
-// Service.Stop() (i.e., every ServiceImpl) must call this at the top of their
-// Stop so that concurrent Tick goroutines can observe IsStopping() == true
-// before resources are torn down.
-func (s *Service) SetStopping() {
-	s.stopping.Store(true)
-}
-
-func (s *Service) Stop(force bool) []error {
-	s.stopping.Store(true)
 	start := time.Now()
-	errs := s.Impl.Stop(force)
-	if s.Telemetry != nil {
+	errs := s.lifecycleImpl.OnStop(force)
+	if s.telemetry != nil {
 		shutdownCtx, cancel := context.WithTimeout(context.Background(), telemetryShutdownTimeout)
 		defer cancel()
-		if err := s.Telemetry.Shutdown(shutdownCtx); err != nil {
+		if err := s.telemetry.Shutdown(shutdownCtx); err != nil {
 			errs = append(errs, err)
 		}
 	}
-	if s.SigShutdown != nil {
-		signal.Stop(s.SigShutdown)
+	if s.sigShutdown != nil {
+		signal.Stop(s.sigShutdown)
 	}
-	if s.Sighup != nil {
-		signal.Stop(s.Sighup)
+	if s.sigHangUp != nil {
+		signal.Stop(s.sigHangUp)
 	}
 	elapsed := time.Since(start)
-	s.Running.Store(false)
-	s.Cancel()
 	if len(errs) > 0 {
 		s.Logger.Error("Stop",
 			"force", force,
@@ -339,74 +264,128 @@ func (s *Service) Stop(force bool) []error {
 	return errs
 }
 
-// rescheduleChan returns the reschedule channel, or nil if rescheduling is disabled.
-// A nil channel in a select case blocks forever, preserving timer-only behavior.
-func (s *Service) rescheduleChan() <-chan struct{} {
-	return s.reschedule
+func (s *ServiceTemplate) Serve() error {
+	if s.stopped.Load() {
+		return ErrServiceStopped
+	}
+
+	go func() {
+		for {
+			select {
+			case <-s.sigHangUp:
+				s.Reload()
+			case <-s.sigShutdown:
+				s.Stop(false) // Graceful shutdown; errors are logged by Stop.
+				return
+			case <-s.context.Done():
+				s.Stop(true) // Stop logs errors internally.
+				return
+			}
+		}
+	}()
+
+	defer s.Stop(true)
+
+	return s.lifecycleImpl.OnServe(s.context)
 }
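In caller terms, Serve() now owns the whole lifecycle: a helper goroutine maps SIGHUP to Reload and SIGINT/SIGTERM or context cancellation to Stop, and the deferred Stop(true) guarantees teardown even when OnServe returns on its own. A hedged usage sketch against the new API, reusing the shape of the minimal service sketched earlier (myservice and both import paths are hypothetical):

package main

import (
	"context"
	"log/slog"
	"time"

	"github.com/cartesi/rollups-node/internal/myservice" // hypothetical
	"github.com/cartesi/rollups-node/pkg/service"        // assumed import path
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	cfg := &myservice.CreateInfo{ServiceConfigs: service.ServiceConfigs{
		Name:     "demo",
		LogLevel: slog.LevelInfo,
		Context:  ctx,
		Cancel:   cancel,
	}}
	cfg.ServiceConfigs.Logger = service.NewServiceLogger(&cfg.ServiceConfigs)

	svc, err := myservice.Create(ctx, cfg)
	if err != nil {
		panic(err)
	}

	done := make(chan error, 1)
	go func() { done <- svc.Serve() }() // blocks in OnServe

	time.Sleep(50 * time.Millisecond)
	svc.Stop(true) // idempotent: cancels ctx, OnServe returns, deferred Stop is a no-op
	<-done
}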
 
-// SignalReschedule performs a non-blocking send on the reschedule channel.
-// If a signal is already pending, this is a no-op (one wake is sufficient).
-// Does nothing if rescheduling is not enabled.
-// INVARIANT: This method must never block.
-func (s *Service) SignalReschedule() {
-	select {
-	case s.reschedule <- struct{}{}:
-	default:
+// LogConfig logs the service configuration at info level.
+// Called from a service's Create function after initialization.
+func (s *ServiceTemplate) LogConfig(config any) {
+	s.Logger.Info("Starting service", "config", config)
+}
+
+/*
+ * Alternative service template that implements the tick-based processing.
+ */
+
+type TickImpl interface {
+	Tick(ctx context.Context) (bool, []error)
+}
+
+type TickServiceTemplate struct {
+	ServiceTemplate
+	tickImpl TickImpl
+	ticker   *time.Ticker
+}
+
+type TickServiceConfigs struct {
+	ServiceConfigs
+	PollInterval time.Duration
+}
+
+func InitTickServiceTemplate(
+	cfg *TickServiceConfigs,
+	tmpl *TickServiceTemplate,
+	lifecycleImpl LifecycleImpl,
+	tickImpl TickImpl,
+) error {
+	if cfg == nil || tmpl == nil || tickImpl == nil {
+		return ErrInvalid
+	}
+
+	err := InitServiceTemplate(&cfg.ServiceConfigs, &tmpl.ServiceTemplate, lifecycleImpl)
+	if err != nil {
+		return err
+	}
+
+	tmpl.tickImpl = tickImpl
+
+	// ticker
+	if cfg.PollInterval == 0 {
+		cfg.PollInterval = time.Minute
 	}
+	tmpl.ticker = time.NewTicker(cfg.PollInterval)
+
+	return nil
 }
 
-// DrainReschedule consumes and discards a pending reschedule signal, if any.
-// Returns true if a signal was pending. Intended for testing.
-func (s *Service) DrainReschedule() bool {
-	select {
-	case <-s.reschedule:
-		return true
-	default:
+func (s *TickServiceTemplate) tick(ctx context.Context) bool {
+	if ctx.Err() != nil {
 		return false
 	}
+	start := time.Now()
+	reschedule, errs := s.tickImpl.Tick(ctx)
+	elapsed := time.Since(start)
+
+	if len(errs) > 0 {
+		s.Logger.Error("Tick",
+			"duration", elapsed,
+			"reschedule", reschedule,
+			"error", errs,
+		)
+	} else {
+		s.Logger.Debug("Tick",
+			"duration", elapsed,
+			"reschedule", reschedule,
+		)
+	}
+	return reschedule
 }
 
-func (s *Service) Serve() error {
-	s.Running.Store(true)
+func (s *TickServiceTemplate) OnStop(bool) []error {
+	s.ticker.Stop()
+	return nil
+}
 
-	// Check for context cancellation before the first tick.
-	select {
-	case <-s.Context.Done():
-		s.Stop(true) // Stop logs errors internally.
+func (s *TickServiceTemplate) OnServe(ctx context.Context) error {
+	if ctx.Err() != nil {
 		return nil
-	default:
 	}
-
-	s.Tick()
-	for s.Running.Load() {
+	for s.tick(ctx) {}
+	for {
 		select {
-		case <-s.Sighup:
-			s.Reload()
-		case <-s.SigShutdown:
-			s.Stop(false) // Graceful shutdown; errors are logged by Stop.
+		case <-ctx.Done():
 			return nil
-		case <-s.Context.Done():
-			s.Stop(true) // Stop logs errors internally.
-			return nil
-		case <-s.Ticker.C:
-			s.Tick()
-		case <-s.rescheduleChan():
-			s.Tick()
+		case <-s.ticker.C:
+			for s.tick(ctx) {}
 		}
 	}
-	return nil
 }
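The bool returned by Tick replaces the old SignalReschedule channel: returning true asks the template to re-tick immediately (the inner for s.tick(ctx) {} loop), so a service can drain a backlog in bounded batches without waiting out the poll interval. A hedged sketch of a batch-draining Tick; drainer and its backlog counter are illustrative only:

package main

import (
	"context"
	"fmt"
)

type drainer struct {
	backlog int
}

// Tick processes one bounded batch and returns reschedule=true while work
// remains, matching the new TickImpl contract: the template re-ticks
// immediately until Tick returns false or ctx is cancelled.
func (d *drainer) Tick(ctx context.Context) (bool, []error) {
	if ctx.Err() != nil {
		return false, nil
	}
	n := 10 // batch size
	if d.backlog < n {
		n = d.backlog
	}
	d.backlog -= n
	fmt.Printf("processed %d, %d left\n", n, d.backlog)
	return d.backlog > 0, nil
}

func main() {
	ctx := context.Background()
	d := &drainer{backlog: 25}
	for { // what TickServiceTemplate's inner loop does between timer fires
		reschedule, _ := d.Tick(ctx)
		if !reschedule {
			break
		}
	}
}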
-func (s *Service) String() string {
-	return s.Name
-}
-
-// LogConfig logs the service configuration at debug level.
-// Intended for use by standalone service binaries after Create.
-func (s *Service) LogConfig(config any) {
-	s.Logger.Info("Starting service", "config", config)
-}
+/*
+ * Service Logger
+ */
 
 func NewLogger(level slog.Level, color bool) *slog.Logger {
 	opts := &tint.Options{
@@ -420,12 +399,15 @@ func NewLogger(level slog.Level, color bool) *slog.Logger {
 	return slog.New(handler)
 }
 
-func NewServiceLogger(c *CreateInfo) *slog.Logger {
+func NewServiceLogger(c *ServiceConfigs) *slog.Logger {
 	return NewLogger(c.LogLevel, c.LogColor).With("service", c.Name)
 }
 
-// Telemetry
-func (s *Service) CreateDefaultTelemetry(addr string) (*http.Server, func() error) {
+/*
+ * Service Telemetry
+ */
+
+func (s *ServiceTemplate) CreateDefaultTelemetry(addr string) (*http.Server, func() error) {
 	s.ServeMux.Handle("/readyz", http.HandlerFunc(s.ReadyHandler))
 	s.ServeMux.Handle("/livez", http.HandlerFunc(s.AliveHandler))
@@ -451,7 +433,7 @@
 }
 
 // HTTP handler for `/s.Name/readyz` that exposes the value of Ready()
-func (s *Service) ReadyHandler(w http.ResponseWriter, r *http.Request) {
+func (s *ServiceTemplate) ReadyHandler(w http.ResponseWriter, r *http.Request) {
 	if !s.Ready() {
 		http.Error(w, s.Name+": ready check failed", http.StatusInternalServerError)
@@ -461,7 +443,7 @@
 }
 
 // HTTP handler for `/s.Name/livez` that exposes the value of Alive()
-func (s *Service) AliveHandler(w http.ResponseWriter, r *http.Request) {
+func (s *ServiceTemplate) AliveHandler(w http.ResponseWriter, r *http.Request) {
 	if !s.Alive() {
 		http.Error(w, s.Name+": alive check failed", http.StatusInternalServerError)
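The telemetry surface is unchanged by the rename: CreateDefaultTelemetry still mounts /readyz and /livez on the template's ServeMux, backed by Ready() and Alive(). A hedged standalone sketch of that probe pattern; probes and readyHandler are local stand-ins, not the repository's types:

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync/atomic"
)

type probes struct{ ready atomic.Bool }

// readyHandler mirrors the /readyz contract: 200 when ready, 500 otherwise.
func (p *probes) readyHandler(w http.ResponseWriter, r *http.Request) {
	if !p.ready.Load() {
		http.Error(w, "ready check failed", http.StatusInternalServerError)
		return
	}
	w.WriteHeader(http.StatusOK)
}

func main() {
	p := &probes{}
	mux := http.NewServeMux()
	mux.Handle("/readyz", http.HandlerFunc(p.readyHandler))

	srv := httptest.NewServer(mux)
	defer srv.Close()

	resp, err := http.Get(srv.URL + "/readyz")
	if err != nil {
		panic(err)
	}
	fmt.Println("before ready:", resp.StatusCode) // 500

	p.ready.Store(true)
	resp, err = http.Get(srv.URL + "/readyz")
	if err != nil {
		panic(err)
	}
	fmt.Println("after ready:", resp.StatusCode) // 200
}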
diff --git a/pkg/service/service_test.go b/pkg/service/service_test.go
index f123763a3..6d9bc08b6 100644
--- a/pkg/service/service_test.go
+++ b/pkg/service/service_test.go
@@ -16,20 +16,20 @@ import (
 
 // mockImpl is a minimal ServiceImpl for testing the Serve() loop.
 type mockImpl struct {
+	TickServiceTemplate
 	tickCount atomic.Int32
-	onTick    func(n int32) // called on each Tick with the tick count (1-based)
+	onTick    func(n int32) bool // called on each Tick with the tick count (1-based)
 }
 
-func (m *mockImpl) Alive() bool       { return true }
-func (m *mockImpl) Ready() bool       { return true }
-func (m *mockImpl) Reload() []error   { return nil }
-func (m *mockImpl) Stop(bool) []error { return nil }
-func (m *mockImpl) Tick() []error {
+func (m *mockImpl) OnReload() []error   { return nil }
+func (m *mockImpl) OnStop(bool) []error { return nil }
+func (m *mockImpl) Tick(ctx context.Context) (bool, []error) {
 	n := m.tickCount.Add(1)
+	reschedule := false
 	if m.onTick != nil {
-		m.onTick(n)
+		reschedule = m.onTick(n)
 	}
-	return nil
+	return reschedule, nil
 }
 
 // createTestService creates a Service for testing with the given mock and
@@ -38,22 +38,20 @@ func (m *mockImpl) Tick() []error {
 func createTestService(
 	t *testing.T,
 	impl *mockImpl,
-	enableReschedule bool,
-) (*Service, context.CancelFunc) {
+) (IService, context.CancelFunc) {
 	t.Helper()
 	ctx, cancel := context.WithCancel(context.Background())
-	s := &Service{}
-	err := Create(ctx, &CreateInfo{
-		Name:     "test",
-		LogLevel: slog.LevelError,
-		Impl:     impl,
+	err := InitTickServiceTemplate(&TickServiceConfigs{
+		ServiceConfigs: ServiceConfigs{
+			Name:     "test",
+			LogLevel: slog.LevelError,
+			Context:  ctx,
+			Cancel:   cancel,
+		},
 		PollInterval: 10 * time.Minute, // long: tests control wakeup explicitly
-		Context:          ctx,
-		Cancel:           cancel,
-		EnableReschedule: enableReschedule,
-	}, s)
+	}, &impl.TickServiceTemplate, impl, impl)
 	require.NoError(t, err)
-	return s, cancel
+	return impl, cancel
 }
 
 type ServeSuite struct {
@@ -69,20 +67,20 @@ func (s *ServeSuite) TestDisabledReschedulePreservesExistingBehavior() {
 	// Serve() should tick only on timer fires.
 	impl := &mockImpl{}
 	ctx, cancel := context.WithCancel(context.Background())
-	svc := &Service{}
-	err := Create(ctx, &CreateInfo{
-		Name:     "test-no-resched",
-		LogLevel: slog.LevelError,
-		Impl:     impl,
+	err := InitTickServiceTemplate(&TickServiceConfigs{
+		ServiceConfigs: ServiceConfigs{
+			Name:     "test-no-resched",
+			LogLevel: slog.LevelError,
+			Context:  ctx,
+			Cancel:   cancel,
+		},
 		PollInterval: 20 * time.Millisecond,
-		Context:  ctx,
-		Cancel:   cancel,
-	}, svc)
+	}, &impl.TickServiceTemplate, impl, impl)
 	s.Require().NoError(err)
 
 	done := make(chan struct{})
 	go func() {
-		_ = svc.Serve()
+		_ = impl.Serve()
 		close(done)
 	}()
 
@@ -102,18 +100,16 @@ func (s *ServeSuite) TestDisabledReschedulePreservesExistingBehavior() {
 func (s *ServeSuite) TestRescheduleTriggersImmediateRetick() {
 	// When SignalReschedule() is called from Tick(), Serve() should call
 	// Tick() again immediately without waiting for the timer.
-	var svc *Service
-	impl := &mockImpl{
-		onTick: func(n int32) {
+	var impl *mockImpl
+	impl = &mockImpl{
+		onTick: func(n int32) bool {
 			// Signal reschedule on ticks 1 and 2 (the initial tick
 			// and the first rescheduled tick). Stop on tick 3.
-			if n <= 2 {
-				svc.SignalReschedule()
-			}
+			return n <= 2
 		},
 	}
 
-	svc, cancel := createTestService(s.T(), impl, true)
+	svc, cancel := createTestService(s.T(), impl)
 	defer cancel()
 
 	done := make(chan struct{})
@@ -133,66 +129,17 @@ func (s *ServeSuite) TestRescheduleTriggersImmediateRetick() {
 		"should have at least 3 ticks: initial + 2 rescheduled")
 }
 
-func (s *ServeSuite) TestRescheduleCoalesces() {
-	// Multiple signals while Tick() is running should result in at most
-	// one extra tick, not one per signal.
-	var svc *Service
-	tickStarted := make(chan struct{})
-	tickProceed := make(chan struct{})
-
-	impl := &mockImpl{
-		onTick: func(n int32) {
-			if n == 1 {
-				// Signal that the first tick is running.
-				close(tickStarted)
-				// Block the first tick until the test is ready.
-				<-tickProceed
-			}
-		},
-	}
-
-	svc, cancel := createTestService(s.T(), impl, true)
-	defer cancel()
-
-	done := make(chan struct{})
-	go func() {
-		_ = svc.Serve()
-		close(done)
-	}()
-
-	// Wait for the first tick to start.
-	<-tickStarted
-
-	// Send multiple signals while tick is blocked. Only one fits in the buffer.
-	for range 10 {
-		svc.SignalReschedule()
-	}
-
-	// Let the first tick complete.
-	close(tickProceed)
-
-	// Wait for the rescheduled tick to fire, then shut down.
-	time.Sleep(50 * time.Millisecond)
-	cancel()
-	<-done
-
-	// Should be exactly 2 ticks: the initial one + one rescheduled (coalesced).
-	ticks := impl.tickCount.Load()
-	s.Equal(int32(2), ticks,
-		"should have exactly 2 ticks: initial + 1 coalesced reschedule")
-}
-
 func (s *ServeSuite) TestContextCancellationExitsPromptly() {
 	// When context is cancelled with a reschedule signal pending,
 	// Serve() should exit promptly.
-	var svc *Service
-	impl := &mockImpl{
-		onTick: func(_ int32) {
-			svc.SignalReschedule()
+	var impl *mockImpl
+	impl = &mockImpl{
+		onTick: func(_ int32) bool {
+			return true
 		},
 	}
 
-	svc, cancel := createTestService(s.T(), impl, true)
+	svc, cancel := createTestService(s.T(), impl)
 
 	done := make(chan struct{})
 	go func() {
@@ -217,7 +164,7 @@ func (s *ServeSuite) TestServeExitsOnContextCancelledBeforeFirstTick() {
 	impl := &mockImpl{}
 
 	// Create the service with a live context, then cancel before Serve().
-	svc, cancel := createTestService(s.T(), impl, false)
+	svc, cancel := createTestService(s.T(), impl)
 	cancel()
 
 	err := svc.Serve()
@@ -225,40 +172,3 @@ func (s *ServeSuite) TestServeExitsOnContextCancelledBeforeFirstTick() {
 	// No ticks should have fired since context was already cancelled.
 	s.Equal(int32(0), impl.tickCount.Load())
 }
-
-func (s *ServeSuite) TestRescheduleEnabledCreatesChannel() {
-	impl := &mockImpl{}
-	svc, cancel := createTestService(s.T(), impl, true)
-	defer cancel()
-
-	s.NotNil(svc.reschedule, "reschedule channel should be created when enabled")
-}
-
-func (s *ServeSuite) TestRescheduleDisabledLeavesNilChannel() {
-	impl := &mockImpl{}
-	svc, cancel := createTestService(s.T(), impl, false)
-	defer cancel()
-
-	s.Nil(svc.reschedule, "reschedule channel should be nil when disabled")
-}
-
-func (s *ServeSuite) TestSignalRescheduleNoopWhenDisabled() {
-	impl := &mockImpl{}
-	svc, cancel := createTestService(s.T(), impl, false)
-	defer cancel()
-
-	// Should not panic on nil channel.
-	s.NotPanics(func() { svc.SignalReschedule() })
-}
-
-func (s *ServeSuite) TestDrainReschedule() {
-	impl := &mockImpl{}
-	svc, cancel := createTestService(s.T(), impl, true)
-	defer cancel()
-
-	s.False(svc.DrainReschedule(), "should be empty initially")
-
-	svc.SignalReschedule()
-	s.True(svc.DrainReschedule(), "should drain pending signal")
-	s.False(svc.DrainReschedule(), "should be empty after drain")
-}
diff --git a/pkg/service/telemetry_test.go b/pkg/service/telemetry_test.go
index c854bda3a..267544dd9 100644
--- a/pkg/service/telemetry_test.go
+++ b/pkg/service/telemetry_test.go
@@ -14,13 +14,13 @@ import (
 
 // newTelemetryTestService returns a *Service ready to have CreateDefaultTelemetry
 // called on it. It wires a ServeMux, a mockImpl, and a discard logger.
-func newTelemetryTestService() *Service {
+func newTelemetryTestService() *ServiceTemplate {
 	impl := &mockImpl{}
-	return &Service{
-		Name:     "test",
-		Logger:   discardLogger(),
-		ServeMux: http.NewServeMux(),
-		Impl:     impl,
+	return &ServiceTemplate{
+		Name:          "test",
+		Logger:        discardLogger(),
+		ServeMux:      http.NewServeMux(),
+		lifecycleImpl: impl,
 	}
 }
diff --git a/test/validator/validator_test.go b/test/validator/validator_test.go
index e80f94b7f..b77dc9505 100644
--- a/test/validator/validator_test.go
+++ b/test/validator/validator_test.go
@@ -58,13 +58,16 @@ func (s *ValidatorRepositoryIntegrationSuite) SetupSubTest() {
 	s.Require().Nil(err)
 
 	serviceArgs := validator.CreateInfo{
-		CreateInfo: service.CreateInfo{
-			Name:     "validator",
-			LogLevel: slog.LevelDebug,
+		TickServiceConfigs: service.TickServiceConfigs{
+			ServiceConfigs: service.ServiceConfigs{
+				Name:     "validator",
+				LogLevel: slog.LevelDebug,
+			},
 		},
 		Repository: s.repository,
 	}
-	s.validator, err = validator.Create(context.Background(), &serviceArgs)
+	srv, err := validator.Create(context.Background(), &serviceArgs)
+	s.validator = srv.(*validator.Service)
 	s.Require().Nil(err)
 }
@@ -131,7 +134,7 @@ func (s *ValidatorRepositoryIntegrationSuite) TestItReturnsPristineClaim() {
 	err = s.repository.StoreAdvanceResult(s.ctx, 1, &advanceResult)
 	s.Require().Nil(err)
 
-	errs := s.validator.Tick()
+	_, errs := s.validator.Tick(s.ctx)
 	s.Require().Equal(0, len(errs))
 
 	updatedEpoch, err := s.repository.GetEpoch(s.ctx, app.IApplicationAddress.String(), epoch.Index)
@@ -253,7 +256,7 @@ func (s *ValidatorRepositoryIntegrationSuite) TestItReturnsPreviousClaim() {
 	err = s.repository.StoreAdvanceResult(s.ctx, 1, &advanceResult)
 	s.Require().Nil(err)
 
-	errs := s.validator.Tick()
+	_, errs := s.validator.Tick(s.ctx)
 	s.Require().Equal(0, len(errs))
 
 	updatedEpoch, err := s.repository.GetEpoch(s.ctx, app.IApplicationAddress.String(), secondEpoch.Index)
@@ -335,7 +338,7 @@ func (s *ValidatorRepositoryIntegrationSuite) TestItReturnsANewClaimAndProofs()
 	err = s.repository.StoreAdvanceResult(s.ctx, 1, &advanceResult)
 	s.Require().Nil(err)
 
-	errs := s.validator.Tick()
+	_, errs := s.validator.Tick(s.ctx)
 	s.Require().Equal(0, len(errs))
 
 	updatedEpoch, err := s.repository.GetEpoch(s.ctx, app.IApplicationAddress.String(), epoch.Index)
@@ -488,7 +491,7 @@ func (s *ValidatorRepositoryIntegrationSuite) TestItReturnsANewClaimAndProofs()
 	err = s.repository.StoreAdvanceResult(s.ctx, 1, &advanceResult)
 	s.Require().Nil(err)
 
-	errs := s.validator.Tick()
+	_, errs := s.validator.Tick(s.ctx)
 	s.Require().Equal(0, len(errs))
 
 	updatedSecondEpoch, err := s.repository.GetEpoch(