diff --git a/internal/verifier/check.go b/internal/verifier/check.go index eb56d020..d00b2e27 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -211,12 +211,9 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh } } - verifier.logger.Info().Msg("Starting change readers.") - // Now that we’ve initialized verifier.generation we can // start the change readers. verifier.initializeChangeReaders() - verifier.mux.Unlock() err = retry.New().WithCallback( func(ctx context.Context, _ *retry.FuncInfo) error { @@ -241,28 +238,10 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh verifier.logger.Debug().Msg("Starting Check") - verifier.phase = Check - defer func() { - verifier.phase = Idle - }() - if err := verifier.startChangeHandling(ctx); err != nil { return err } - // Log the verification status when initially booting up so it's easy to see the current state - verificationStatus, err := verifier.GetVerificationStatus(ctx) - if err != nil { - return errors.Wrapf( - err, - "failed to retrieve verification status", - ) - } else { - verifier.logger.Debug(). - Any("status", verificationStatus). - Msg("Initial verification phase.") - } - err = verifier.CreateInitialTasksIfNeeded(ctx) if err != nil { return err @@ -272,7 +251,6 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh // Now enter the multi-generational steady check state for { - verifier.mux.Lock() err = retry.New().WithCallback( func(ctx context.Context, _ *retry.FuncInfo) error { return verifier.persistGenerationWhileLocked(ctx) @@ -359,7 +337,6 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh verifier.generationStartTime = time.Now() verifier.srcChangeReader.getEventRecorder().Reset() verifier.dstChangeReader.getEventRecorder().Reset() - verifier.phase = Recheck verifier.mux.Unlock() // Generation of recheck tasks can partial-fail. The following will @@ -381,6 +358,8 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh Err(err). Msg("Failed to clear out old recheck docs. (This is probably unimportant.)") } + + verifier.mux.Lock() } } @@ -608,6 +587,9 @@ func (verifier *Verifier) work(ctx context.Context, workerNum int) error { } func (v *Verifier) initializeChangeReaders() { + + v.logger.Info().Msg("Starting change readers.") + v.srcChangeReader = v.newChangeStreamReader( v.srcNamespaces, src, diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index 09f8d1e5..62cde7df 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -89,7 +89,6 @@ type Verifier struct { lastGeneration bool running bool generation int - phase string port int metaURI string metaClient *mongo.Client @@ -178,7 +177,6 @@ func NewVerifier(settings VerifierSettings, logPath string) *Verifier { logger: logger, writer: logWriter, - phase: Idle, numWorkers: NumWorkers, readPreference: readpref.Primary(), partitionSizeInBytes: 400 * 1024 * 1024, @@ -432,14 +430,7 @@ func (verifier *Verifier) getGeneration() (generation int, lastGeneration bool) } func (verifier *Verifier) getGenerationWhileLocked() (int, bool) { - - // As long as no other goroutine has locked the mux this will - // usefully panic if the caller neglected the lock. - wasUnlocked := verifier.mux.TryLock() - if wasUnlocked { - verifier.mux.Unlock() - panic("getGenerationWhileLocked() while unlocked") - } + verifier.assertLocked() return verifier.generation, verifier.lastGeneration } @@ -1247,9 +1238,17 @@ func (verifier *Verifier) verifyMetadataAndPartitionCollection( } func (verifier *Verifier) GetVerificationStatus(ctx context.Context) (*VerificationStatus, error) { - taskCollection := verifier.verificationTaskCollection() generation, _ := verifier.getGeneration() + return verifier.getVerificationStatusForGeneration(ctx, generation) +} + +func (verifier *Verifier) getVerificationStatusForGeneration( + ctx context.Context, + generation int, +) (*VerificationStatus, error) { + taskCollection := verifier.verificationTaskCollection() + var results []bson.Raw err := retry.New().WithCallback( @@ -1388,18 +1387,6 @@ func (verifier *Verifier) StartServer() error { return server.Run(context.Background()) } -func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) { - status, err := verifier.GetVerificationStatus(ctx) - if err != nil { - return Progress{Error: err}, err - } - return Progress{ - Phase: verifier.phase, - Generation: verifier.generation, - Status: status, - }, nil -} - // Returned boolean indicates that namespaces are cached, and // whatever needs them can proceed. func (verifier *Verifier) ensureNamespaces(ctx context.Context) bool { diff --git a/internal/verifier/progress.go b/internal/verifier/progress.go new file mode 100644 index 00000000..f383b50f --- /dev/null +++ b/internal/verifier/progress.go @@ -0,0 +1,34 @@ +package verifier + +import "context" + +func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) { + verifier.mux.RLock() + defer verifier.mux.RUnlock() + + generation := verifier.generation + + status, err := verifier.getVerificationStatusForGeneration(ctx, generation) + if err != nil { + return Progress{Error: err}, err + } + return Progress{ + Phase: verifier.getPhaseWhileLocked(), + Generation: verifier.generation, + Status: status, + }, nil +} + +func (verifier *Verifier) getPhaseWhileLocked() string { + verifier.assertLocked() + + if !verifier.running { + return Idle + } + + if verifier.generation > 0 { + return Recheck + } + + return Check +} diff --git a/internal/verifier/sync.go b/internal/verifier/sync.go new file mode 100644 index 00000000..5fead076 --- /dev/null +++ b/internal/verifier/sync.go @@ -0,0 +1,9 @@ +package verifier + +func (verifier *Verifier) assertLocked() { + if verifier.mux.TryLock() { + verifier.mux.Unlock() + + panic("INTERNAL ERROR: verifier should have been locked!") + } +}