
Batch Shard -> Index insertions #608

Merged
robskillington merged 37 commits into master from prateek/index/rejig-queuing
May 16, 2018

Conversation

@prateek (Collaborator) commented May 9, 2018

  • Batch shard -> index insertions to minimise lock contention

Pending:

  • emit metric for e2e indexing latency
  • emit metric for number of duplicates found during indexing
  • capture all errors from batch insert, filter any duplicate insert warnings
  • fix tests
  • more tests for inc/dec ref guarantees - prop/unit/integration

These can probably be follow ups:

  • piping the partial error from index inserts back to the shard
  • []index.WriteBatchEntry pooling?

misc

  • can we use a single type instead of index.WriteBatchEntry and the other one in storage/?
    Don't think so, because one holds idents and the other holds docs.
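The ident-vs-doc distinction above can be sketched with two illustrative Go types. These are NOT the real m3db definitions — the field sets and the encoding in `toIndexEntry` are made up for this example:

```go
package main

import "fmt"

// storageWriteBatchEntry is the storage-side shape: it still holds the raw
// identifier and tags (ident.ID / ident.Tags in the real code).
type storageWriteBatchEntry struct {
	ID   string
	Tags map[string]string
}

// indexWriteBatchEntry is the index-side shape: by the time the entry reaches
// the index it carries an encoded document rather than idents.
type indexWriteBatchEntry struct {
	Doc []byte
}

// toIndexEntry shows the one-way conversion at the boundary; the encoding here
// is a stand-in for building a doc.Document.
func toIndexEntry(e storageWriteBatchEntry) indexWriteBatchEntry {
	buf := []byte(e.ID)
	for k, v := range e.Tags {
		buf = append(buf, '|')
		buf = append(buf, (k + "=" + v)...)
	}
	return indexWriteBatchEntry{Doc: buf}
}

func main() {
	e := storageWriteBatchEntry{ID: "cpu.user", Tags: map[string]string{"host": "a"}}
	fmt.Println(string(toIndexEntry(e).Doc)) // cpu.user|host=a
}
```

Because the conversion is lossy and one-directional, merging the two types would force the index side to carry ident machinery it never uses.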

@codecov bot commented May 9, 2018

Codecov Report

Merging #608 into master will decrease coverage by 0.16%.
The diff coverage is 86.7%.


@@            Coverage Diff             @@
##           master     #608      +/-   ##
==========================================
- Coverage    81.4%   81.23%   -0.17%     
==========================================
  Files         274      275       +1     
  Lines       24353    24575     +222     
==========================================
+ Hits        19825    19964     +139     
- Misses       3356     3413      +57     
- Partials     1172     1198      +26
Flag Coverage Δ
#coordinator 67.17% <ø> (ø) ⬆️
#db 82.18% <86.7%> (-0.2%) ⬇️

Legend:
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update b03695a...8dcc850. Read the comment docs.

@prateek prateek force-pushed the prateek/index/rejig-queuing branch 2 times, most recently from e76fda3 to 3819deb Compare May 11, 2018 05:22
Comment thread storage/index.go Outdated
Collaborator:

Maybe we can keep this signal now?

Collaborator Author:

yeah makes sense

Collaborator Author:

for sure, we should.

Comment thread storage/shard.go Outdated
Collaborator Author:

I'm really leaning towards using a RW lock here. It'll be 24 more bytes per shardEntry, but it so greatly reduces the complexity of the code that it's a no-brainer. I only finished writing this version to convince myself it's a terrible idea.

Collaborator Author:

@robskillington could you please give this a once-over and see if I'm missing something more obvious?

@prateek prateek force-pushed the prateek/index/rejig-queuing branch from ebab281 to 5aebd48 Compare May 12, 2018 20:20
@prateek prateek force-pushed the prateek/index/rejig-queuing branch from 5aebd48 to 493098b Compare May 12, 2018 20:21
Comment thread storage/index/block.go Outdated
for _, insert := range inserts {
insert.OnIndexSeries.OnIndexFinalize()
}
WriteBatchEntriesFinalizer(inserts).Finalize()
Collaborator Author:

when you revert to the earlier version, would be good to retain this utility type - WriteBatchEntriesFinalizer

Comment thread storage/index/block.go Outdated
for _, insert := range inserts {
insert.OnIndexSeries.OnIndexSuccess(b.endTime)
insert.OnIndexSeries.OnIndexFinalize()
var (
Collaborator Author:

can revert to the earlier batch api usage for this method

Comment thread storage/index/types.go Outdated
}

// WriteBatchEntry captures a document to index, and the lifecycle hooks to call thereafter.
type WriteBatchEntry struct {
Collaborator Author:

should retain the changes to this type

Collaborator Author:

might want to add a ReceivedTime field to this struct too, to capture e2e index latency

Comment thread storage/index/types.go Outdated
// Finalize finalizes all the references in the provided slice.
func (w WriteBatchEntriesFinalizer) Finalize() {
for _, entry := range w {
if entry.OnIndexSeries != nil {
Collaborator Author:

need this check because we set entries to the empty value when they don't need to be indexed (in index.go:InsertBatch)
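The zeroed-entry convention described here can be sketched as follows. This is a minimal stand-in, not the real m3db code — `writeBatchEntry`, `onIndexSeries`, and `finalize` are hypothetical names for this illustration:

```go
package main

import "fmt"

// onIndexSeries stands in for the lifecycle-hook interface on each entry.
type onIndexSeries interface{ OnIndexFinalize() }

// writeBatchEntry stands in for index.WriteBatchEntry; zeroing an entry
// (OnIndexSeries == nil) is how the queue marks "skip this one" without
// reslicing the batch.
type writeBatchEntry struct {
	OnIndexSeries onIndexSeries
}

// finalize walks the batch, skipping zeroed entries, and returns how many
// entries were actually finalized.
func finalize(entries []writeBatchEntry) int {
	n := 0
	for _, e := range entries {
		if e.OnIndexSeries == nil { // zeroed: nothing to finalize
			continue
		}
		e.OnIndexSeries.OnIndexFinalize()
		n++
	}
	return n
}

type noop struct{}

func (noop) OnIndexFinalize() {}

func main() {
	entries := []writeBatchEntry{{OnIndexSeries: noop{}}, {}, {OnIndexSeries: noop{}}}
	fmt.Println(finalize(entries)) // 2
}
```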

Comment thread storage/index/types.go Outdated
// based on the BlockStart field.
type WriteBatchEntryByBlockStart []WriteBatchEntry
// based on the Timestamp and ID fields.
type WriteBatchEntryByBlockStartAndID []WriteBatchEntry
Collaborator Author:

can revert this to only sorting by blockstart

Comment thread storage/index/types.go Outdated

// ForEachBlockStart iterates over the provided WriteBatchEntryByBlockStart, and calls `fn` on each
// ForEachIDFn is lambda to iterate over WriteBatchEntry(s) a single ID at a time.
type ForEachIDFn func(writes WriteBatchEntryByBlockStartAndID)
Collaborator Author:

can delete this

Comment thread storage/index/types.go Outdated

// ForEachID iterates over the provided WriteBatchEntryByBlockStartAndID, and calls `fn` on each
// group of elements with the same ID.
func (w WriteBatchEntryByBlockStartAndID) ForEachID(fn ForEachIDFn) {
Collaborator Author:

can delete this

Comment thread storage/index.go Outdated
if !futureLimit.After(timestamp) {
onIndexFn.OnIndexFinalize(blockStart)
entries[j] = emptyEntry // indicate we don't need to index this.
// TODO(prateek): capture that this needs to return m3dberrors.ErrTooFuture
Collaborator Author:

can leave this around and convert to FOLLOWUP(prateek)

Comment thread storage/index.go Outdated
if !pastLimit.Before(timestamp) {
onIndexFn.OnIndexFinalize(blockStart)
entries[j] = emptyEntry // indicate we don't need to index this.
// TODO(prateek): capture that this needs to return m3dberrors.ErrTooPast
Collaborator Author:

here too

defaultIndexBatchBackoff = time.Second
defaultIndexPerSecondLimit = 10000
// TODO(prateek): undo this stuff
defaultIndexBatchBackoff = time.Millisecond
Collaborator Author:

I say leave this around for now; they're sensible enough defaults. I'll come back and wire this up to runtime/config in #604.

b.wg.Add(1)
for i := range b.inserts {
b.inserts[i] = nsIndexInsertZeroed
// TODO(prateek): if we start pooling `[]index.WriteBatchEntry`, then we could return to the pool here.
Collaborator Author:

can change to a FOLLOWUP

Comment thread storage/shard.go
commitLogSeriesTags = entry.Series.Tags()
commitLogSeriesUniqueIndex = entry.Index
if err == nil && shouldReverseIndex {
if entry.NeedsIndexUpdate(s.reverseIndex.BlockStartForWriteTime(timestamp)) {
Collaborator Author:

maybe add a comment that NeedsIndexUpdate has CAS semantics here, so we don't change it in an incompatible way later.
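A rough sketch of what CAS semantics mean for a method like NeedsIndexUpdate — illustrative only; the real Entry has different fields and lifecycle hooks, and the names below are made up:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// entry mimics the claim-style semantics discussed above: the first caller to
// ask about a given block start "wins" and must enqueue the index write; later
// callers for the same (or an older) block see false.
type entry struct {
	lastIndexedBlockStart int64
}

// needsIndexUpdate returns true at most once per blockStart: it
// compare-and-swaps the recorded block start, so exactly one goroutine takes
// responsibility for the index write.
func (e *entry) needsIndexUpdate(blockStart int64) bool {
	for {
		cur := atomic.LoadInt64(&e.lastIndexedBlockStart)
		if cur >= blockStart {
			return false // already claimed (or indexed) for this block
		}
		if atomic.CompareAndSwapInt64(&e.lastIndexedBlockStart, cur, blockStart) {
			return true // this caller won the claim
		}
		// lost the race; loop and re-check under the new value
	}
}

func main() {
	e := &entry{}
	fmt.Println(e.needsIndexUpdate(100)) // true: first claim wins
	fmt.Println(e.needsIndexUpdate(100)) // false: already claimed
}
```

The "incompatible change" worry is exactly this: callers rely on the true-at-most-once guarantee, so a later refactor that makes the method a pure read would silently allow duplicate index writes.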

Comment thread storage/shard.go
wg.Wait()
if entry.IndexedForBlockStart(indexBlockStart) {
// i.e. indexing failed
return fmt.Errorf("internal error: unable to index series")
Collaborator Author:

move to a const at the top of the file

Comment thread storage/index.go

// i.e. we have the block and the inserts, perform the writes.
result, err := block.WriteBatch(inserts)

Collaborator Author:

would be good to add the numDuplicates to the result type returned from the block.WriteBatch call, and a metric for it.

@robskillington robskillington changed the title [WIP] Batch Shard -> Index insertions Batch Shard -> Index insertions May 15, 2018
Comment thread storage/block/block.go
}

func (b *dbBlock) stream(ctx context.Context) (xio.BlockReader, error) {
b.ctx.DependsOn(ctx)
Contributor:

Did we have a double DependsOn? I see one in Stream() too...that might be my bad

Collaborator:

I saw that, yeah wasn't 100% sure but it may have been a double. Either way we can get rid of that complexity with this change thankfully.

Comment thread storage/block/block.go
b.retrieveID = nil
b.wasRetrievedFromDisk = false

b.ctx.RegisterFinalizer(&seg)
Contributor:

I see that you moved this to a synchronous call in resetRetrievableWithLock, but is that enough? Seems like it would never happen for the ResetRetrievable path

Collaborator:

Moved the RegisterFinalizer? We don't use the context anymore for finalization (block no longer has a context even) on the segment so this didn't get moved anywhere per se. Or do you mean something else?

Comment thread storage/block/block.go
// the block may be closed before the underlying context is closed, which
// causes a deadlock if the block and the underlying context are closed
// from within the same goroutine.
b.opts.CloseContextWorkers().Go(b.ctx.BlockingClose)
Contributor:

Aren't we just leaking the ctx by removing this?

Collaborator Author:

I think they removed the context altogether.

Collaborator:

Yup context is gone, we now just copy the bytes each time rather than depending on the caller's context and taking ref.

Comment thread storage/index.go Outdated
// a lot cheaper than (1).
wg.Wait()

// Resort the batch by initial enqueue order
Contributor:

Re-sort

Collaborator:

Ta, will update.

Comment thread storage/index.go Outdated
}
})

// we sort the inserts by which block they're applicable for, and do the inserts
Contributor:

Comment would read better if it just started with "Sort the inserts..."

Collaborator:

Sure thing.

Comment thread storage/series/lookup/entry.go Outdated
return isIndexed
}

// NeedsIndexUpdate returns a bool to indicate if the Entry requires to be indexed
Contributor:

super nit, but "requires to be" sounds super weird to me. Presumably this comment was already here though

Collaborator:

It was, but I can update.

Comment thread storage/series/lookup/entry.go Outdated
// is going to be sent to the index, and other go routines should not attempt the
// same write. Callers are expected to ensure they follow this guideline.
// Further, every call to NeedsIndexUpdate which returns true needs to have a corresponding
// OnIndexFinalze() call. This is reqiured for correct lifecycle maintenance.
Contributor:

required*

Collaborator:

Good catch, ta.

Comment thread storage/shard.go
// finalized.
// Since series are purged so infrequently the overhead of not releasing
// back an ID to a pool is amortized over a long period of time.
clonedID := ident.BytesID(append([]byte(nil), id.Bytes()...))
Contributor:

any reason to not call NoFinalize() here?

Collaborator Author:

ident.BytesID is a []byte, can't finalize it - https://github.com/m3db/m3x/blob/master/ident/bytes_id.go#L27-L55

Might be worth a comment indicating that here.

Collaborator Author:

Or better yet, just make the NoFinalize call. It'll get compiled away anyway

Collaborator:

Sure thing.
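For reference, the reason NoFinalize is effectively free on a byte-slice ID can be sketched like this — `bytesID` is a stand-in for ident.BytesID, not the real type:

```go
package main

import "fmt"

// bytesID mimics ident.BytesID: a plain []byte whose finalization hooks are
// no-ops, so calling NoFinalize documents intent at zero runtime cost (the
// empty method inlines away).
type bytesID []byte

// NoFinalize is a no-op: there is no pooled resource to mark.
func (b bytesID) NoFinalize() {}

// IsNoFinalize always reports true: a bytesID is always safe to retain.
func (b bytesID) IsNoFinalize() bool { return true }

func main() {
	// Clone the incoming ID's bytes, as the shard does, then mark intent.
	id := bytesID(append([]byte(nil), []byte("series-id")...))
	id.NoFinalize()
	fmt.Println(id.IsNoFinalize()) // true
}
```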

// NewNoOpOptionsManager returns a no-op options manager that cannot
// be updated and does not spawn backround goroutines (useful for globals
// in test files).
func NewNoOpOptionsManager(opts Options) OptionsManager {
Collaborator Author:

lol this came from annoyance eh?

Collaborator (@robskillington, May 16, 2018):

Yeah, leaktest.(...) started going off 100% of test runs due to a lingering goroutine from a func init in the storage package (100% unrelated to the test) that registered a listener that never closes. To avoid that weirdness I put this together.

Collaborator Author:

Much appreciated. I've always chased the individual leaks; this is cleaner.

Comment thread storage/shard.go Outdated
t := tags.Current()
clone.Append(s.identifierPool.CloneTag(t))

// NB(r): Optimization for workloads that embed the tags in the ID is to
Collaborator Author:

I still can't decide if this is brilliant or terrible.

Collaborator:

Heheh, talked with Richie about it and agreed we should flag this with config but default it to on.

Collaborator:

I'll add this.

now := time.Now().Truncate(time.Hour)
tn := func(n int64) xtime.UnixNano {
return xtime.ToUnixNano(now.Add(time.Duration(n) * time.Hour))
func TestWriteBatchForEachUnmarkedBatchByBlockStart(t *testing.T) {
Collaborator Author:

+1 for these tests

Comment thread storage/index/types.go
b.SortByUnmarkedAndIndexBlockStart()

// What we do is a little funky but least alloc intensive, essentially we mutate
// this batch and then restore the pointers to the original docs after.
Collaborator Author:

nit: could you add a comment to ForEachWriteBatchByBlockStartFn indicating that this will break if fn does async operations on the batch

Collaborator:

Good call, will do.

Comment thread storage/index/types.go
lastNanos = elem.BlockStart
// We only want to call the the ForEachBlockStartFn once we have calculated the entire group,
for i := range allEntries {
if allEntries[i].OnIndexSeries == nil {
Collaborator Author:

making sure I understand this - you can early-terminate here because the sort above guarantees that if one entry is done, all subsequent entries are done too, yea?

Collaborator:

Right yeah.

Comment thread storage/index/types.go
// spill over
if startIdx < len(w) {
fn(w[startIdx].BlockStart, w[startIdx:])
if startIdx < len(allEntries) {
Collaborator Author:

hm, can you do this spill-over unconditionally? Can't there be marked-success entries at the back?

Collaborator:

So you only exit the loop if you haven't hit a "done" element yet, which means all the remaining entries haven't been marked for error or success yet (thanks to the sort order). I can add a comment to this effect perhaps?

Collaborator Author:

yep sounds good
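The sort-then-group invariant discussed in this thread can be sketched as follows. `entry` and `forEachUnmarkedByBlockStart` are illustrative stand-ins, not the real WriteBatch code:

```go
package main

import (
	"fmt"
	"sort"
)

// entry is a pared-down stand-in for index.WriteBatchEntry: marked entries
// (done == true) have already succeeded or errored.
type entry struct {
	blockStart int64
	done       bool
}

// forEachUnmarkedByBlockStart sketches the loop under discussion: sort so that
// unmarked entries come first (then by block start), group runs sharing a
// block start, and stop at the first marked entry -- the sort guarantees
// everything after it is marked too. The final "spill over" call flushes the
// last group when the whole batch is unmarked.
func forEachUnmarkedByBlockStart(entries []entry, fn func(blockStart int64, group []entry)) {
	sort.SliceStable(entries, func(i, j int) bool {
		if entries[i].done != entries[j].done {
			return !entries[i].done // unmarked first
		}
		return entries[i].blockStart < entries[j].blockStart
	})
	startIdx := 0
	for i := range entries {
		if entries[i].done {
			// everything from i onward is marked; flush what came before
			if startIdx < i {
				fn(entries[startIdx].blockStart, entries[startIdx:i])
			}
			return
		}
		if i > startIdx && entries[i].blockStart != entries[startIdx].blockStart {
			fn(entries[startIdx].blockStart, entries[startIdx:i])
			startIdx = i
		}
	}
	if startIdx < len(entries) { // spill over: the remaining tail is all unmarked
		fn(entries[startIdx].blockStart, entries[startIdx:])
	}
}

func main() {
	es := []entry{{blockStart: 2}, {blockStart: 1}, {blockStart: 1, done: true}, {blockStart: 2}}
	forEachUnmarkedByBlockStart(es, func(bs int64, g []entry) {
		fmt.Println(bs, len(g)) // groups: (1, 1) then (2, 2)
	})
}
```

This is why the spill-over is safe: the loop only reaches it without returning when no marked entry was seen, so the tail cannot contain marked-success entries.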

Comment thread storage/index/types.go
// by index block start time.
func (b *WriteBatch) SortByUnmarkedAndIndexBlockStart() {
b.sortBy = writeBatchSortByUnmarkedAndBlockStart
sort.Stable(b)
Collaborator Author:

+1

Comment thread storage/index/types.go Outdated
// MarkUnmarkedEntriesError marks all unmarked entries as error.
func (b *WriteBatch) MarkUnmarkedEntriesError(err error) {
for idx := range b.entries {
if b.entries[idx].OnIndexSeries != nil {
Collaborator Author:

hm, can you reuse the method below, i.e.

func (b *WriteBatch) MarkUnmarkedEntriesError(err error) {
  for idx := range b.entries {
    b.MarkUnmarkedEntryError(err, idx)
  }
}

Collaborator:

Sure thing, sounds good.

Comment thread storage/series/buffer.go
if b.ctx != nil {
b.ctx.RegisterCloser(encoder)
}
encoder.Close()
Collaborator Author:

do you need to take ctx as an arg in these methods?

Collaborator:

When we call encoder.Stream() we take a copy of the bytes and pass back a new Segment wrapping them, so we don't need to involve ctx at all. Once we have the write lock on the series (which we have here), it's safe to close.

Comment thread storage/block/block.go
return xio.EmptyBlockReader, errReadFromClosedBlock
}

b.ctx.DependsOn(blocker)
Collaborator Author:

this method too, can you just drop the context arg?

Collaborator:

We need it in case the read goes to the retriever (which takes a ctx to register some finalizations).

Comment thread storage/index.go
}

// NB: this function is called by the namespaceIndexInsertQueue.
// WriteBatches is called by the indexInsertQueue.
Collaborator Author:

could you drop the FOLLOWUP note below this line, considering you just did it :D

Collaborator:

Sure thing.

Comment thread storage/index.go
}
if err != nil {
i.logger.Errorf("unable to write to index, dropping inserts. [%v]", err)
i.logger.Errorf("error writing to index block: %v", err)
Collaborator Author:

super nit: could you filter out any ErrDuplicateID from here?

Collaborator:

I see you are doing this =]

"testing"
"time"

"github.com/fortytw2/leaktest"
Collaborator Author:

nit: import order

Collaborator:

Sure thing.

"time"

xtime "github.com/m3db/m3x/time"
"github.com/stretchr/testify/require"
Collaborator Author:

nit: import order

Collaborator:

Sure thing.

@prateek (Collaborator Author) commented May 16, 2018

LGTM

@robskillington robskillington merged commit c8c2d0d into master May 16, 2018
@robskillington robskillington deleted the prateek/index/rejig-queuing branch May 16, 2018 05:14