Skip to content

Commit 32372b9

Browse files
authored
REP-6875 Compute phase dynamically, and tighten locking. (#196)
This fixes a small quirk (bug?) where the `/progress` API could report 0 tasks for generation 0 at the start of checking. It also removes the verifier’s static `phase` property. This value in the `/progress` API is now computed dynamically. That API’s response also is now fully assembled under a read lock.
1 parent 151dc4b commit 32372b9

File tree

4 files changed

+58
-46
lines changed

4 files changed

+58
-46
lines changed

internal/verifier/check.go

Lines changed: 5 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -211,12 +211,9 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh
211211
}
212212
}
213213

214-
verifier.logger.Info().Msg("Starting change readers.")
215-
216214
// Now that we’ve initialized verifier.generation we can
217215
// start the change readers.
218216
verifier.initializeChangeReaders()
219-
verifier.mux.Unlock()
220217

221218
err = retry.New().WithCallback(
222219
func(ctx context.Context, _ *retry.FuncInfo) error {
@@ -241,28 +238,10 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh
241238

242239
verifier.logger.Debug().Msg("Starting Check")
243240

244-
verifier.phase = Check
245-
defer func() {
246-
verifier.phase = Idle
247-
}()
248-
249241
if err := verifier.startChangeHandling(ctx); err != nil {
250242
return err
251243
}
252244

253-
// Log the verification status when initially booting up so it's easy to see the current state
254-
verificationStatus, err := verifier.GetVerificationStatus(ctx)
255-
if err != nil {
256-
return errors.Wrapf(
257-
err,
258-
"failed to retrieve verification status",
259-
)
260-
} else {
261-
verifier.logger.Debug().
262-
Any("status", verificationStatus).
263-
Msg("Initial verification phase.")
264-
}
265-
266245
err = verifier.CreateInitialTasksIfNeeded(ctx)
267246
if err != nil {
268247
return err
@@ -272,7 +251,6 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh
272251

273252
// Now enter the multi-generational steady check state
274253
for {
275-
verifier.mux.Lock()
276254
err = retry.New().WithCallback(
277255
func(ctx context.Context, _ *retry.FuncInfo) error {
278256
return verifier.persistGenerationWhileLocked(ctx)
@@ -359,7 +337,6 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh
359337
verifier.generationStartTime = time.Now()
360338
verifier.srcChangeReader.getEventRecorder().Reset()
361339
verifier.dstChangeReader.getEventRecorder().Reset()
362-
verifier.phase = Recheck
363340
verifier.mux.Unlock()
364341

365342
// Generation of recheck tasks can partial-fail. The following will
@@ -381,6 +358,8 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh
381358
Err(err).
382359
Msg("Failed to clear out old recheck docs. (This is probably unimportant.)")
383360
}
361+
362+
verifier.mux.Lock()
384363
}
385364
}
386365

@@ -608,6 +587,9 @@ func (verifier *Verifier) work(ctx context.Context, workerNum int) error {
608587
}
609588

610589
func (v *Verifier) initializeChangeReaders() {
590+
591+
v.logger.Info().Msg("Starting change readers.")
592+
611593
v.srcChangeReader = v.newChangeStreamReader(
612594
v.srcNamespaces,
613595
src,

internal/verifier/migration_verifier.go

Lines changed: 10 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,6 @@ type Verifier struct {
9090
lastGeneration bool
9191
running bool
9292
generation int
93-
phase string
9493
port int
9594
metaURI string
9695
metaClient *mongo.Client
@@ -179,7 +178,6 @@ func NewVerifier(settings VerifierSettings, logPath string) *Verifier {
179178
logger: logger,
180179
writer: logWriter,
181180

182-
phase: Idle,
183181
numWorkers: NumWorkers,
184182
readPreference: readpref.Primary(),
185183
partitionSizeInBytes: 400 * 1024 * 1024,
@@ -433,14 +431,7 @@ func (verifier *Verifier) getGeneration() (generation int, lastGeneration bool)
433431
}
434432

435433
func (verifier *Verifier) getGenerationWhileLocked() (int, bool) {
436-
437-
// As long as no other goroutine has locked the mux this will
438-
// usefully panic if the caller neglected the lock.
439-
wasUnlocked := verifier.mux.TryLock()
440-
if wasUnlocked {
441-
verifier.mux.Unlock()
442-
panic("getGenerationWhileLocked() while unlocked")
443-
}
434+
verifier.assertLocked()
444435

445436
return verifier.generation, verifier.lastGeneration
446437
}
@@ -1250,9 +1241,17 @@ func (verifier *Verifier) verifyMetadataAndPartitionCollection(
12501241
}
12511242

12521243
func (verifier *Verifier) GetVerificationStatus(ctx context.Context) (*VerificationStatus, error) {
1253-
taskCollection := verifier.verificationTaskCollection()
12541244
generation, _ := verifier.getGeneration()
12551245

1246+
return verifier.getVerificationStatusForGeneration(ctx, generation)
1247+
}
1248+
1249+
func (verifier *Verifier) getVerificationStatusForGeneration(
1250+
ctx context.Context,
1251+
generation int,
1252+
) (*VerificationStatus, error) {
1253+
taskCollection := verifier.verificationTaskCollection()
1254+
12561255
var results []bson.Raw
12571256

12581257
err := retry.New().WithCallback(
@@ -1391,18 +1390,6 @@ func (verifier *Verifier) StartServer() error {
13911390
return server.Run(context.Background())
13921391
}
13931392

1394-
func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) {
1395-
status, err := verifier.GetVerificationStatus(ctx)
1396-
if err != nil {
1397-
return Progress{Error: err}, err
1398-
}
1399-
return Progress{
1400-
Phase: verifier.phase,
1401-
Generation: verifier.generation,
1402-
Status: status,
1403-
}, nil
1404-
}
1405-
14061393
// Returned boolean indicates that namespaces are cached, and
14071394
// whatever needs them can proceed.
14081395
func (verifier *Verifier) ensureNamespaces(ctx context.Context) bool {

internal/verifier/progress.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package verifier
2+
3+
import "context"
4+
5+
func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) {
6+
verifier.mux.RLock()
7+
defer verifier.mux.RUnlock()
8+
9+
generation := verifier.generation
10+
11+
status, err := verifier.getVerificationStatusForGeneration(ctx, generation)
12+
if err != nil {
13+
return Progress{Error: err}, err
14+
}
15+
return Progress{
16+
Phase: verifier.getPhaseWhileLocked(),
17+
Generation: verifier.generation,
18+
Status: status,
19+
}, nil
20+
}
21+
22+
func (verifier *Verifier) getPhaseWhileLocked() string {
23+
verifier.assertLocked()
24+
25+
if !verifier.running {
26+
return Idle
27+
}
28+
29+
if verifier.generation > 0 {
30+
return Recheck
31+
}
32+
33+
return Check
34+
}

internal/verifier/sync.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package verifier
2+
3+
func (verifier *Verifier) assertLocked() {
4+
if verifier.mux.TryLock() {
5+
verifier.mux.Unlock()
6+
7+
panic("INTERNAL ERROR: verifier should have been locked!")
8+
}
9+
}

0 commit comments

Comments
 (0)