Skip to content

Commit 00ea91e

Browse files
authored
REP-6875 Make ChangeReader distinguish last-handled & last operation times. (#194)
Previously the ChangeReader exposed only the reader lag, but external tooling needs to track not merely the lag but also the reader’s exact last-handled timestamp. This changeset reworks the ChangeReader accordingly so that we can add that information to the /progress endpoint in a later PR. (This is an internal-only change that will facilitate further PRs for this JIRA ticket.
1 parent 90ea8b2 commit 00ea91e

File tree

4 files changed

+35
-17
lines changed

4 files changed

+35
-17
lines changed

internal/verifier/change_reader.go

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,25 @@ const (
3131
changeReaderCollectionName = "changeReader"
3232
)
3333

34+
type readerCurrentTimes struct {
35+
LastHandledTime bson.Timestamp `json:"lastHandledTime"`
36+
LastOperationTime bson.Timestamp `json:"lastOperationTime"`
37+
}
38+
39+
func (rp readerCurrentTimes) Lag() time.Duration {
40+
return time.Second * time.Duration(
41+
int(rp.LastOperationTime.T)-int(rp.LastHandledTime.T),
42+
)
43+
}
44+
3445
type changeReader interface {
3546
getWhichCluster() whichCluster
3647
getReadChannel() <-chan changeEventBatch
3748
getEventRecorder() *EventRecorder
3849
getStartTimestamp() bson.Timestamp
3950
getLastSeenClusterTime() option.Option[bson.Timestamp]
4051
getEventsPerSecond() option.Option[float64]
41-
getLag() option.Option[time.Duration]
52+
getCurrentTimes() option.Option[readerCurrentTimes]
4253
getBufferSaturation() float64
4354
setWritesOff(bson.Timestamp)
4455
start(context.Context, *errgroup.Group) error
@@ -67,9 +78,10 @@ type ChangeReaderCommon struct {
6778

6879
lastChangeEventTime *msync.TypedAtomic[option.Option[bson.Timestamp]]
6980

81+
currentTimes *msync.TypedAtomic[option.Option[readerCurrentTimes]]
82+
7083
startAtTs *bson.Timestamp
7184

72-
lag *msync.TypedAtomic[option.Option[time.Duration]]
7385
batchSizeHistory *history.History[int]
7486

7587
onDDLEvent ddlEventHandling
@@ -81,7 +93,7 @@ func newChangeReaderCommon(clusterName whichCluster) ChangeReaderCommon {
8193
changeEventBatchChan: make(chan changeEventBatch, batchChanBufferSize),
8294
eventRecorder: NewEventRecorder(),
8395
writesOffTs: util.NewEventual[bson.Timestamp](),
84-
lag: msync.NewTypedAtomic(option.None[time.Duration]()),
96+
currentTimes: msync.NewTypedAtomic(option.None[readerCurrentTimes]()),
8597
lastChangeEventTime: msync.NewTypedAtomic(option.None[bson.Timestamp]()),
8698
batchSizeHistory: history.New[int](time.Minute),
8799
onDDLEvent: lo.Ternary(
@@ -131,10 +143,8 @@ func (rc *ChangeReaderCommon) getBufferSaturation() float64 {
131143
return util.DivideToF64(len(rc.changeEventBatchChan), cap(rc.changeEventBatchChan))
132144
}
133145

134-
// getLag returns the observed change stream lag (i.e., the delta between
135-
// cluster time and the most-recently-seen change event).
136-
func (rc *ChangeReaderCommon) getLag() option.Option[time.Duration] {
137-
return rc.lag.Load()
146+
func (rc *ChangeReaderCommon) getCurrentTimes() option.Option[readerCurrentTimes] {
147+
return rc.currentTimes.Load()
138148
}
139149

140150
// getEventsPerSecond returns the number of change events per second we’ve been
@@ -230,11 +240,19 @@ func (rc *ChangeReaderCommon) loadResumeToken(ctx context.Context) (option.Optio
230240
return option.Some(token), nil
231241
}
232242

233-
func (rc *ChangeReaderCommon) updateLag(sess *mongo.Session, token bson.Raw) {
243+
func (rc *ChangeReaderCommon) updateTimes(sess *mongo.Session, token bson.Raw) {
234244
tokenTs, err := rc.resumeTokenTSExtractor(token)
235245
if err == nil {
236-
lagSecs := int64(sess.OperationTime().T) - int64(tokenTs.T)
237-
rc.lag.Store(option.Some(time.Second * time.Duration(lagSecs)))
246+
opTime := sess.OperationTime()
247+
248+
if opTime == nil {
249+
panic("session operationTime is nil … did this get called prematurely?")
250+
}
251+
252+
rc.currentTimes.Store(option.Some(readerCurrentTimes{
253+
LastHandledTime: tokenTs,
254+
LastOperationTime: *opTime,
255+
}))
238256
} else {
239257
rc.logger.Warn().
240258
Err(err).

internal/verifier/change_stream.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
225225
eventsRead++
226226
}
227227

228-
csr.updateLag(sess, cs.ResumeToken())
228+
csr.updateTimes(sess, cs.ResumeToken())
229229

230230
if eventsRead == 0 {
231231
ri.NoteSuccess("received an empty change stream response")

internal/verifier/change_stream_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -636,7 +636,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamLag() {
636636
verifierRunner.AwaitGenerationEnd(),
637637
)
638638

639-
return verifier.srcChangeReader.getLag().IsSome()
639+
return verifier.srcChangeReader.getCurrentTimes().IsSome()
640640
},
641641
time.Minute,
642642
100*time.Millisecond,
@@ -645,7 +645,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamLag() {
645645
// NB: The lag will include whatever time elapsed above before
646646
// verifier read the event, so it can be several seconds.
647647
suite.Assert().Less(
648-
verifier.srcChangeReader.getLag().MustGet(),
648+
verifier.srcChangeReader.getCurrentTimes().MustGet().Lag(),
649649
10*time.Minute,
650650
"verifier lag is as expected",
651651
)

internal/verifier/summary.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -586,10 +586,10 @@ func (verifier *Verifier) printChangeEventStatistics(builder io.Writer) {
586586
if eventsPerSec, has := cluster.csReader.getEventsPerSecond().Get(); has {
587587
var lagNote string
588588

589-
lag, hasLag := cluster.csReader.getLag().Get()
589+
prog, hasProg := cluster.csReader.getCurrentTimes().Get()
590590

591-
if hasLag {
592-
lagNote = fmt.Sprintf("lag: %s; ", reportutils.DurationToHMS(lag))
591+
if hasProg {
592+
lagNote = fmt.Sprintf("lag: %s; ", reportutils.DurationToHMS(prog.Lag()))
593593
}
594594

595595
saturation := cluster.csReader.getBufferSaturation()
@@ -603,7 +603,7 @@ func (verifier *Verifier) printChangeEventStatistics(builder io.Writer) {
603603
reportutils.FmtReal(100*saturation),
604604
)
605605

606-
if hasLag && lag > lagWarnThreshold {
606+
if hasProg && prog.Lag() > lagWarnThreshold {
607607
fmt.Fprint(
608608
builder,
609609
"⚠️ Lag is excessive. Verification may fail. See documentation.\n",

0 commit comments

Comments
 (0)