Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 5 additions & 23 deletions internal/verifier/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,12 +211,9 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh
}
}

verifier.logger.Info().Msg("Starting change readers.")

// Now that we’ve initialized verifier.generation we can
// start the change readers.
verifier.initializeChangeReaders()
verifier.mux.Unlock()

err = retry.New().WithCallback(
func(ctx context.Context, _ *retry.FuncInfo) error {
Expand All @@ -241,28 +238,10 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh

verifier.logger.Debug().Msg("Starting Check")

verifier.phase = Check
defer func() {
verifier.phase = Idle
}()

if err := verifier.startChangeHandling(ctx); err != nil {
return err
}

// Log the verification status when initially booting up so it's easy to see the current state
verificationStatus, err := verifier.GetVerificationStatus(ctx)
if err != nil {
return errors.Wrapf(
err,
"failed to retrieve verification status",
)
} else {
verifier.logger.Debug().
Any("status", verificationStatus).
Msg("Initial verification phase.")
}

err = verifier.CreateInitialTasksIfNeeded(ctx)
if err != nil {
return err
Expand All @@ -272,7 +251,6 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh

// Now enter the multi-generational steady check state
for {
verifier.mux.Lock()
err = retry.New().WithCallback(
func(ctx context.Context, _ *retry.FuncInfo) error {
return verifier.persistGenerationWhileLocked(ctx)
Expand Down Expand Up @@ -359,7 +337,6 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh
verifier.generationStartTime = time.Now()
verifier.srcChangeReader.getEventRecorder().Reset()
verifier.dstChangeReader.getEventRecorder().Reset()
verifier.phase = Recheck
verifier.mux.Unlock()

// Generation of recheck tasks can partial-fail. The following will
Expand All @@ -381,6 +358,8 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh
Err(err).
Msg("Failed to clear out old recheck docs. (This is probably unimportant.)")
}

verifier.mux.Lock()
}
}

Expand Down Expand Up @@ -608,6 +587,9 @@ func (verifier *Verifier) work(ctx context.Context, workerNum int) error {
}

func (v *Verifier) initializeChangeReaders() {

v.logger.Info().Msg("Starting change readers.")

v.srcChangeReader = v.newChangeStreamReader(
v.srcNamespaces,
src,
Expand Down
33 changes: 10 additions & 23 deletions internal/verifier/migration_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ type Verifier struct {
lastGeneration bool
running bool
generation int
phase string
port int
metaURI string
metaClient *mongo.Client
Expand Down Expand Up @@ -178,7 +177,6 @@ func NewVerifier(settings VerifierSettings, logPath string) *Verifier {
logger: logger,
writer: logWriter,

phase: Idle,
numWorkers: NumWorkers,
readPreference: readpref.Primary(),
partitionSizeInBytes: 400 * 1024 * 1024,
Expand Down Expand Up @@ -432,14 +430,7 @@ func (verifier *Verifier) getGeneration() (generation int, lastGeneration bool)
}

func (verifier *Verifier) getGenerationWhileLocked() (int, bool) {

// As long as no other goroutine has locked the mux this will
// usefully panic if the caller neglected the lock.
wasUnlocked := verifier.mux.TryLock()
if wasUnlocked {
verifier.mux.Unlock()
panic("getGenerationWhileLocked() while unlocked")
}
verifier.assertLocked()

return verifier.generation, verifier.lastGeneration
}
Expand Down Expand Up @@ -1247,9 +1238,17 @@ func (verifier *Verifier) verifyMetadataAndPartitionCollection(
}

func (verifier *Verifier) GetVerificationStatus(ctx context.Context) (*VerificationStatus, error) {
taskCollection := verifier.verificationTaskCollection()
generation, _ := verifier.getGeneration()

return verifier.getVerificationStatusForGeneration(ctx, generation)
}

func (verifier *Verifier) getVerificationStatusForGeneration(
ctx context.Context,
generation int,
) (*VerificationStatus, error) {
taskCollection := verifier.verificationTaskCollection()

var results []bson.Raw

err := retry.New().WithCallback(
Expand Down Expand Up @@ -1388,18 +1387,6 @@ func (verifier *Verifier) StartServer() error {
return server.Run(context.Background())
}

func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) {
status, err := verifier.GetVerificationStatus(ctx)
if err != nil {
return Progress{Error: err}, err
}
return Progress{
Phase: verifier.phase,
Generation: verifier.generation,
Status: status,
}, nil
}

// Returned boolean indicates that namespaces are cached, and
// whatever needs them can proceed.
func (verifier *Verifier) ensureNamespaces(ctx context.Context) bool {
Expand Down
34 changes: 34 additions & 0 deletions internal/verifier/progress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package verifier

import "context"

func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) {
verifier.mux.RLock()
defer verifier.mux.RUnlock()

generation := verifier.generation

status, err := verifier.getVerificationStatusForGeneration(ctx, generation)
if err != nil {
return Progress{Error: err}, err
}
return Progress{
Phase: verifier.getPhaseWhileLocked(),
Generation: verifier.generation,
Status: status,
}, nil
}

func (verifier *Verifier) getPhaseWhileLocked() string {
verifier.assertLocked()

if !verifier.running {
return Idle
}

if verifier.generation > 0 {
return Recheck
}

return Check
}
9 changes: 9 additions & 0 deletions internal/verifier/sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package verifier

func (verifier *Verifier) assertLocked() {
if verifier.mux.TryLock() {
verifier.mux.Unlock()

panic("INTERNAL ERROR: verifier should have been locked!")
}
}