Batch Shard -> Index insertions #608
Conversation
Codecov Report
@@            Coverage Diff             @@
##           master     #608      +/-   ##
==========================================
- Coverage    81.4%   81.23%   -0.17%
==========================================
  Files         274      275       +1
  Lines       24353    24575     +222
==========================================
+ Hits        19825    19964     +139
- Misses       3356     3413      +57
- Partials     1172     1198      +26
Continue to review full report at Codecov.
Maybe we can keep this signal now?
for sure, we should.
I'm really leaning towards using a RW lock here. It'll be 24 more bytes per shardEntry, but it will so greatly reduce the complexity of the code that it's a no-brainer. I only finished writing this version to convince myself it's a terrible idea.
@robskillington could you please give this a once-over and see if I'm missing something more obvious?
for _, insert := range inserts {
    insert.OnIndexSeries.OnIndexFinalize()
}
WriteBatchEntriesFinalizer(inserts).Finalize()
when you revert to the earlier version, it would be good to retain this utility type - WriteBatchEntriesFinalizer
for _, insert := range inserts {
    insert.OnIndexSeries.OnIndexSuccess(b.endTime)
    insert.OnIndexSeries.OnIndexFinalize()
var (
can revert to the earlier batch api usage for this method
}

// WriteBatchEntry captures a document to index, and the lifecycle hooks to call thereafter.
type WriteBatchEntry struct {
should retain the changes to this type
might want to add a ReceivedTime field to this struct too, to capture e2e index latency
// Finalize finalizes all the references in the provided slice.
func (w WriteBatchEntriesFinalizer) Finalize() {
    for _, entry := range w {
        if entry.OnIndexSeries != nil {
need this check because we set entries to the empty value if they don't need to be indexed (in index.go:InsertBatch)
// based on the BlockStart field.
type WriteBatchEntryByBlockStart []WriteBatchEntry
// based on the Timestamp and ID fields.
type WriteBatchEntryByBlockStartAndID []WriteBatchEntry
can revert this to only sorting by blockstart
// ForEachBlockStart iterates over the provided WriteBatchEntryByBlockStart, and calls `fn` on each
// ForEachIDFn is a lambda to iterate over WriteBatchEntry(s) a single ID at a time.
type ForEachIDFn func(writes WriteBatchEntryByBlockStartAndID)

// ForEachID iterates over the provided WriteBatchEntryByBlockStartAndID, and calls `fn` on each
// group of elements with the same ID.
func (w WriteBatchEntryByBlockStartAndID) ForEachID(fn ForEachIDFn) {
if !futureLimit.After(timestamp) {
    onIndexFn.OnIndexFinalize(blockStart)
    entries[j] = emptyEntry // indicate we don't need to index this.
    // TODO(prateek): capture that this needs to return m3dberrors.ErrTooFuture
can leave this around and convert to FOLLOWUP(prateek)
if !pastLimit.Before(timestamp) {
    onIndexFn.OnIndexFinalize(blockStart)
    entries[j] = emptyEntry // indicate we don't need to index this.
    // TODO(prateek): capture that this needs to return m3dberrors.ErrTooPast
defaultIndexBatchBackoff = time.Second
defaultIndexPerSecondLimit = 10000
// TODO(prateek): undo this stuff
defaultIndexBatchBackoff = time.Millisecond
i say leave this around for now. they're sensible enough defaults. I'll come back and wire up to runtime/config in #604
b.wg.Add(1)
for i := range b.inserts {
    b.inserts[i] = nsIndexInsertZeroed
// TODO(prateek): if we start pooling `[]index.WriteBatchEntry`, then we could return to the pool here.
can change to a FOLLOWUP
commitLogSeriesTags = entry.Series.Tags()
commitLogSeriesUniqueIndex = entry.Index
if err == nil && shouldReverseIndex {
    if entry.NeedsIndexUpdate(s.reverseIndex.BlockStartForWriteTime(timestamp)) {
maybe add a comment that NeedsIndexUpdate has CAS semantics here so we don't change in an incompatible way later.
wg.Wait()
if entry.IndexedForBlockStart(indexBlockStart) {
// i.e. indexing failed
return fmt.Errorf("internal error: unable to index series")
move to a const at the top of the file
// i.e. we have the block and the inserts, perform the writes.
result, err := block.WriteBatch(inserts)
would be good to add the numDuplicates to the result type returned from the blockWriteBatch call, and a metric for it.
}

func (b *dbBlock) stream(ctx context.Context) (xio.BlockReader, error) {
    b.ctx.DependsOn(ctx)
Did we have a double DependsOn? I see one in Stream() too...that might be my bad
I saw that; yeah, I wasn't 100% sure, but it may have been a double. Either way, we can get rid of that complexity with this change, thankfully.
b.retrieveID = nil
b.wasRetrievedFromDisk = false

b.ctx.RegisterFinalizer(&seg)
I see that you moved this to a synchronous call in resetRetrievableWithLock, but is that enough? Seems like it would never happen for the ResetRetrievable path
Moved the RegisterFinalizer? We don't use the context anymore for finalization (block no longer has a context even) on the segment so this didn't get moved anywhere per se. Or do you mean something else?
// the block may be closed before the underlying context is closed, which
// causes a deadlock if the block and the underlying context are closed
// from within the same goroutine.
b.opts.CloseContextWorkers().Go(b.ctx.BlockingClose)
Aren't we just leaking the ctx by removing this?
I think they removed the context altogether.
Yup, the context is gone; we now just copy the bytes each time rather than depending on the caller's context and taking a ref.
// a lot cheaper than (1).
wg.Wait()

// Resort the batch by initial enqueue order
}
})

// we sort the inserts by which block they're applicable for, and do the inserts
Comment would read better if it just started with "Sort the inserts..."
return isIndexed
}

// NeedsIndexUpdate returns a bool to indicate if the Entry requires to be indexed
super nit, but "requires to be" sounds super weird to me. Presumably this comment was already here though
It was, but I can update.
// is going to be sent to the index, and other go routines should not attempt the
// same write. Callers are expected to ensure they follow this guideline.
// Further, every call to NeedsIndexUpdate which returns true needs to have a corresponding
// OnIndexFinalize() call. This is required for correct lifecycle maintenance.
// finalized.
// Since series are purged so infrequently the overhead of not releasing
// back an ID to a pool is amortized over a long period of time.
clonedID := ident.BytesID(append([]byte(nil), id.Bytes()...))
any reason to not call NoFinalize() here?
ident.BytesID is a []byte, can't finalize it - https://github.com/m3db/m3x/blob/master/ident/bytes_id.go#L27-L55
Might be worth a comment indicating that here.
Or better yet, just make the NoFinalize call. It'll get compiled away anyway
// NewNoOpOptionsManager returns a no-op options manager that cannot
// be updated and does not spawn background goroutines (useful for globals
// in test files).
func NewNoOpOptionsManager(opts Options) OptionsManager {
lol this came from annoyance eh?
Yeah, leaktest.(...) started going off on 100% of test runs due to a lingering goroutine from func init in the storage package (100% unrelated to the test) that registered a listener that never closes. To avoid that weirdness I put this together.
much appreciated. i've always chased the different leaks, this is cleaner
t := tags.Current()
clone.Append(s.identifierPool.CloneTag(t))

// NB(r): Optimization for workloads that embed the tags in the ID is to
I still can't decide if this is brilliant or terrible.
Heheh, talked with Richie about it and agreed we should flag this with config but default it to on.
now := time.Now().Truncate(time.Hour)
tn := func(n int64) xtime.UnixNano {
    return xtime.ToUnixNano(now.Add(time.Duration(n) * time.Hour))
func TestWriteBatchForEachUnmarkedBatchByBlockStart(t *testing.T) {
+1 for these tests
b.SortByUnmarkedAndIndexBlockStart()

// What we do is a little funky but least alloc intensive, essentially we mutate
// this batch and then restore the pointers to the original docs after.
nit: could you add a comment to ForEachWriteBatchByBlockStartFn indicating that this will break if fn does async operations on the batch
Good call, will do.
lastNanos = elem.BlockStart
// We only want to call the ForEachBlockStartFn once we have calculated the entire group,
for i := range allEntries {
    if allEntries[i].OnIndexSeries == nil {
making sure i understand this - you can early terminate here because the sort above guarantees if the first entry is done, then all are done, yea?
// spill over
if startIdx < len(w) {
    fn(w[startIdx].BlockStart, w[startIdx:])
if startIdx < len(allEntries) {
hm, can you do this spill over unconditionally? Can't there be marked-success entries in the back?
So you only exit the loop if you haven't hit a "done" element yet, which means all the remaining entries haven't been marked for error or success yet (thanks to the sort order). I can add a comment to this effect perhaps?
// by index block start time.
func (b *WriteBatch) SortByUnmarkedAndIndexBlockStart() {
    b.sortBy = writeBatchSortByUnmarkedAndBlockStart
    sort.Stable(b)
// MarkUnmarkedEntriesError marks all unmarked entries as error.
func (b *WriteBatch) MarkUnmarkedEntriesError(err error) {
    for idx := range b.entries {
        if b.entries[idx].OnIndexSeries != nil {
hm, can you reuse the method below, i.e.
func (b *WriteBatch) MarkUnmarkedEntriesError(err error) {
    for idx := range b.entries {
        b.MarkUnmarkedEntryError(err, idx)
    }
}
Sure thing, sounds good.
if b.ctx != nil {
    b.ctx.RegisterCloser(encoder)
}
encoder.Close()
do you need to take ctx as an arg in these methods?
When we call encoder.Stream() we take a copy of the bytes and pass back a new Segment wrapping them, so we don't need to involve the ctx at all; once we have the write lock on the series (which we have here) it's safe to close.
return xio.EmptyBlockReader, errReadFromClosedBlock
}

b.ctx.DependsOn(blocker)
this method too, can you just drop the context arg?
We need it in case it goes to the retriever (which takes a ctx to register some finalizations).
}

// NB: this function is called by the namespaceIndexInsertQueue.
// WriteBatches is called by the indexInsertQueue.
could you drop the FOLLOWUP note below this line, considering you just did it :D
}
if err != nil {
    i.logger.Errorf("unable to write to index, dropping inserts. [%v]", err)
    i.logger.Errorf("error writing to index block: %v", err)
super nit: could you filter out any ErrDuplicateID from here?
I see you are doing this =]
"testing"
"time"

"github.com/fortytw2/leaktest"
"time"

xtime "github.com/m3db/m3x/time"
"github.com/stretchr/testify/require"
LGTM
Pending:
These can probably be follow-ups:
misc
storage/? Don't think so, because one has ident's and the other docs.