Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
2200131
Batch Shard -> Index insertions
prateek May 9, 2018
e06de11
use a batch of batches in the index insert queue
prateek May 10, 2018
2c503fd
delay ident -> doc conversion
prateek May 11, 2018
b8f118a
Track index write attempt state within dbShardEntry
prateek May 12, 2018
493098b
Re-work dbShardEntry
prateek May 12, 2018
1191750
fix some tests and the startTime vs endTime issue
prateek May 12, 2018
dfc4c6a
Pull m3x and use new Tags type
May 13, 2018
555a3fb
Fix more test build errors
May 13, 2018
a3307fb
Revert unnecessary glide.lock changes
May 13, 2018
60b3d01
Fix integration tests build errors
May 13, 2018
99af4ef
Fix further test build errors
May 13, 2018
9465d94
Fix big unit tests
May 13, 2018
7f6be88
Finalize just tags on bootstrap
May 13, 2018
b66f6a4
Fix config test and remove unused consts
May 13, 2018
2ecd592
Merge branch 'master' into r/use-tags-nofinalize
May 13, 2018
68cbc46
Merge branch 'r/use-tags-nofinalize' into prateek/index/rejig-queuing
May 13, 2018
42933bc
Use new WriteBatch type to flow index batch insertion without realloc
May 13, 2018
689eee6
Merge branch 'master' into prateek/index/rejig-queuing
May 13, 2018
a7a3944
Restore the logging of indexing errors
May 13, 2018
716cdd7
Fix detecting if indexing succeeded or failed
May 14, 2018
d7b89dd
Encode commit log metadata before queueing commit log write
May 14, 2018
7c5363b
Finalize the encoded tags buffer after writing commit log entry
May 14, 2018
1352f2d
Revert "Finalize the encoded tags buffer after writing commit log entry"
May 14, 2018
3cbbd1f
Revert "Encode commit log metadata before queueing commit log write"
May 14, 2018
a6998c7
Fix the ForEachUnmarkedBatchByBlockStart logic during block rotation
May 14, 2018
27b9e34
Purge merged buffer encoders and bootstrap blocks
May 14, 2018
b5991f2
Remove depends on pattern from database block
May 14, 2018
526375d
Check buffers are not nil before blindly appending
May 14, 2018
7258d1c
Use slices into cloned ID when creating tags for new series
May 15, 2018
c2d48e5
Update m3x
May 15, 2018
8cf6104
Merge branch 'master' into prateek/index/rejig-queuing
May 15, 2018
1318f4b
Fix a lot of tests, still some not quite there
May 15, 2018
7ab107e
Fix remaining tests
May 15, 2018
e136525
Fix build error
May 15, 2018
43645e8
Fix final test build error and metalint issues
May 15, 2018
bdbbb6c
Remove debugging statements
May 15, 2018
8dcc850
Address feedback
May 16, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions client/host_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func (q *queue) asyncTaggedWrite(
// TODO(r): Use a worker pool to avoid creating new go routines for async writes
go func() {
req := q.writeTaggedBatchRawRequestPool.Get()
req.NameSpace = namespace.Data().Bytes()
req.NameSpace = namespace.Bytes()
req.Elements = elems

// NB(r): Defer is slow in the hot path unfortunately
Expand Down Expand Up @@ -351,7 +351,7 @@ func (q *queue) asyncWrite(
// TODO(r): Use a worker pool to avoid creating new go routines for async writes
go func() {
req := q.writeBatchRawRequestPool.Get()
req.NameSpace = namespace.Data().Bytes()
req.NameSpace = namespace.Bytes()
req.Elements = elems

// NB(r): Defer is slow in the hot path unfortunately
Expand Down
18 changes: 9 additions & 9 deletions client/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -976,7 +976,7 @@ func (s *session) writeAttemptWithRLock(
wop := s.pools.writeOperation.Get()
wop.namespace = nsID
wop.shardID = s.state.topoMap.ShardSet().Lookup(tsID)
wop.request.ID = tsID.Data().Bytes()
wop.request.ID = tsID.Bytes()
wop.request.Datapoint.Value = value
wop.request.Datapoint.Timestamp = timestamp
wop.request.Datapoint.TimestampTimeType = timeType
Expand All @@ -986,7 +986,7 @@ func (s *session) writeAttemptWithRLock(
wop := s.pools.writeTaggedOperation.Get()
wop.namespace = nsID
wop.shardID = s.state.topoMap.ShardSet().Lookup(tsID)
wop.request.ID = tsID.Data().Bytes()
wop.request.ID = tsID.Bytes()
encodedTagBytes, ok := tagEncoder.Data()
if !ok {
return nil, 0, 0, errUnableToEncodeTags
Expand Down Expand Up @@ -1445,7 +1445,7 @@ func (s *session) fetchIDsAttempt(
}

// Append IDWithNamespace to this request
f.append(namespace.Data().Bytes(), tsID.Data().Bytes(), completionFn)
f.append(namespace.Bytes(), tsID.Bytes(), completionFn)
}); err != nil {
routeErr = err
break
Expand Down Expand Up @@ -1576,7 +1576,7 @@ func (s *session) Truncate(namespace ident.ID) (int64, error) {
)

t := &truncateOp{}
t.request.NameSpace = namespace.Data().Bytes()
t.request.NameSpace = namespace.Bytes()
t.completionFn = func(result interface{}, err error) {
if err != nil {
resultErrLock.Lock()
Expand Down Expand Up @@ -2044,7 +2044,7 @@ func (s *session) streamBlocksMetadataFromPeer(
attemptFn := func(client rpc.TChanNode) error {
tctx, _ := thrift.NewContext(s.streamBlocksMetadataBatchTimeout)
req := rpc.NewFetchBlocksMetadataRawRequest()
req.NameSpace = namespace.Data().Bytes()
req.NameSpace = namespace.Bytes()
req.Shard = int32(shard)
req.RangeStart = start.UnixNano()
req.RangeEnd = end.UnixNano()
Expand Down Expand Up @@ -2209,7 +2209,7 @@ func (s *session) streamBlocksMetadataFromPeerV2(
attemptFn := func(client rpc.TChanNode) error {
tctx, _ := thrift.NewContext(s.streamBlocksMetadataBatchTimeout)
req := rpc.NewFetchBlocksMetadataRawV2Request()
req.NameSpace = namespace.Data().Bytes()
req.NameSpace = namespace.Bytes()
req.Shard = int32(shard)
req.RangeStart = start.UnixNano()
req.RangeEnd = end.UnixNano()
Expand Down Expand Up @@ -2808,7 +2808,7 @@ func (s *session) streamBlocksBatchFromPeer(
retention = ropts.RetentionPeriod()
earliestBlockStart = nowFn().Add(-retention).Truncate(ropts.BlockSize())
)
req.NameSpace = namespaceMetadata.ID().Data().Bytes()
req.NameSpace = namespaceMetadata.ID().Bytes()
req.Shard = int32(shard)
req.Elements = make([]*rpc.FetchBlocksRawRequestElement, 0, len(batch))
for i := range batch {
Expand All @@ -2817,7 +2817,7 @@ func (s *session) streamBlocksBatchFromPeer(
continue // Fell out of retention while we were streaming blocks
}
req.Elements = append(req.Elements, &rpc.FetchBlocksRawRequestElement{
ID: batch[i].id.Data().Bytes(),
ID: batch[i].id.Bytes(),
Starts: []int64{blockStart.UnixNano()},
})
reqBlocksLen++
Expand Down Expand Up @@ -2871,7 +2871,7 @@ func (s *session) streamBlocksBatchFromPeer(
}

id := batch[i].id
if !bytes.Equal(id.Data().Bytes(), result.Elements[i].ID) {
if !bytes.Equal(id.Bytes(), result.Elements[i].ID) {
blocksErr := fmt.Errorf(
"stream blocks mismatched ID: expectedID=%s, actualID=%s, indexID=%d, peer=%s",
batch[i].id.String(), id.String(), i, peer.Host().String(),
Expand Down
23 changes: 16 additions & 7 deletions client/session_fetch_bulk_blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"bytes"
"fmt"
"io"
"io/ioutil"
"math"
"sort"
"sync"
Expand Down Expand Up @@ -714,7 +715,7 @@ func assertFetchBlocksFromPeersResult(
seg, err := stream.Segment()
require.NoError(t, err)

actualData := append(seg.Head.Bytes(), seg.Tail.Bytes()...)
actualData := append(bytesFor(seg.Head), bytesFor(seg.Tail)...)

// compare actual v expected data
if len(expectedData) != len(actualData) {
Expand Down Expand Up @@ -1716,8 +1717,9 @@ func TestBlocksResultAddBlockFromPeerReadMerged(t *testing.T) {
require.NoError(t, err)

// Assert block has data
assert.Equal(t, []byte{1, 2}, seg.Head.Bytes())
assert.Equal(t, []byte{3}, seg.Tail.Bytes())
data, err := ioutil.ReadAll(xio.NewSegmentReader(seg))
require.NoError(t, err)
assert.Equal(t, []byte{1, 2, 3}, data)
}

func TestBlocksResultAddBlockFromPeerReadUnmerged(t *testing.T) {
Expand Down Expand Up @@ -2107,7 +2109,7 @@ func expectFetchMetadataAndReturn(
)
for j := beginIdx; j < len(result) && j < beginIdx+batchSize; j++ {
elem := &rpc.BlocksMetadata{}
elem.ID = result[j].id.Data().Bytes()
elem.ID = result[j].id.Bytes()
for k := 0; k < len(result[j].blocks); k++ {
bl := &rpc.BlockMetadata{}
bl.Start = result[j].blocks[k].start.UnixNano()
Expand Down Expand Up @@ -2159,7 +2161,7 @@ func expectFetchMetadataAndReturnV2(
beginIdx = i * batchSize
)
for j := beginIdx; j < len(result) && j < beginIdx+batchSize; j++ {
id := result[j].id.Data().Bytes()
id := result[j].id.Bytes()
for k := 0; k < len(result[j].blocks); k++ {
bl := &rpc.BlockMetadataV2{}
bl.ID = id
Expand Down Expand Up @@ -2311,7 +2313,7 @@ func expectFetchBlocksAndReturn(
ret := &rpc.FetchBlocksRawResult_{}
for _, res := range result[i] {
blocks := &rpc.Blocks{}
blocks.ID = res.id.Data().Bytes()
blocks.ID = res.id.Bytes()
for j := range res.blocks {
bl := &rpc.Block{}
bl.Start = res.blocks[j].start.UnixNano()
Expand Down Expand Up @@ -2468,7 +2470,7 @@ func assertFetchBootstrapBlocksResult(
require.NoError(t, err)
seg, err := stream.Segment()
require.NoError(t, err)
actualData := append(seg.Head.Bytes(), seg.Tail.Bytes()...)
actualData := append(bytesFor(seg.Head), bytesFor(seg.Tail)...)
assert.Equal(t, expectedData, actualData)
} else if block.segments.unmerged != nil {
assert.Fail(t, "unmerged comparison not supported")
Expand All @@ -2477,6 +2479,13 @@ func assertFetchBootstrapBlocksResult(
}
}

// bytesFor safely unwraps the raw byte slice held by a checked.Bytes,
// treating a nil reference as "no data" and returning nil for it.
func bytesFor(data checked.Bytes) []byte {
	if data != nil {
		return data.Bytes()
	}
	return nil
}

func assertEnqueueChannel(
t *testing.T,
expected []receivedBlockMetadata,
Expand Down
9 changes: 5 additions & 4 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions glide.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package: github.com/m3db/m3db
import:
- package: github.com/m3db/m3x
version: df82cd7aacfd8439586821ecd6e073da35bfbb02
version: 80455550b18244f71637a568f097ae929748e5be
vcs: git
subpackages:
- checked
Expand All @@ -26,7 +26,7 @@ import:
version: ed532baee45a440f0b08b6893c816634c6978d4d

- package: github.com/m3db/m3ninx
version: 12b8ac4f173f9d539b0e94f3bca475318ab1a8db
version: 2a492ea6d91d3e2b5b9deb1b5cab343c80de373b

- package: github.com/m3db/bitset
version: 07973db6b78acb62ac207d0538055e874b49d90d
Expand Down
4 changes: 2 additions & 2 deletions integration/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func tchannelClientWriteBatch(client rpc.TChanNode, timeout time.Duration, names
for _, series := range seriesList {
for _, dp := range series.Data {
elem := &rpc.WriteBatchRawRequestElement{
ID: series.ID.Data().Bytes(),
ID: series.ID.Bytes(),
Datapoint: &rpc.Datapoint{
Timestamp: xtime.ToNormalizedTime(dp.Timestamp, time.Second),
Value: dp.Value,
Expand All @@ -72,7 +72,7 @@ func tchannelClientWriteBatch(client rpc.TChanNode, timeout time.Duration, names

ctx, _ := thrift.NewContext(timeout)
batchReq := &rpc.WriteBatchRawRequest{
NameSpace: namespace.Data().Bytes(),
NameSpace: namespace.Bytes(),
Elements: elems,
}
return client.WriteBatchRaw(ctx, batchReq)
Expand Down
4 changes: 2 additions & 2 deletions integration/generate/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
// Len reports the number of series in the block (part of sort.Interface).
func (l SeriesBlock) Len() int { return len(l) }
// Swap exchanges the series at indices i and j in place (part of sort.Interface).
func (l SeriesBlock) Swap(i, j int) { l[i], l[j] = l[j], l[i] }
func (l SeriesBlock) Less(i, j int) bool {
return bytes.Compare(l[i].ID.Data().Bytes(), l[j].ID.Data().Bytes()) < 0
return bytes.Compare(l[i].ID.Bytes(), l[j].ID.Bytes()) < 0
}

// Block generates a SeriesBlock based on provided config
Expand Down Expand Up @@ -120,5 +120,5 @@ func (l SeriesDataPointsByTime) Less(i, j int) bool {
if !l[i].Timestamp.Equal(l[j].Timestamp) {
return l[i].Timestamp.Before(l[j].Timestamp)
}
return bytes.Compare(l[i].ID.Data().Bytes(), l[j].ID.Data().Bytes()) < 0
return bytes.Compare(l[i].ID.Bytes(), l[j].ID.Bytes()) < 0
}
2 changes: 1 addition & 1 deletion integration/integration_data_verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func compareSeriesList(
require.Equal(t, len(expected), len(actual))

for i := range expected {
require.Equal(t, expected[i].ID.Data().Bytes(), actual[i].ID.Data().Bytes())
require.Equal(t, expected[i].ID.Bytes(), actual[i].ID.Bytes())
require.Equal(t, expected[i].Data, expected[i].Data)
}
}
2 changes: 1 addition & 1 deletion integration/truncate_namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestTruncateNamespace(t *testing.T) {

log.Debugf("truncate namespace %s", testNamespaces[0])
truncateReq := rpc.NewTruncateRequest()
truncateReq.NameSpace = testNamespaces[0].Data().Bytes()
truncateReq.NameSpace = testNamespaces[0].Bytes()
truncated, err := testSetup.truncate(truncateReq)
require.NoError(t, err)
require.Equal(t, int64(1), truncated)
Expand Down
4 changes: 2 additions & 2 deletions network/server/tchannelthrift/node/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func NewService(db storage.Database, opts tchannelthrift.Options) rpc.TChanNode
scope := iopts.
MetricsScope().
SubScope("service").
Tagged(map[string]string{"serviceName": "node"})
Tagged(map[string]string{"service-name": "node"})

wrapperPoolOpts := pool.NewObjectPoolOptions().
SetSize(checkedBytesPoolSize).
Expand Down Expand Up @@ -527,7 +527,7 @@ func (s *service) FetchBlocksMetadataRaw(tctx thrift.Context, req *rpc.FetchBloc

for _, fetchedMetadata := range fetchedResults {
blocksMetadata := s.pools.blocksMetadata.Get()
blocksMetadata.ID = fetchedMetadata.ID.Data().Bytes()
blocksMetadata.ID = fetchedMetadata.ID.Bytes()
fetchedMetadataBlocks := fetchedMetadata.Blocks.Results()
blocksMetadata.Blocks = s.pools.blockMetadataSlice.Get()

Expand Down
4 changes: 2 additions & 2 deletions persist/fs/commitlog/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,8 @@ func (w *writer) Write(
// If "idx" likely hasn't been written to commit log
// yet we need to include series metadata
var metadata schema.LogMetadata
metadata.ID = series.ID.Data().Bytes()
metadata.Namespace = series.Namespace.Data().Bytes()
metadata.ID = series.ID.Bytes()
metadata.Namespace = series.Namespace.Bytes()
metadata.Shard = series.Shard
metadata.EncodedTags = encodedTags
w.metadataEncoder.Reset()
Expand Down
2 changes: 1 addition & 1 deletion persist/fs/index_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (il *nearestIndexOffsetLookup) concurrentClone() (*nearestIndexOffsetLookup
// In other words, the returned offset can always be used as a starting point to
// begin scanning the index file for the desired series.
func (il *nearestIndexOffsetLookup) getNearestIndexFileOffset(id ident.ID) (int64, error) {
idBytes := id.Data().Bytes()
idBytes := id.Bytes()

min := 0
max := len(il.summaryIDsOffsets) - 1
Expand Down
2 changes: 1 addition & 1 deletion persist/fs/index_lookup_prop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestIndexLookupWriteRead(t *testing.T) {
writes := []generatedWrite{}
unique := map[string]struct{}{}
for _, write := range input.realWrites {
s := string(write.id.Data().Bytes())
s := string(write.id.Bytes())
if _, ok := unique[s]; ok {
continue
}
Expand Down
4 changes: 2 additions & 2 deletions persist/fs/read_write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func readTestData(t *testing.T, r DataFileSetReader, shard uint32, timestamp tim

// Verify that the bloomFilter was bootstrapped properly by making sure it
// at least contains every ID
assert.True(t, bloomFilter.Test(id.Data().Bytes()))
assert.True(t, bloomFilter.Test(id.Bytes()))

id.Finalize()
tags.Close()
Expand All @@ -191,7 +191,7 @@ func readTestData(t *testing.T, r DataFileSetReader, shard uint32, timestamp tim

// Verify that the bloomFilter was bootstrapped properly by making sure it
// at least contains every ID
assert.True(t, bloomFilter.Test(id.Data().Bytes()))
assert.True(t, bloomFilter.Test(id.Bytes()))

id.Finalize()
tags.Close()
Expand Down
2 changes: 1 addition & 1 deletion persist/fs/retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func (r *blockRetriever) Stream(

// If the ID is not in the seeker's bloom filter, then it's definitely not on
// disk and we can return immediately
if !bloomFilter.Test(id.Data().Bytes()) {
if !bloomFilter.Test(id.Bytes()) {
// No need to call req.onRetrieve.OnRetrieveBlock if there is no data
req.onRetrieved(ts.Segment{})
return req.toBlock(), nil
Expand Down
2 changes: 1 addition & 1 deletion persist/fs/seek.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func (s *seeker) SeekIndexEntry(id ident.ID) (IndexEntry, error) {
stream := msgpack.NewDecoderStream(s.indexMmap[offset:])
s.decoder.Reset(stream)

idBytes := id.Data().Bytes()
idBytes := id.Bytes()
// Prevent panic's when we're scanning to the end of the buffer
for stream.Remaining() != 0 {
entry, err := s.decoder.DecodeIndexEntry()
Expand Down
6 changes: 3 additions & 3 deletions persist/fs/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (e indexEntries) Len() int {
}

func (e indexEntries) Less(i, j int) bool {
return bytes.Compare(e[i].id.Data().Bytes(), e[j].id.Data().Bytes()) < 0
return bytes.Compare(e[i].id.Bytes(), e[j].id.Bytes()) < 0
}

func (e indexEntries) Swap(i, j int) {
Expand Down Expand Up @@ -421,7 +421,7 @@ func (w *writer) writeIndexFileContents(
)
defer tagsEncoder.Finalize()
for i := range w.indexEntries {
id := w.indexEntries[i].id.Data().Bytes()
id := w.indexEntries[i].id.Bytes()
// Need to check if i > 0 or we can never write an empty string ID
if i > 0 && bytes.Equal(id, prevID) {
// Should never happen, Write() should only be called once per ID
Expand Down Expand Up @@ -491,7 +491,7 @@ func (w *writer) writeSummariesFileContents(

summary := schema.IndexSummary{
Index: w.indexEntries[i].index,
ID: w.indexEntries[i].id.Data().Bytes(),
ID: w.indexEntries[i].id.Bytes(),
IndexEntryOffset: w.indexEntries[i].indexFileOffset,
}

Expand Down
Loading