From f23cbc1227ab5c6f1b79899dc3c70403446052a9 Mon Sep 17 00:00:00 2001 From: Evgenii Guguchkin Date: Mon, 15 Jun 2026 19:45:48 +0300 Subject: [PATCH 1/7] feat: add random search downsampling --- api/seqproxyapi/v1/seq_proxy_api.proto | 19 +- api/storeapi/store_api.proto | 1 + frac/fraction_test.go | 216 ++++++++++++++++++ frac/processor/search.go | 103 ++++++--- frac/processor/search_params.go | 2 + pkg/seqproxyapi/v1/seq_proxy_api.pb.go | 28 ++- .../v1/seq_proxy_api_vtproto.pb.go | 110 +++++++++ pkg/storeapi/store_api.pb.go | 16 +- pkg/storeapi/store_api_vtproto.pb.go | 55 +++++ proxy/search/search_request.go | 24 +- proxyapi/grpc_search.go | 13 +- proxyapi/grpc_v1.go | 1 + storeapi/grpc_search.go | 1 + tests/integration_tests/integration_test.go | 79 +++++++ tests/setup/env.go | 6 + 15 files changed, 610 insertions(+), 64 deletions(-) diff --git a/api/seqproxyapi/v1/seq_proxy_api.proto b/api/seqproxyapi/v1/seq_proxy_api.proto index 22ca2f1da..6da046859 100644 --- a/api/seqproxyapi/v1/seq_proxy_api.proto +++ b/api/seqproxyapi/v1/seq_proxy_api.proto @@ -6,6 +6,7 @@ package seqproxyapi.v1; import "google/protobuf/duration.proto"; import "google/protobuf/timestamp.proto"; + import "google/api/annotations.proto"; // seq-db public api. Exposes APIs related to document querying. @@ -216,17 +217,19 @@ message SearchRequest { bool with_total = 4; // Should total number of documents be returned in response. Order order = 5; // Document order ORDER_DESC/ORDER_ASC. string offset_id = 6; // ID offset for pagination. + uint32 downsample = 7; // If set, returns roughly 1 in N documents on a probabilistic basis } message ComplexSearchRequest { - SearchQuery query = 1; // Search query. - repeated AggQuery aggs = 2; // List of aggregation queries. - optional HistQuery hist = 3; // Histogram query. - int64 size = 4; // Maximum number of documents to return. - int64 offset = 5; // Search offset. - bool with_total = 6; // Should total number of documents be returned in response. 
- Order order = 7; // Document order ORDER_DESC/ORDER_ASC. - string offset_id = 8; // ID offset for pagination. + SearchQuery query = 1; // Search query. + repeated AggQuery aggs = 2; // List of aggregation queries. + optional HistQuery hist = 3; // Histogram query. + int64 size = 4; // Maximum number of documents to return. + int64 offset = 5; // Search offset. + bool with_total = 6; // Should total number of documents be returned in response. + Order order = 7; // Document order ORDER_DESC/ORDER_ASC. + string offset_id = 8; // ID offset for pagination. + uint32 downsample = 9; // If set, returns roughly 1 in N documents on a probabilistic basis } message SearchResponse { diff --git a/api/storeapi/store_api.proto b/api/storeapi/store_api.proto index 43a343b81..d3b2e2b42 100644 --- a/api/storeapi/store_api.proto +++ b/api/storeapi/store_api.proto @@ -77,6 +77,7 @@ message SearchRequest { repeated AggQuery aggs = 12; Order order = 13; string offset_id = 14; + uint32 downsample = 15; // If set, returns roughly 1 in N documents on a probabilistic basis } message SearchResponse { diff --git a/frac/fraction_test.go b/frac/fraction_test.go index 244aeb99f..35e02713d 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -1912,6 +1912,215 @@ func (s *FractionTestSuite) TestFractionInfo() { } } +func (s *FractionTestSuite) TestSearchDownsample() { + const ( + totalDocs = 5000 + bulkSize = 200 + queryAll = "message:*" + queryFiltered = "message:started" + tolerancePct = 3 // ±3% tolerance due to probabilistic sampling + ) + + _, bulks, fromTime, toTime := generatesMessages(totalDocs, bulkSize) + s.insertDocuments(bulks...) 
+ + baseOpts := []searchOption{ + withFrom(fromTime.Format(time.RFC3339Nano)), + withTo(toTime.Format(time.RFC3339Nano)), + } + + // Step 1: verify that all documents are indexed and searchable + allResult, err := s.fraction.Search(context.Background(), *s.query(queryAll, baseOpts...)) + s.Require().NoError(err, "search for all documents should succeed") + s.Require().Equal(totalDocs, allResult.IDs.Len(), + "all %d documents should be found without downsample", totalDocs) + + // Step 2: find how many documents match the filtered query (message:started) + // This count serves as the baseline for downsample expectations. + filteredResult, err := s.fraction.Search(context.Background(), *s.query(queryFiltered, baseOpts...)) + s.Require().NoError(err, "search for filtered documents should succeed") + filteredDocCount := filteredResult.IDs.Len() + s.Require().Greater(filteredDocCount, 0, "at least one document should match %q", queryFiltered) + + // Step 3: verify downsample produces approximately expected document counts + // With downsample=k, each document has a 1/k probability of being included, + // so we expect approximately filteredDocCount/k documents, within ±tolerancePct% of filteredDocCount. + tolerance := filteredDocCount * tolerancePct / 100 + downsampleValues := []int{10, 20, 50, 100} + for _, ds := range downsampleValues { + s.T().Run(fmt.Sprintf("downsample=%d", ds), func(t *testing.T) { + query := s.query(queryFiltered, append(baseOpts, withDownsample(uint32(ds)))...) 
+ result, err := s.fraction.Search(context.Background(), *query) + s.Require().NoError(err, "search with downsample=%d should succeed", ds) + + sampledLen := result.IDs.Len() + expectedCount := filteredDocCount / ds + lowerBound := expectedCount - tolerance + upperBound := expectedCount + tolerance + + s.Require().GreaterOrEqual(sampledLen, lowerBound, + "downsample=%d: sampled count %d should be >= %d (expected %d - tolerance %d)", + ds, sampledLen, lowerBound, expectedCount, tolerance) + s.Require().LessOrEqual(sampledLen, upperBound, + "downsample=%d: sampled count %d should be <= %d (expected %d + tolerance %d)", + ds, sampledLen, upperBound, expectedCount, tolerance) + }) + } +} + +func (s *FractionTestSuite) TestSearchDownsampleWithTotal() { + const ( + totalDocs = 1000 + bulkSize = 200 + tolerancePct = 3 // percent tolerance for sampled document count + ) + + _, bulks, fromTime, toTime := generatesMessages(totalDocs, bulkSize) + s.insertDocuments(bulks...) + + // tolerance window: ±3% of total documents + tolerance := totalDocs * tolerancePct / 100 + + // downsample values to test: each should return ~1/ds of total documents + downsampleValues := []int{10, 20, 50, 100} + + for _, ds := range downsampleValues { + ds := ds // capture loop variable + s.T().Run(fmt.Sprintf("downsample=%d", ds), func(t *testing.T) { + params := s.query( + "message:*", + withFrom(fromTime.Format(time.RFC3339Nano)), + withTo(toTime.Format(time.RFC3339Nano)), + withDownsample(uint32(ds)), + withTotal(), + ) + result, err := s.fraction.Search(context.Background(), *params) + s.Require().NoError(err, "search with downsample=%d failed", ds) + + sampledDocs := result.IDs.Len() + expectedDocs := totalDocs / ds // with downsample=k, expect approximately totalDocs/k documents + + s.Require().Less(sampledDocs, expectedDocs+tolerance, + "downsample=%d: sampled docs (%d) should be less than expected (%d) + tolerance (%d)", + ds, sampledDocs, expectedDocs, tolerance) + 
s.Require().Greater(sampledDocs, expectedDocs-tolerance, + "downsample=%d: sampled docs (%d) should be greater than expected (%d) - tolerance (%d)", + ds, sampledDocs, expectedDocs, tolerance) + + // Total field must always reflect the full document count, regardless of downsample + s.Require().Equal(totalDocs, int(result.Total), + "downsample=%d: total should not be affected by downsample", ds) + }) + } +} + +func (s *FractionTestSuite) TestSearchDownsampleWithAggAndHist() { + const ( + totalDocs = 1000 + bulkSize = 200 + hist = 8 + downsample = 20 + ) + + _, bulks, fromTime, toTime := generatesMessages(totalDocs, bulkSize) + s.insertDocuments(bulks...) + + commonOpts := []searchOption{ + withFrom(fromTime.Format(time.RFC3339Nano)), + withTo(toTime.Format(time.RFC3339Nano)), + withHist(uint64(hist)), + withAggQuery(processor.AggQuery{ + GroupBy: aggField("service"), + Func: seq.AggFuncCount, + }), + } + + s.T().Run("without downsample", func(t *testing.T) { + paramsNoDS := s.query("message:started", commonOpts...) + qprNoDS, err := s.fraction.Search(context.Background(), *paramsNoDS) + s.Require().NoError(err, "search without downsample failed") + s.Require().NotNil(qprNoDS, "search result must not be nil") + s.Require().Greater(len(qprNoDS.Aggs), 0, "should have aggregation results") + + // Verify the histogram has a reasonable number of buckets. + s.Require().Greater(len(qprNoDS.Histogram), 0, "histogram should have at least one bucket") + s.Require().LessOrEqual(len(qprNoDS.Histogram), totalDocs/hist, + "histogram buckets (%d) should not exceed cntDocs/hist=%d", + len(qprNoDS.Histogram), totalDocs/hist) + + s.T().Run("with downsample", func(t *testing.T) { + paramsDS := s.query("message:started", append(commonOpts, withDownsample(downsample))...) 
+ qprDS, err := s.fraction.Search(context.Background(), *paramsDS) + s.Require().NoError(err, "search with downsample=%d failed", downsample) + s.Require().NotNil(qprDS, "search result must not be nil") + s.Require().Equal(qprNoDS.Histogram, qprDS.Histogram, "histogram should match without downsample") + assertAggregationsEqual(s, qprNoDS, qprDS) + }) + }) +} + +// assertAggregationsEqual verifies that two search results have identical aggregation data. +func assertAggregationsEqual(s *FractionTestSuite, expected, actual *seq.QPR) { + s.Require().Equal(len(expected.Aggs), len(actual.Aggs), + "number of aggregation groups should be the same; "+ + "aggregations are computed on the full document set and are not affected by downsample") + + for i := range expected.Aggs { + expAgg := &expected.Aggs[i] + actAgg := &actual.Aggs[i] + + s.Require().Equal(len(expAgg.SamplesByBin), len(actAgg.SamplesByBin), + "number of aggregation bins should be the same for agg group %d", i) + + for bin, expSample := range expAgg.SamplesByBin { + actSample, ok := actAgg.SamplesByBin[bin] + s.Require().True(ok, "bin %v should exist in downsample results for agg group %d", bin, i) + // Total count is computed from the full document set and must match exactly. + s.Require().Equal(expSample.Total, actSample.Total, + "aggregation total for bin %v in agg group %d should be the same "+ + "(aggregations are computed on the full document set, not sampled)", bin, i) + } + } +} + +func (s *FractionTestSuite) TestSearchDownsampleZeroAndOne() { + const ( + totalDocs = 5000 + bulkSize = 200 + queryAll = "message:*" + ) + + _, bulks, fromTime, toTime := generatesMessages(totalDocs, bulkSize) + s.insertDocuments(bulks...) + + baseOpts := []searchOption{ + withFrom(fromTime.Format(time.RFC3339Nano)), + withTo(toTime.Format(time.RFC3339Nano)), + } + + // searchAndAssertIDs is a local helper that runs a search with the given options + // and asserts that the result contains exactly totalDocs documents. 
+ searchAndAssertIDs := func(name string, opts ...searchOption) { + s.T().Run(name, func(t *testing.T) { + params := s.query(queryAll, append(baseOpts, opts...)...) + qpr, err := s.fraction.Search(context.Background(), *params) + s.Require().NoError(err, "%s: search failed", name) + s.Require().NotNil(qpr, "%s: search result must not be nil", name) + s.Require().Equal(totalDocs, qpr.IDs.Len(), + "%s: expected %d documents, got %d", name, totalDocs, qpr.IDs.Len()) + }) + } + + // downsample=0 (default) — should return all documents + searchAndAssertIDs("downsample=0 (default)") + + // downsample=0 explicitly — should return all documents + searchAndAssertIDs("downsample=0 (explicit)", withDownsample(0)) + + // downsample=1 — should return all documents + searchAndAssertIDs("downsample=1", withDownsample(1)) +} + type searchOption func(*processor.SearchParams) error func (s *FractionTestSuite) query(queryString string, options ...searchOption) *processor.SearchParams { @@ -1976,6 +2185,13 @@ func withHist(histInterval uint64) searchOption { } } +func withDownsample(k uint32) searchOption { + return func(sp *processor.SearchParams) error { + sp.Downsample = k + return nil + } +} + func aggField(field string) *parser.Literal { searchAll := []parser.Term{{ Kind: parser.TermSymbol, Data: "*", diff --git a/frac/processor/search.go b/frac/processor/search.go index 30ed4f45e..8734bcc7f 100644 --- a/frac/processor/search.go +++ b/frac/processor/search.go @@ -4,6 +4,7 @@ import ( "context" "errors" "math" + "math/rand/v2" "sync" "time" @@ -43,9 +44,10 @@ type searchIndex interface { } type searchBuffers struct { - lids []node.LID - mids []seq.MID - rids []seq.RID + mids []seq.MID + rids []seq.RID + lids []node.LID + sampled []node.LID } var searchBuffersPool = sync.Pool{ @@ -222,8 +224,10 @@ func iterateEvalTree( buffers := searchBuffersPool.Get().(*searchBuffers) defer searchBuffersPool.Put(buffers) + mids := buffers.mids rids := buffers.rids + sampled := buffers.sampled 
batchedEvalTree := batcher(evalTree, buffers.lids) @@ -244,7 +248,10 @@ func iterateEvalTree( break } needLIDs := params.Limit - len(ids) - if needScanAllRange { + + // if full range scan is required OR downsampling is active, + // we must fetch as many LIDs as possible in one batch. + if needScanAllRange || params.Downsample > 1 { needLIDs = math.MaxUint32 } @@ -256,43 +263,48 @@ func iterateEvalTree( break } - needMIDs := min(params.Limit-len(ids), len(lidsSlice)) - if hasHist { - // need to fetch mids for all lids for hist - needMIDs = len(lidsSlice) - } + mids = mids[:0] - // Get MIDs - if needMIDs > 0 { + if hasHist { + // when histogram is needed, retrieve MIDs for ALL LIDs in the batch timerMID.Start() - mids = idsIndex.GetMIDs(lidsSlice[:needMIDs], mids[:0]) + mids = idsIndex.GetMIDs(lidsSlice, mids) timerMID.Stop() - } - // Get RIDs - // compute number of ids we can get here, since some MIDs might have been filtered out - needIDs := min(params.Limit-len(ids), len(lidsSlice)) - if needIDs > 0 { - timerRID.Start() - rids = idsIndex.GetRIDs(lidsSlice[0:needIDs], rids[:0]) - timerRID.Stop() + timerUpdateHist.Start() + hist.Update(mids) + timerUpdateHist.Stop() + + sampled, mids = sampleLIDsWithMIDs(lidsSlice, mids, sampled[:0], params.Downsample) + } else { + sampled = sampleLIDs(lidsSlice, sampled[:0], params.Downsample) } - // Fill IDs for search - for i := 0; i < needIDs; i++ { - id := seq.ID{MID: mids[i], RID: rids[i]} + // trim sampled slice to respect the remaining limit + if params.Limit-len(ids) < len(sampled) { + sampled = sampled[:params.Limit-len(ids)] + } - if i == 0 || lastID != id { // lids increase monotonically, it's enough to compare current id with the last one - ids = append(ids, seq.IDSource{ID: id}) + if len(sampled) > 0 { + if len(mids) == 0 { + // if we don't already have MIDs (i.e., no histogram case), fetch them now + timerMID.Start() + mids = idsIndex.GetMIDs(sampled, mids) + timerMID.Stop() } - lastID = id - } - // Update hist map - 
if hasHist { - timerUpdateHist.Start() - hist.Update(mids) - timerUpdateHist.Stop() + timerRID.Start() + rids = idsIndex.GetRIDs(sampled, rids[:0]) + timerRID.Stop() + + // Fill IDs for search + for i := range sampled { + id := seq.ID{MID: mids[i], RID: rids[i]} + if i == 0 || lastID != id { // lids increase monotonically, it's enough to compare current id with the last one + ids = append(ids, seq.IDSource{ID: id}) + } + lastID = id + } } // Update aggregators @@ -323,6 +335,33 @@ func iterateEvalTree( return total, ids, hist, aggs, nil } +func sampleLIDs(in, out []node.LID, k uint32) []node.LID { + if k <= 1 { // 0 or 1 -> no sample + return in + } + for _, lid := range in { + if rand.N(k) == 0 { + out = append(out, lid) + } + } + return out +} + +func sampleLIDsWithMIDs(in []node.LID, mids []seq.MID, out []node.LID, k uint32) ([]node.LID, []seq.MID) { + if k <= 1 { // 0 or 1 -> no sample + return in, mids + } + i := 0 + for j, lid := range in { + if rand.N(k) == 0 { + out = append(out, lid) + mids[i] = mids[j] + i++ + } + } + return out, mids[:i] +} + func tryConvertToBatchedTree(evalTree node.Node) (node.BatchedNode, bool) { switch it := evalTree.(type) { case *lids.IteratorDesc: diff --git a/frac/processor/search_params.go b/frac/processor/search_params.go index de2594b86..4d7279d78 100644 --- a/frac/processor/search_params.go +++ b/frac/processor/search_params.go @@ -47,6 +47,8 @@ type SearchParams struct { WithTotal bool Order seq.DocsOrder + + Downsample uint32 } func (p SearchParams) MarshalLogObject(enc zapcore.ObjectEncoder) error { diff --git a/pkg/seqproxyapi/v1/seq_proxy_api.pb.go b/pkg/seqproxyapi/v1/seq_proxy_api.pb.go index eac9fd1cc..29a70a723 100644 --- a/pkg/seqproxyapi/v1/seq_proxy_api.pb.go +++ b/pkg/seqproxyapi/v1/seq_proxy_api.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. 
// versions: // protoc-gen-go v1.36.5 -// protoc v5.29.3 +// protoc v7.35.0 // source: seqproxyapi/v1/seq_proxy_api.proto package seqproxyapi @@ -719,6 +719,7 @@ type SearchRequest struct { WithTotal bool `protobuf:"varint,4,opt,name=with_total,json=withTotal,proto3" json:"with_total,omitempty"` // Should total number of documents be returned in response. Order Order `protobuf:"varint,5,opt,name=order,proto3,enum=seqproxyapi.v1.Order" json:"order,omitempty"` // Document order ORDER_DESC/ORDER_ASC. OffsetId string `protobuf:"bytes,6,opt,name=offset_id,json=offsetId,proto3" json:"offset_id,omitempty"` // ID offset for pagination. + Downsample uint32 `protobuf:"varint,7,opt,name=downsample,proto3" json:"downsample,omitempty"` // If set, returns roughly 1 in N documents on a probabilistic basis unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -795,6 +796,13 @@ func (x *SearchRequest) GetOffsetId() string { return "" } +func (x *SearchRequest) GetDownsample() uint32 { + if x != nil { + return x.Downsample + } + return 0 +} + type ComplexSearchRequest struct { state protoimpl.MessageState `protogen:"open.v1"` Query *SearchQuery `protobuf:"bytes,1,opt,name=query,proto3" json:"query,omitempty"` // Search query. @@ -805,6 +813,7 @@ type ComplexSearchRequest struct { WithTotal bool `protobuf:"varint,6,opt,name=with_total,json=withTotal,proto3" json:"with_total,omitempty"` // Should total number of documents be returned in response. Order Order `protobuf:"varint,7,opt,name=order,proto3,enum=seqproxyapi.v1.Order" json:"order,omitempty"` // Document order ORDER_DESC/ORDER_ASC. OffsetId string `protobuf:"bytes,8,opt,name=offset_id,json=offsetId,proto3" json:"offset_id,omitempty"` // ID offset for pagination. + Downsample uint32 `protobuf:"varint,9,opt,name=downsample,proto3" json:"downsample,omitempty"` // If set, returns roughly 1 in N documents on a probabilistic basis 
unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -895,6 +904,13 @@ func (x *ComplexSearchRequest) GetOffsetId() string { return "" } +func (x *ComplexSearchRequest) GetDownsample() uint32 { + if x != nil { + return x.Downsample + } + return 0 +} + type SearchResponse struct { state protoimpl.MessageState `protogen:"open.v1"` // Deprecated: Marked as deprecated in seqproxyapi/v1/seq_proxy_api.proto. @@ -2724,7 +2740,7 @@ var file_seqproxyapi_v1_seq_proxy_api_proto_rawDesc = string([]byte{ 0x6e, 0x12, 0x38, 0x0a, 0x08, 0x63, 0x68, 0x69, 0x6c, 0x64, 0x72, 0x65, 0x6e, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x73, 0x65, 0x71, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x45, 0x78, 0x70, 0x6c, 0x61, 0x69, 0x6e, 0x45, 0x6e, 0x74, 0x72, - 0x79, 0x52, 0x08, 0x63, 0x68, 0x69, 0x6c, 0x64, 0x72, 0x65, 0x6e, 0x22, 0xd7, 0x01, 0x0a, 0x0d, + 0x79, 0x52, 0x08, 0x63, 0x68, 0x69, 0x6c, 0x64, 0x72, 0x65, 0x6e, 0x22, 0xf7, 0x01, 0x0a, 0x0d, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x31, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x73, 0x65, 0x71, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x65, @@ -2738,7 +2754,9 @@ var file_seqproxyapi_v1_seq_proxy_api_proto_rawDesc = string([]byte{ 0x70, 0x72, 0x6f, 0x78, 0x79, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x52, 0x05, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x12, 0x1b, 0x0a, 0x09, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6f, 0x66, 0x66, - 0x73, 0x65, 0x74, 0x49, 0x64, 0x22, 0xc9, 0x02, 0x0a, 0x14, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, + 0x73, 0x65, 0x74, 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x64, 0x6f, 0x77, 0x6e, 0x73, 0x61, 0x6d, + 0x70, 0x6c, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x64, 0x6f, 0x77, 0x6e, 0x73, + 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x22, 
0xe9, 0x02, 0x0a, 0x14, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x78, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x31, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x73, 0x65, 0x71, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x53, @@ -2758,7 +2776,9 @@ var file_seqproxyapi_v1_seq_proxy_api_proto_rawDesc = string([]byte{ 0x73, 0x65, 0x71, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x52, 0x05, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x12, 0x1b, 0x0a, 0x09, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x08, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, - 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x49, 0x64, 0x42, 0x07, 0x0a, 0x05, 0x5f, 0x68, 0x69, 0x73, + 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x64, 0x6f, 0x77, 0x6e, + 0x73, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x64, 0x6f, + 0x77, 0x6e, 0x73, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x42, 0x07, 0x0a, 0x05, 0x5f, 0x68, 0x69, 0x73, 0x74, 0x22, 0xb0, 0x01, 0x0a, 0x0e, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2d, 0x0a, 0x10, 0x70, 0x61, 0x72, 0x74, 0x69, 0x61, 0x6c, 0x5f, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x42, 0x02, diff --git a/pkg/seqproxyapi/v1/seq_proxy_api_vtproto.pb.go b/pkg/seqproxyapi/v1/seq_proxy_api_vtproto.pb.go index 97f47d659..b45009b1a 100644 --- a/pkg/seqproxyapi/v1/seq_proxy_api_vtproto.pb.go +++ b/pkg/seqproxyapi/v1/seq_proxy_api_vtproto.pb.go @@ -262,6 +262,7 @@ func (m *SearchRequest) CloneVT() *SearchRequest { r.WithTotal = m.WithTotal r.Order = m.Order r.OffsetId = m.OffsetId + r.Downsample = m.Downsample if len(m.unknownFields) > 0 { r.unknownFields = make([]byte, len(m.unknownFields)) copy(r.unknownFields, m.unknownFields) @@ -285,6 +286,7 @@ func (m *ComplexSearchRequest) 
CloneVT() *ComplexSearchRequest { r.WithTotal = m.WithTotal r.Order = m.Order r.OffsetId = m.OffsetId + r.Downsample = m.Downsample if rhs := m.Aggs; rhs != nil { tmpContainer := make([]*AggQuery, len(rhs)) for k, v := range rhs { @@ -1206,6 +1208,9 @@ func (this *SearchRequest) EqualVT(that *SearchRequest) bool { if this.OffsetId != that.OffsetId { return false } + if this.Downsample != that.Downsample { + return false + } return string(this.unknownFields) == string(that.unknownFields) } @@ -1260,6 +1265,9 @@ func (this *ComplexSearchRequest) EqualVT(that *ComplexSearchRequest) bool { if this.OffsetId != that.OffsetId { return false } + if this.Downsample != that.Downsample { + return false + } return string(this.unknownFields) == string(that.unknownFields) } @@ -3256,6 +3264,11 @@ func (m *SearchRequest) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if m.Downsample != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.Downsample)) + i-- + dAtA[i] = 0x38 + } if len(m.OffsetId) > 0 { i -= len(m.OffsetId) copy(dAtA[i:], m.OffsetId) @@ -3331,6 +3344,11 @@ func (m *ComplexSearchRequest) MarshalToSizedBufferVT(dAtA []byte) (int, error) i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if m.Downsample != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.Downsample)) + i-- + dAtA[i] = 0x48 + } if len(m.OffsetId) > 0 { i -= len(m.OffsetId) copy(dAtA[i:], m.OffsetId) @@ -5524,6 +5542,11 @@ func (m *SearchRequest) MarshalToSizedBufferVTStrict(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if m.Downsample != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.Downsample)) + i-- + dAtA[i] = 0x38 + } if len(m.OffsetId) > 0 { i -= len(m.OffsetId) copy(dAtA[i:], m.OffsetId) @@ -5599,6 +5622,11 @@ func (m *ComplexSearchRequest) MarshalToSizedBufferVTStrict(dAtA []byte) (int, e i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if 
m.Downsample != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.Downsample)) + i-- + dAtA[i] = 0x48 + } if len(m.OffsetId) > 0 { i -= len(m.OffsetId) copy(dAtA[i:], m.OffsetId) @@ -7440,6 +7468,9 @@ func (m *SearchRequest) SizeVT() (n int) { if l > 0 { n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) } + if m.Downsample != 0 { + n += 1 + protohelpers.SizeOfVarint(uint64(m.Downsample)) + } n += len(m.unknownFields) return n } @@ -7480,6 +7511,9 @@ func (m *ComplexSearchRequest) SizeVT() (n int) { if l > 0 { n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) } + if m.Downsample != 0 { + n += 1 + protohelpers.SizeOfVarint(uint64(m.Downsample)) + } n += len(m.unknownFields) return n } @@ -9613,6 +9647,25 @@ func (m *SearchRequest) UnmarshalVT(dAtA []byte) error { } m.OffsetId = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 7: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Downsample", wireType) + } + m.Downsample = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Downsample |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) @@ -9879,6 +9932,25 @@ func (m *ComplexSearchRequest) UnmarshalVT(dAtA []byte) error { } m.OffsetId = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 9: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Downsample", wireType) + } + m.Downsample = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Downsample |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) @@ -15099,6 +15171,25 @@ func (m *SearchRequest) UnmarshalVTUnsafe(dAtA []byte) error { 
} m.OffsetId = stringValue iNdEx = postIndex + case 7: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Downsample", wireType) + } + m.Downsample = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Downsample |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) @@ -15369,6 +15460,25 @@ func (m *ComplexSearchRequest) UnmarshalVTUnsafe(dAtA []byte) error { } m.OffsetId = stringValue iNdEx = postIndex + case 9: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Downsample", wireType) + } + m.Downsample = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Downsample |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) diff --git a/pkg/storeapi/store_api.pb.go b/pkg/storeapi/store_api.pb.go index 53e881042..e5b74bbb6 100644 --- a/pkg/storeapi/store_api.pb.go +++ b/pkg/storeapi/store_api.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.5 -// protoc v5.29.3 +// protoc v7.35.0 // source: storeapi/store_api.proto package storeapi @@ -447,6 +447,7 @@ type SearchRequest struct { Aggs []*AggQuery `protobuf:"bytes,12,rep,name=aggs,proto3" json:"aggs,omitempty"` Order Order `protobuf:"varint,13,opt,name=order,proto3,enum=api.Order" json:"order,omitempty"` OffsetId string `protobuf:"bytes,14,opt,name=offset_id,json=offsetId,proto3" json:"offset_id,omitempty"` + Downsample uint32 `protobuf:"varint,15,opt,name=downsample,proto3" json:"downsample,omitempty"` // If set, returns roughly 1 in N documents on a probabilistic basis 
unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -574,6 +575,13 @@ func (x *SearchRequest) GetOffsetId() string { return "" } +func (x *SearchRequest) GetDownsample() uint32 { + if x != nil { + return x.Downsample + } + return 0 +} + type SearchResponse struct { state protoimpl.MessageState `protogen:"open.v1"` // Deprecated: Marked as deprecated in storeapi/store_api.proto. @@ -2172,7 +2180,7 @@ var file_storeapi_store_api_proto_rawDesc = string([]byte{ 0x0a, 0x09, 0x71, 0x75, 0x61, 0x6e, 0x74, 0x69, 0x6c, 0x65, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x01, 0x52, 0x09, 0x71, 0x75, 0x61, 0x6e, 0x74, 0x69, 0x6c, 0x65, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, - 0x69, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x4a, 0x04, 0x08, 0x02, 0x10, 0x03, 0x22, 0x85, + 0x69, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x4a, 0x04, 0x08, 0x02, 0x10, 0x03, 0x22, 0xa5, 0x03, 0x0a, 0x0d, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x66, 0x72, 0x6f, 0x6d, 0x18, 0x02, @@ -2197,7 +2205,9 @@ var file_storeapi_store_api_proto_rawDesc = string([]byte{ 0x18, 0x0d, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0a, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x52, 0x05, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x12, 0x1b, 0x0a, 0x09, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x0e, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6f, 0x66, - 0x66, 0x73, 0x65, 0x74, 0x49, 0x64, 0x22, 0xe6, 0x09, 0x0a, 0x0e, 0x53, 0x65, 0x61, 0x72, 0x63, + 0x66, 0x73, 0x65, 0x74, 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x64, 0x6f, 0x77, 0x6e, 0x73, 0x61, + 0x6d, 0x70, 0x6c, 0x65, 0x18, 0x0f, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x64, 0x6f, 0x77, 0x6e, + 0x73, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x22, 0xe6, 0x09, 0x0a, 0x0e, 0x53, 0x65, 
0x61, 0x72, 0x63, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x42, 0x02, 0x18, 0x01, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x3d, 0x0a, 0x0a, 0x69, 0x64, 0x5f, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x18, diff --git a/pkg/storeapi/store_api_vtproto.pb.go b/pkg/storeapi/store_api_vtproto.pb.go index 73a4cce31..fe3d5dc18 100644 --- a/pkg/storeapi/store_api_vtproto.pb.go +++ b/pkg/storeapi/store_api_vtproto.pb.go @@ -121,6 +121,7 @@ func (m *SearchRequest) CloneVT() *SearchRequest { r.AggregationFilter = m.AggregationFilter r.Order = m.Order r.OffsetId = m.OffsetId + r.Downsample = m.Downsample if rhs := m.Aggs; rhs != nil { tmpContainer := make([]*AggQuery, len(rhs)) for k, v := range rhs { @@ -845,6 +846,9 @@ func (this *SearchRequest) EqualVT(that *SearchRequest) bool { if this.OffsetId != that.OffsetId { return false } + if this.Downsample != that.Downsample { + return false + } return string(this.unknownFields) == string(that.unknownFields) } @@ -2296,6 +2300,11 @@ func (m *SearchRequest) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if m.Downsample != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.Downsample)) + i-- + dAtA[i] = 0x78 + } if len(m.OffsetId) > 0 { i -= len(m.OffsetId) copy(dAtA[i:], m.OffsetId) @@ -4108,6 +4117,11 @@ func (m *SearchRequest) MarshalToSizedBufferVTStrict(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if m.Downsample != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.Downsample)) + i-- + dAtA[i] = 0x78 + } if len(m.OffsetId) > 0 { i -= len(m.OffsetId) copy(dAtA[i:], m.OffsetId) @@ -5845,6 +5859,9 @@ func (m *SearchRequest) SizeVT() (n int) { if l > 0 { n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) } + if m.Downsample != 0 { + n += 1 + protohelpers.SizeOfVarint(uint64(m.Downsample)) + } n += len(m.unknownFields) 
return n } @@ -7229,6 +7246,25 @@ func (m *SearchRequest) UnmarshalVT(dAtA []byte) error { } m.OffsetId = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 15: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Downsample", wireType) + } + m.Downsample = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Downsample |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) @@ -12056,6 +12092,25 @@ func (m *SearchRequest) UnmarshalVTUnsafe(dAtA []byte) error { } m.OffsetId = stringValue iNdEx = postIndex + case 15: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Downsample", wireType) + } + m.Downsample = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Downsample |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) diff --git a/proxy/search/search_request.go b/proxy/search/search_request.go index 7aa468e24..5d6ba3fb3 100644 --- a/proxy/search/search_request.go +++ b/proxy/search/search_request.go @@ -28,21 +28,23 @@ type SearchRequest struct { WithTotal bool ShouldFetch bool Order seq.DocsOrder + Downsample uint32 } func (sr *SearchRequest) GetAPISearchRequest() *storeapi.SearchRequest { return &storeapi.SearchRequest{ - Query: util.ByteToStringUnsafe(sr.Q), - From: int64(seq.MIDToMillis(sr.From)), - To: int64(seq.MIDToMillis(sr.To)), - Size: int64(sr.Size), - Offset: int64(sr.Offset), - Interval: int64(seq.MIDToMillis(sr.Interval)), - OffsetId: sr.OffsetId, - Aggs: convertToAggsQuery(sr.AggQ), - Explain: sr.Explain, - WithTotal: sr.WithTotal, - Order: storeapi.MustProtoOrder(sr.Order), + 
Query: util.ByteToStringUnsafe(sr.Q), + From: int64(seq.MIDToMillis(sr.From)), + To: int64(seq.MIDToMillis(sr.To)), + Size: int64(sr.Size), + Offset: int64(sr.Offset), + Interval: int64(seq.MIDToMillis(sr.Interval)), + OffsetId: sr.OffsetId, + Aggs: convertToAggsQuery(sr.AggQ), + Explain: sr.Explain, + WithTotal: sr.WithTotal, + Order: storeapi.MustProtoOrder(sr.Order), + Downsample: sr.Downsample, } } diff --git a/proxyapi/grpc_search.go b/proxyapi/grpc_search.go index d5e658073..59c172343 100644 --- a/proxyapi/grpc_search.go +++ b/proxyapi/grpc_search.go @@ -20,12 +20,13 @@ func (g *grpcV1) Search( } proxyReq := &seqproxyapi.ComplexSearchRequest{ - Query: req.Query, - Size: req.Size, - Offset: req.Offset, - OffsetId: req.OffsetId, - WithTotal: req.WithTotal, - Order: req.Order, + Query: req.Query, + Size: req.Size, + Offset: req.Offset, + OffsetId: req.OffsetId, + WithTotal: req.WithTotal, + Order: req.Order, + Downsample: req.Downsample, } sResp, err := g.doSearch(ctx, proxyReq, true, nil) if err != nil { diff --git a/proxyapi/grpc_v1.go b/proxyapi/grpc_v1.go index 90611a293..fb14c14e9 100644 --- a/proxyapi/grpc_v1.go +++ b/proxyapi/grpc_v1.go @@ -242,6 +242,7 @@ func (g *grpcV1) doSearch( WithTotal: req.WithTotal, ShouldFetch: shouldFetch, Order: req.Order.MustDocsOrder(), + Downsample: req.Downsample, } if len(req.Aggs) > 0 { diff --git a/storeapi/grpc_search.go b/storeapi/grpc_search.go index e5eedd980..388312802 100644 --- a/storeapi/grpc_search.go +++ b/storeapi/grpc_search.go @@ -186,6 +186,7 @@ func (g *GrpcV1) doSearch( WithTotal: req.WithTotal, Order: req.Order.MustDocsOrder(), OffsetId: offsetId, + Downsample: req.Downsample, } searchTr := tr.NewChild("search iteratively") diff --git a/tests/integration_tests/integration_test.go b/tests/integration_tests/integration_test.go index d2abf8185..67efa3252 100644 --- a/tests/integration_tests/integration_test.go +++ b/tests/integration_tests/integration_test.go @@ -1316,6 +1316,85 @@ func 
TestBigWithReplicasIntegration(t *testing.T) { suite.Run(t, dd) } +func (s *IntegrationTestSuite) TestDownsamplePropagation() { + t := s.T() + r := require.New(t) + + env := setup.NewTestingEnv(s.Config) + defer env.StopAll() + + const ( + docsPerBulk = 200 + tolerancePercent = 0.5 + ) + + // Setup data with status field for aggregations. + bulksNum := getBulkIterationsNum(env) + totalDocs := docsPerBulk * bulksNum + + origDocs := make([]string, docsPerBulk) + for j := 0; j < bulksNum; j++ { + baseIdx := j * docsPerBulk + for i := range origDocs { + origDocs[i] = fmt.Sprintf(`{"service":"a","id":%d,"status":%d}`, baseIdx+i, i%3) + } + setup.Bulk(t, env.IngestorBulkAddr(), origDocs) + } + + type testCase struct { + name string + downsample *uint32 // nil = option not passed + wantAll bool // expect all documents returned + } + + cases := []testCase{{ + name: "no downsample option", + downsample: nil, + wantAll: true, + }, { + name: "downsample=0", + downsample: ptr[uint32](0), + wantAll: true, + }, { + name: "downsample=1", + downsample: ptr[uint32](1), + wantAll: true, + }, { + name: "downsample=10", + downsample: ptr[uint32](10), + wantAll: false, + }} + + for _, tc := range cases { + opts := []setup.SearchOption{setup.NoFetch(), setup.WithTotal(true)} + if tc.downsample != nil { + opts = append(opts, setup.WithDownsample(*tc.downsample)) + } + + qpr, _, _, err := env.Search(`service:a`, math.MaxInt32, opts...) + r.NoError(err, "store search with %s should succeed", tc.name) + + if tc.wantAll { + r.Equal(totalDocs, len(qpr.IDs), "store search %s: should return all %d docs", tc.name, totalDocs) + } else { + r.Greater(len(qpr.IDs), 0, "store search %s: should return at least some results", tc.name) + // downsample=N: expect approximately total/N docs with ±3% tolerance. 
+ ds := int(*tc.downsample) + delta := float64(totalDocs/ds) * tolerancePercent + r.InDelta(totalDocs/ds, len(qpr.IDs), delta, + "store search %s: should return ~%d docs", tc.name, totalDocs/ds) + } + + r.Equal(uint64(totalDocs), qpr.Total, + "store search %s: Total should reflect full count (%d)", tc.name, totalDocs) + } +} + +// ptr returns a pointer to the given value. +func ptr[T any](v T) *T { + return &v +} + func (s *IntegrationTestSuite) TestDocuments() { n := 32 env, origDocs := s.envWithDummyDocs(n) diff --git a/tests/setup/env.go b/tests/setup/env.go index a42ba006e..73b4eea20 100644 --- a/tests/setup/env.go +++ b/tests/setup/env.go @@ -573,6 +573,12 @@ func WithOrder(o seq.DocsOrder) SearchOption { } } +func WithDownsample(downsample uint32) SearchOption { + return func(sr *search.SearchRequest) { + sr.Downsample = downsample + } +} + func (t *TestingEnv) Search(q string, size int, options ...SearchOption) (*seq.QPR, [][]byte, time.Duration, error) { sr := &search.SearchRequest{ Explain: false, From 50b89223c510357e71706310669d23b97cbd95fd Mon Sep 17 00:00:00 2001 From: Evgenii Guguchkin Date: Thu, 18 Jun 2026 16:15:58 +0300 Subject: [PATCH 2/7] downsample hist and aggs too --- frac/processor/search.go | 170 ++++++++++++++++++--------------------- 1 file changed, 79 insertions(+), 91 deletions(-) diff --git a/frac/processor/search.go b/frac/processor/search.go index 8734bcc7f..14176e35b 100644 --- a/frac/processor/search.go +++ b/frac/processor/search.go @@ -4,7 +4,6 @@ import ( "context" "errors" "math" - "math/rand/v2" "sync" "time" @@ -44,10 +43,9 @@ type searchIndex interface { } type searchBuffers struct { - mids []seq.MID - rids []seq.RID - lids []node.LID - sampled []node.LID + mids []seq.MID + rids []seq.RID + lids []node.LID } var searchBuffersPool = sync.Pool{ @@ -211,155 +209,145 @@ func iterateEvalTree( hasHist := params.HasHist() needScanAllRange := params.IsScanAllRequest() - var hist HistMap - if hasHist { - hist = NewHistMap(params.From, 
params.To, params.HistInterval) - } - var ( total int + hist HistMap lastID seq.ID ids seq.IDSources ) + if hasHist { + hist = NewHistMap(params.From, params.To, params.HistInterval) + } + buffers := searchBuffersPool.Get().(*searchBuffers) defer searchBuffersPool.Put(buffers) mids := buffers.mids rids := buffers.rids - sampled := buffers.sampled batchedEvalTree := batcher(evalTree, buffers.lids) timerEval := sw.Timer("eval_tree_next") timerMID := sw.Timer("get_mid") - timerUpdateHist := sw.Timer("update_hist") + timerHist := sw.Timer("update_hist") timerRID := sw.Timer("get_rid") timerAgg := sw.Timer("agg_node_count") + sample := sampler(params.Downsample) + var aggs []Aggregator for { if util.IsCancelled(ctx) { return total, ids, hist, aggs, ctx.Err() } - needMore := len(ids) < params.Limit - if !needMore && !needScanAllRange { + needIDs := params.Limit - len(ids) + if needIDs < 1 && !needScanAllRange { break } - needLIDs := params.Limit - len(ids) - // if full range scan is required OR downsampling is active, - // we must fetch as many LIDs as possible in one batch. + maxBatchSize := needIDs if needScanAllRange || params.Downsample > 1 { - needLIDs = math.MaxUint32 + // if full range scan is required OR downsampling is active, + // we must fetch as many LIDs as possible in one batch. 
+ maxBatchSize = math.MaxUint32 } timerEval.Start() - lidsSlice := batchedEvalTree(needLIDs) + lidsBatch := batchedEvalTree(maxBatchSize) timerEval.Stop() - if len(lidsSlice) == 0 { + total += len(lidsBatch) + + lidsBatch = sample(lidsBatch) + + if len(lidsBatch) == 0 { break } - mids = mids[:0] - - if hasHist { - // when histogram is needed, retrieve MIDs for ALL LIDs in the batch + if hasHist || needIDs > 0 { timerMID.Start() - mids = idsIndex.GetMIDs(lidsSlice, mids) + mids = idsIndex.GetMIDs(lidsBatch, mids[:0]) timerMID.Stop() - timerUpdateHist.Start() - hist.Update(mids) - timerUpdateHist.Stop() - - sampled, mids = sampleLIDsWithMIDs(lidsSlice, mids, sampled[:0], params.Downsample) - } else { - sampled = sampleLIDs(lidsSlice, sampled[:0], params.Downsample) - } - - // trim sampled slice to respect the remaining limit - if params.Limit-len(ids) < len(sampled) { - sampled = sampled[:params.Limit-len(ids)] - } - - if len(sampled) > 0 { - if len(mids) == 0 { - // if we don't already have MIDs (i.e., no histogram case), fetch them now - timerMID.Start() - mids = idsIndex.GetMIDs(sampled, mids) - timerMID.Stop() + if hasHist { + timerHist.Start() + hist.Update(mids) + timerHist.Stop() } - timerRID.Start() - rids = idsIndex.GetRIDs(sampled, rids[:0]) - timerRID.Stop() - - // Fill IDs for search - for i := range sampled { - id := seq.ID{MID: mids[i], RID: rids[i]} - if i == 0 || lastID != id { // lids increase monotonically, it's enough to compare current id with the last one - ids = append(ids, seq.IDSource{ID: id}) + if needIDs > 0 { + if needIDs < len(lidsBatch) { // trim sampled slice to respect the remaining limit + lidsBatch = lidsBatch[:needIDs] + } + timerRID.Start() + rids = idsIndex.GetRIDs(lidsBatch, rids[:0]) + timerRID.Stop() + + // Fill IDs for search + for i := range lidsBatch { + id := seq.ID{MID: mids[i], RID: rids[i]} + if i == 0 || lastID != id { // lids increase monotonically, it's enough to compare current id with the last one + ids = append(ids, 
seq.IDSource{ID: id}) + } + lastID = id } - lastID = id } } // Update aggregators if params.HasAgg() { - if aggs == nil { - var err error - aggs, err = aggSupplier() // sw timer is activated inside aggSupplier - if err != nil { - return total, ids, hist, nil, err - } - } - - timerAgg.Start() - for i := range aggs { - for _, lid := range lidsSlice { - if err := aggs[i].Next(lid); err != nil { - timerAgg.Stop() - return total, ids, hist, aggs, err - } - } + var err error + if aggs, err = updateAggs(aggs, lidsBatch, aggSupplier, timerAgg); err != nil { + return total, ids, hist, aggs, err } - timerAgg.Stop() } - - total += len(lidsSlice) } return total, ids, hist, aggs, nil } -func sampleLIDs(in, out []node.LID, k uint32) []node.LID { - if k <= 1 { // 0 or 1 -> no sample - return in +func updateAggs(aggs []Aggregator, lidsSlice []node.LID, aggSupplier func() ([]Aggregator, error), timer *stopwatch.Timer) ([]Aggregator, error) { + if aggs == nil { + var err error + if aggs, err = aggSupplier(); err != nil { // sw timer is activated inside aggSupplier + return nil, err + } } - for _, lid := range in { - if rand.N(k) == 0 { - out = append(out, lid) + + timer.Start() + defer timer.Stop() + + for i := range aggs { + for _, lid := range lidsSlice { + if err := aggs[i].Next(lid); err != nil { + return aggs, err + } } } - return out + return aggs, nil } -func sampleLIDsWithMIDs(in []node.LID, mids []seq.MID, out []node.LID, k uint32) ([]node.LID, []seq.MID) { - if k <= 1 { // 0 or 1 -> no sample - return in, mids +func sampler(n uint32) func(in []node.LID) []node.LID { + if n <= 1 { // 0 or 1 -> no sample + return func(in []node.LID) []node.LID { + return in + } } - i := 0 - for j, lid := range in { - if rand.N(k) == 0 { - out = append(out, lid) - mids[i] = mids[j] - i++ + + var cnt uint32 + return func(in []node.LID) []node.LID { + i := 0 + for _, lid := range in { + if cnt%n == 0 { + in[i] = lid + i++ + } + cnt++ } + return in[:i] } - return out, mids[:i] } func 
tryConvertToBatchedTree(evalTree node.Node) (node.BatchedNode, bool) { From c12ce702edb167f6636821339dc097e07d9ba6e7 Mon Sep 17 00:00:00 2001 From: Evgenii Guguchkin Date: Fri, 19 Jun 2026 12:52:54 +0300 Subject: [PATCH 3/7] downsample hist and aggs too --- frac/fraction_test.go | 239 ++++++++++++++++++++++++--------------- frac/processor/search.go | 11 +- 2 files changed, 153 insertions(+), 97 deletions(-) diff --git a/frac/fraction_test.go b/frac/fraction_test.go index 35e02713d..86ac52cae 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -1918,7 +1918,7 @@ func (s *FractionTestSuite) TestSearchDownsample() { bulkSize = 200 queryAll = "message:*" queryFiltered = "message:started" - tolerancePct = 3 // ±3% tolerance due to probabilistic sampling + eps = 0.01 ) _, bulks, fromTime, toTime := generatesMessages(totalDocs, bulkSize) @@ -1932,8 +1932,7 @@ func (s *FractionTestSuite) TestSearchDownsample() { // Step 1: verify that all documents are indexed and searchable allResult, err := s.fraction.Search(context.Background(), *s.query(queryAll, baseOpts...)) s.Require().NoError(err, "search for all documents should succeed") - s.Require().Equal(totalDocs, allResult.IDs.Len(), - "all %d documents should be found without downsample", totalDocs) + s.Require().Equal(totalDocs, allResult.IDs.Len(), "all %d documents should be found without downsample", totalDocs) // Step 2: find how many documents match the filtered query (message:started) // This count serves as the baseline for downsample expectations. @@ -1944,48 +1943,40 @@ func (s *FractionTestSuite) TestSearchDownsample() { // Step 3: verify downsample produces approximately expected document counts // With downsample=k, each document has a 1/k probability of being included, - // so we expect approximately total/k documents with ±tolerancePercent tolerance. - tolerance := filteredDocCount * tolerancePct / 100 + // so we expect approximately total/k documents with ±eps. 
downsampleValues := []int{10, 20, 50, 100} + + assertSampled := func(q string, ds int, total int) { + query := s.query(q, append(baseOpts, withDownsample(uint32(ds)))...) + result, err := s.fraction.Search(context.Background(), *query) + s.Require().NoError(err, "search with downsample=%d should succeed", ds) + act := float64(result.IDs.Len()) + exp := math.Ceil(float64(total) / float64(ds)) + s.Require().InEpsilon(exp, act, eps, "sampled count (%d) should be ~ %d/%d (±%f%%)", int(act), total, ds, eps) + } + for _, ds := range downsampleValues { s.T().Run(fmt.Sprintf("downsample=%d", ds), func(t *testing.T) { - query := s.query(queryFiltered, append(baseOpts, withDownsample(uint32(ds)))...) - result, err := s.fraction.Search(context.Background(), *query) - s.Require().NoError(err, "search with downsample=%d should succeed", ds) - - sampledLen := result.IDs.Len() - expectedCount := filteredDocCount / ds - lowerBound := expectedCount - tolerance - upperBound := expectedCount + tolerance - - s.Require().GreaterOrEqual(sampledLen, lowerBound, - "downsample=%d: sampled count %d should be >= %d (expected %d - tolerance %d)", - ds, sampledLen, lowerBound, expectedCount, tolerance) - s.Require().LessOrEqual(sampledLen, upperBound, - "downsample=%d: sampled count %d should be <= %d (expected %d + tolerance %d)", - ds, sampledLen, upperBound, expectedCount, tolerance) + assertSampled(queryAll, ds, totalDocs) + assertSampled(queryFiltered, ds, filteredDocCount) }) } } func (s *FractionTestSuite) TestSearchDownsampleWithTotal() { const ( - totalDocs = 1000 - bulkSize = 200 - tolerancePct = 3 // percent tolerance for sampled document count + totalDocs = 1000 + bulkSize = 200 + eps = 0.01 ) _, bulks, fromTime, toTime := generatesMessages(totalDocs, bulkSize) s.insertDocuments(bulks...) 
- // tolerance window: ±3% of total documents - tolerance := totalDocs * tolerancePct / 100 - // downsample values to test: each should return ~1/ds of total documents downsampleValues := []int{10, 20, 50, 100} for _, ds := range downsampleValues { - ds := ds // capture loop variable s.T().Run(fmt.Sprintf("downsample=%d", ds), func(t *testing.T) { params := s.query( "message:*", @@ -1997,29 +1988,62 @@ func (s *FractionTestSuite) TestSearchDownsampleWithTotal() { result, err := s.fraction.Search(context.Background(), *params) s.Require().NoError(err, "search with downsample=%d failed", ds) - sampledDocs := result.IDs.Len() - expectedDocs := totalDocs / ds // with downsample=k, expect approximately totalDocs/k documents + act := float64(result.IDs.Len()) + exp := math.Ceil(float64(totalDocs) / float64(ds)) // with downsample=k, expect approximately totalDocs/k documents - s.Require().Less(sampledDocs, expectedDocs+tolerance, - "downsample=%d: sampled docs (%d) should be less than expected (%d) + tolerance (%d)", - ds, sampledDocs, expectedDocs, tolerance) - s.Require().Greater(sampledDocs, expectedDocs-tolerance, - "downsample=%d: sampled docs (%d) should be greater than expected (%d) - tolerance (%d)", - ds, sampledDocs, expectedDocs, tolerance) + s.Require().InEpsilon(exp, act, eps, + "sampled docs (%d) should be ~ %d/%d (±%0.2f)", + int(act), totalDocs, ds, eps) - // Total field must always reflect the full document count, regardless of downsample - s.Require().Equal(totalDocs, int(result.Total), - "downsample=%d: total should not be affected by downsample", ds) + s.Require().Equal(totalDocs, int(result.Total), "total should not be affected by downsample") }) } } +func (s *FractionTestSuite) TestSearchDownsampleZeroAndOne() { + const ( + totalDocs = 5000 + bulkSize = 200 + queryAll = "message:*" + ) + + _, bulks, fromTime, toTime := generatesMessages(totalDocs, bulkSize) + s.insertDocuments(bulks...) 
+ + baseOpts := []searchOption{ + withFrom(fromTime.Format(time.RFC3339Nano)), + withTo(toTime.Format(time.RFC3339Nano)), + } + + // searchAndAssertIDs is a local helper that runs a search with the given options + // and asserts that the result contains exactly totalDocs documents. + searchAndAssertIDs := func(name string, opts ...searchOption) { + s.T().Run(name, func(t *testing.T) { + params := s.query(queryAll, append(baseOpts, opts...)...) + qpr, err := s.fraction.Search(context.Background(), *params) + s.Require().NoError(err, "%s: search failed", name) + s.Require().NotNil(qpr, "%s: search result must not be nil", name) + s.Require().Equal(totalDocs, qpr.IDs.Len(), + "%s: expected %d documents, got %d", name, totalDocs, qpr.IDs.Len()) + }) + } + + // downsample=0 (default) — should return all documents + searchAndAssertIDs("downsample=0 (default)") + + // downsample=0 explicitly — should return all documents + searchAndAssertIDs("downsample=0 (explicit)", withDownsample(0)) + + // downsample=1 — should return all documents + searchAndAssertIDs("downsample=1", withDownsample(1)) +} + func (s *FractionTestSuite) TestSearchDownsampleWithAggAndHist() { const ( - totalDocs = 1000 + totalDocs = 10000 bulkSize = 200 - hist = 8 - downsample = 20 + hist = 1000 + downsample = 3 ) _, bulks, fromTime, toTime := generatesMessages(totalDocs, bulkSize) @@ -2043,82 +2067,115 @@ func (s *FractionTestSuite) TestSearchDownsampleWithAggAndHist() { s.Require().Greater(len(qprNoDS.Aggs), 0, "should have aggregation results") // Verify the histogram has a reasonable number of buckets. 
- s.Require().Greater(len(qprNoDS.Histogram), 0, "histogram should have at least one bucket") - s.Require().LessOrEqual(len(qprNoDS.Histogram), totalDocs/hist, - "histogram buckets (%d) should not exceed cntDocs/hist=%d", - len(qprNoDS.Histogram), totalDocs/hist) + actualHist := len(qprNoDS.Histogram) + s.Require().Greater(actualHist, 0, "histogram should have at least one bucket") + s.Require().InEpsilon(totalDocs/hist, actualHist, 0.1, + "histogram buckets (%d) should be ~ cntDocs/hist=%d", + actualHist, totalDocs/hist) s.T().Run("with downsample", func(t *testing.T) { paramsDS := s.query("message:started", append(commonOpts, withDownsample(downsample))...) qprDS, err := s.fraction.Search(context.Background(), *paramsDS) s.Require().NoError(err, "search with downsample=%d failed", downsample) s.Require().NotNil(qprDS, "search result must not be nil") - s.Require().Equal(qprNoDS.Histogram, qprDS.Histogram, "histogram should match without downsample") - assertAggregationsEqual(s, qprNoDS, qprDS) + assertSampledAggs(s, qprNoDS.Aggs, qprDS.Aggs, downsample) + assertSampledHist(s, qprNoDS.Histogram, qprDS.Histogram, downsample) }) }) + } -// assertAggregationsEqual verifies that two search results have identical aggregation data. 
-func assertAggregationsEqual(s *FractionTestSuite, expected, actual *seq.QPR) { - s.Require().Equal(len(expected.Aggs), len(actual.Aggs), - "number of aggregation groups should be the same; "+ - "aggregations are computed on the full document set and are not affected by downsample") +func assertSampledAggs(s *FractionTestSuite, expected, actual []seq.AggregatableSamples, ds uint32) { + const ( + distEps = 0.25 + totalEps = 0.05 + ) + + s.Require().Equal(len(expected), len(actual), + "number of aggregation groups: expected %d, got %d", + len(expected), len(actual)) - for i := range expected.Aggs { - expAgg := &expected.Aggs[i] - actAgg := &actual.Aggs[i] + for i := range expected { + // convert aggregations to token → Total maps + expMap := samplesToMap(expected[i].SamplesByBin) + actMap := samplesToMap(actual[i].SamplesByBin) - s.Require().Equal(len(expAgg.SamplesByBin), len(actAgg.SamplesByBin), - "number of aggregation bins should be the same for agg group %d", i) + // calculate totals and distributions + expTotal := sumMap(expMap) + actTotal := sumMap(actMap) + expDist := buildDistMap(expMap, expTotal) + actDist := buildDistMap(actMap, actTotal) - for bin, expSample := range expAgg.SamplesByBin { - actSample, ok := actAgg.SamplesByBin[bin] - s.Require().True(ok, "bin %v should exist in downsample results for agg group %d", bin, i) - // Total count is computed from the full document set and must match exactly. 
- s.Require().Equal(expSample.Total, actSample.Total, - "aggregation total for bin %v in agg group %d should be the same "+ - "(aggregations are computed on the full document set, not sampled)", bin, i) - } + assertDistEqual(s, expDist, actDist, distEps, "aggs") + assertTotalScaled(s, expTotal, actTotal, ds, totalEps, "aggs") } } -func (s *FractionTestSuite) TestSearchDownsampleZeroAndOne() { +func assertSampledHist(s *FractionTestSuite, expected, actual map[seq.MID]uint64, ds uint32) { const ( - totalDocs = 5000 - bulkSize = 200 - queryAll = "message:*" + distEps = 0.2 + totalEps = 0.05 ) - _, bulks, fromTime, toTime := generatesMessages(totalDocs, bulkSize) - s.insertDocuments(bulks...) + expTotal := sumMap(expected) + actTotal := sumMap(actual) + expDist := buildDistMap(expected, expTotal) + actDist := buildDistMap(actual, actTotal) - baseOpts := []searchOption{ - withFrom(fromTime.Format(time.RFC3339Nano)), - withTo(toTime.Format(time.RFC3339Nano)), + assertDistEqual(s, expDist, actDist, distEps, "histogram") + assertTotalScaled(s, expTotal, actTotal, ds, totalEps, "histogram") +} + +func sumMap[K comparable](m map[K]uint64) uint64 { + var sum uint64 + for _, v := range m { + sum += v } + return sum +} - // searchAndAssertIDs is a local helper that runs a search with the given options - // and asserts that the result contains exactly totalDocs documents. - searchAndAssertIDs := func(name string, opts ...searchOption) { - s.T().Run(name, func(t *testing.T) { - params := s.query(queryAll, append(baseOpts, opts...)...) 
- qpr, err := s.fraction.Search(context.Background(), *params) - s.Require().NoError(err, "%s: search failed", name) - s.Require().NotNil(qpr, "%s: search result must not be nil", name) - s.Require().Equal(totalDocs, qpr.IDs.Len(), - "%s: expected %d documents, got %d", name, totalDocs, qpr.IDs.Len()) - }) +func buildDistMap[K comparable](m map[K]uint64, total uint64) map[K]float64 { + dist := make(map[K]float64, len(m)) + if total == 0 { + return dist } + for k, v := range m { + dist[k] = float64(v) / float64(total) + } + return dist +} - // downsample=0 (default) — should return all documents - searchAndAssertIDs("downsample=0 (default)") +func assertDistEqual[K comparable](s *FractionTestSuite, expDist, actDist map[K]float64, eps float64, label string) { + allKeys := make(map[K]struct{}) + for k := range expDist { + allKeys[k] = struct{}{} + } + for k := range actDist { + allKeys[k] = struct{}{} + } - // downsample=0 explicitly — should return all documents - searchAndAssertIDs("downsample=0 (explicit)", withDownsample(0)) + for k := range allKeys { + expVal := expDist[k] + actVal := actDist[k] + s.Assert().InEpsilon(expVal, actVal, eps, + "%s: distribution mismatch for key \"%v\": expected %.2f, got %.2f", + label, k, expVal, actVal) + } +} - // downsample=1 — should return all documents - searchAndAssertIDs("downsample=1", withDownsample(1)) +func assertTotalScaled(s *FractionTestSuite, expTotal, actTotal uint64, ds uint32, eps float64, label string) { + expScaled := float64(expTotal) / float64(ds) + s.Assert().InEpsilon(expScaled, float64(actTotal), eps, + "%s: total count mismatch: expected %.2f (scaled by ds=%d), got %d", + label, expScaled, ds, actTotal) +} + +func samplesToMap(samplesByBin map[seq.AggBin]*seq.SamplesContainer) map[string]uint64 { + res := make(map[string]uint64, len(samplesByBin)) + for bin, sample := range samplesByBin { + res[bin.Token] = uint64(sample.Total) + } + return res } type searchOption func(*processor.SearchParams) error diff 
--git a/frac/processor/search.go b/frac/processor/search.go index 14176e35b..34cfe1a02 100644 --- a/frac/processor/search.go +++ b/frac/processor/search.go @@ -278,15 +278,14 @@ func iterateEvalTree( } if needIDs > 0 { - if needIDs < len(lidsBatch) { // trim sampled slice to respect the remaining limit - lidsBatch = lidsBatch[:needIDs] - } + needLIDs := min(needIDs, len(lidsBatch)) + timerRID.Start() - rids = idsIndex.GetRIDs(lidsBatch, rids[:0]) + rids = idsIndex.GetRIDs(lidsBatch[:needLIDs], rids[:0]) timerRID.Stop() - // Fill IDs for search - for i := range lidsBatch { + // fill IDs for search + for i := 0; i < needLIDs; i++ { id := seq.ID{MID: mids[i], RID: rids[i]} if i == 0 || lastID != id { // lids increase monotonically, it's enough to compare current id with the last one ids = append(ids, seq.IDSource{ID: id}) From 694b1bd3acdc8d80b5f8d5362db2ffebaf590a53 Mon Sep 17 00:00:00 2001 From: Evgenii Guguchkin Date: Wed, 24 Jun 2026 13:21:45 +0300 Subject: [PATCH 4/7] downsample hist and aggs too. 
tune eps --- frac/fraction_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/frac/fraction_test.go b/frac/fraction_test.go index 86ac52cae..33615f6b7 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -2087,7 +2087,7 @@ func (s *FractionTestSuite) TestSearchDownsampleWithAggAndHist() { func assertSampledAggs(s *FractionTestSuite, expected, actual []seq.AggregatableSamples, ds uint32) { const ( - distEps = 0.25 + distEps = 0.3 totalEps = 0.05 ) @@ -2113,7 +2113,7 @@ func assertSampledAggs(s *FractionTestSuite, expected, actual []seq.Aggregatable func assertSampledHist(s *FractionTestSuite, expected, actual map[seq.MID]uint64, ds uint32) { const ( - distEps = 0.2 + distEps = 0.3 totalEps = 0.05 ) From cd7185e84fb9dfc686ecbfa6fd20073eabf60652 Mon Sep 17 00:00:00 2001 From: Evgenii Guguchkin Date: Thu, 25 Jun 2026 02:13:41 +0300 Subject: [PATCH 5/7] move downsample param to query --- api/seqproxyapi/v1/seq_proxy_api.proto | 3 +- pkg/seqproxyapi/v1/marshaler_test.go | 2 +- pkg/seqproxyapi/v1/seq_proxy_api.pb.go | 48 ++--- .../v1/seq_proxy_api_vtproto.pb.go | 165 ++++++------------ pkg/storeapi/store_api.pb.go | 2 +- proxyapi/grpc_search.go | 13 +- proxyapi/grpc_v1.go | 2 +- tests/integration_tests/integration_test.go | 12 +- tests/setup/env.go | 37 +++- 9 files changed, 121 insertions(+), 163 deletions(-) diff --git a/api/seqproxyapi/v1/seq_proxy_api.proto b/api/seqproxyapi/v1/seq_proxy_api.proto index 6da046859..934de8589 100644 --- a/api/seqproxyapi/v1/seq_proxy_api.proto +++ b/api/seqproxyapi/v1/seq_proxy_api.proto @@ -168,6 +168,7 @@ message SearchQuery { google.protobuf.Timestamp from = 2; // Lower bound for search (inclusive). google.protobuf.Timestamp to = 3; // Upper bound for search (inclusive). bool explain = 4; // Should request be explained (tracing will be provided with the result). 
+ uint32 downsample = 5; // If set, returns roughly 1 in N documents on a probabilistic basis } // Aggregation function used in request. @@ -217,7 +218,6 @@ message SearchRequest { bool with_total = 4; // Should total number of documents be returned in response. Order order = 5; // Document order ORDER_DESC/ORDER_ASC. string offset_id = 6; // ID offset for pagination. - uint32 downsample = 7; // If set, returns roughly 1 in N documents on a probabilistic basis } message ComplexSearchRequest { @@ -229,7 +229,6 @@ message ComplexSearchRequest { bool with_total = 6; // Should total number of documents be returned in response. Order order = 7; // Document order ORDER_DESC/ORDER_ASC. string offset_id = 8; // ID offset for pagination. - uint32 downsample = 9; // If set, returns roughly 1 in N documents on a probabilistic basis } message SearchResponse { diff --git a/pkg/seqproxyapi/v1/marshaler_test.go b/pkg/seqproxyapi/v1/marshaler_test.go index 5322a5200..08d274623 100644 --- a/pkg/seqproxyapi/v1/marshaler_test.go +++ b/pkg/seqproxyapi/v1/marshaler_test.go @@ -142,6 +142,6 @@ func TestFetchAsyncSearchResultResponseMarshalJSON(t *testing.T) { Progress: 1, DiskUsage: 488, }, - `{"status":"AsyncSearchStatusCanceled","request":{"retention":"3600s","query":{"query":"message:some_message","from":"2025-07-01T05:20:00Z","to":"2025-08-01T05:20:00Z","explain":false},"aggs":[],"withDocs":true,"size":"100"},"response":{"docs":[{"id":"46e48be997010000-e70163d0fa7582e4","data":{"message":"some_message","level":3},"time":"2025-07-08T10:19:08.742Z"}],"hist":{}},"progress":1,"disk_usage":"488","started_at":"2025-07-25T12:25:57.672Z","expires_at":"2025-07-25T13:25:57.672Z","canceled_at":"2025-07-25T12:34:26.577Z"}`, + 
`{"status":"AsyncSearchStatusCanceled","request":{"retention":"3600s","query":{"query":"message:some_message","from":"2025-07-01T05:20:00Z","to":"2025-08-01T05:20:00Z","explain":false,"downsample":0},"aggs":[],"withDocs":true,"size":"100"},"response":{"docs":[{"id":"46e48be997010000-e70163d0fa7582e4","data":{"message":"some_message","level":3},"time":"2025-07-08T10:19:08.742Z"}],"hist":{}},"progress":1,"disk_usage":"488","started_at":"2025-07-25T12:25:57.672Z","expires_at":"2025-07-25T13:25:57.672Z","canceled_at":"2025-07-25T12:34:26.577Z"}`, ) } diff --git a/pkg/seqproxyapi/v1/seq_proxy_api.pb.go b/pkg/seqproxyapi/v1/seq_proxy_api.pb.go index 29a70a723..9eb79211a 100644 --- a/pkg/seqproxyapi/v1/seq_proxy_api.pb.go +++ b/pkg/seqproxyapi/v1/seq_proxy_api.pb.go @@ -462,10 +462,11 @@ func (x *Histogram) GetBuckets() []*Histogram_Bucket { // General search query for requesting documents. type SearchQuery struct { state protoimpl.MessageState `protogen:"open.v1"` - Query string `protobuf:"bytes,1,opt,name=query,proto3" json:"query,omitempty"` // Search query. - From *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=from,proto3" json:"from,omitempty"` // Lower bound for search (inclusive). - To *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=to,proto3" json:"to,omitempty"` // Upper bound for search (inclusive). - Explain bool `protobuf:"varint,4,opt,name=explain,proto3" json:"explain,omitempty"` // Should request be explained (tracing will be provided with the result). + Query string `protobuf:"bytes,1,opt,name=query,proto3" json:"query,omitempty"` // Search query. + From *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=from,proto3" json:"from,omitempty"` // Lower bound for search (inclusive). + To *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=to,proto3" json:"to,omitempty"` // Upper bound for search (inclusive). 
+ Explain bool `protobuf:"varint,4,opt,name=explain,proto3" json:"explain,omitempty"` // Should request be explained (tracing will be provided with the result). + Downsample uint32 `protobuf:"varint,5,opt,name=downsample,proto3" json:"downsample,omitempty"` // If set, returns roughly 1 in N documents on a probabilistic basis unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -528,6 +529,13 @@ func (x *SearchQuery) GetExplain() bool { return false } +func (x *SearchQuery) GetDownsample() uint32 { + if x != nil { + return x.Downsample + } + return 0 +} + // Aggregation query. Generally uses `field` and `group_by`, for details, refer to AggFunc definition. type AggQuery struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -719,7 +727,6 @@ type SearchRequest struct { WithTotal bool `protobuf:"varint,4,opt,name=with_total,json=withTotal,proto3" json:"with_total,omitempty"` // Should total number of documents be returned in response. Order Order `protobuf:"varint,5,opt,name=order,proto3,enum=seqproxyapi.v1.Order" json:"order,omitempty"` // Document order ORDER_DESC/ORDER_ASC. OffsetId string `protobuf:"bytes,6,opt,name=offset_id,json=offsetId,proto3" json:"offset_id,omitempty"` // ID offset for pagination. - Downsample uint32 `protobuf:"varint,7,opt,name=downsample,proto3" json:"downsample,omitempty"` // If set, returns roughly 1 in N documents, statistically. unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -796,13 +803,6 @@ func (x *SearchRequest) GetOffsetId() string { return "" } -func (x *SearchRequest) GetDownsample() uint32 { - if x != nil { - return x.Downsample - } - return 0 -} - type ComplexSearchRequest struct { state protoimpl.MessageState `protogen:"open.v1"` Query *SearchQuery `protobuf:"bytes,1,opt,name=query,proto3" json:"query,omitempty"` // Search query. 
@@ -813,7 +813,6 @@ type ComplexSearchRequest struct { WithTotal bool `protobuf:"varint,6,opt,name=with_total,json=withTotal,proto3" json:"with_total,omitempty"` // Should total number of documents be returned in response. Order Order `protobuf:"varint,7,opt,name=order,proto3,enum=seqproxyapi.v1.Order" json:"order,omitempty"` // Document order ORDER_DESC/ORDER_ASC. OffsetId string `protobuf:"bytes,8,opt,name=offset_id,json=offsetId,proto3" json:"offset_id,omitempty"` // ID offset for pagination. - Downsample uint32 `protobuf:"varint,9,opt,name=downsample,proto3" json:"downsample,omitempty"` // If set, returns roughly 1 in N documents, statistically. unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -904,13 +903,6 @@ func (x *ComplexSearchRequest) GetOffsetId() string { return "" } -func (x *ComplexSearchRequest) GetDownsample() uint32 { - if x != nil { - return x.Downsample - } - return 0 -} - type SearchResponse struct { state protoimpl.MessageState `protogen:"open.v1"` // Deprecated: Marked as deprecated in seqproxyapi/v1/seq_proxy_api.proto. 
@@ -2706,7 +2698,7 @@ var file_seqproxyapi_v1_seq_proxy_api_proto_rawDesc = string([]byte{ 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x64, 0x6f, 0x63, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x2a, 0x0a, 0x02, 0x74, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, - 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x02, 0x74, 0x73, 0x22, 0x99, 0x01, + 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x02, 0x74, 0x73, 0x22, 0xb9, 0x01, 0x0a, 0x0b, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x51, 0x75, 0x65, 0x72, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x12, 0x2e, 0x0a, 0x04, 0x66, 0x72, 0x6f, 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, @@ -2716,7 +2708,9 @@ var file_seqproxyapi_v1_seq_proxy_api_proto_rawDesc = string([]byte{ 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x02, 0x74, 0x6f, 0x12, 0x18, 0x0a, 0x07, 0x65, 0x78, 0x70, 0x6c, 0x61, 0x69, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, - 0x52, 0x07, 0x65, 0x78, 0x70, 0x6c, 0x61, 0x69, 0x6e, 0x22, 0xba, 0x01, 0x0a, 0x08, 0x41, 0x67, + 0x52, 0x07, 0x65, 0x78, 0x70, 0x6c, 0x61, 0x69, 0x6e, 0x12, 0x1e, 0x0a, 0x0a, 0x64, 0x6f, 0x77, + 0x6e, 0x73, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x64, + 0x6f, 0x77, 0x6e, 0x73, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x22, 0xba, 0x01, 0x0a, 0x08, 0x41, 0x67, 0x67, 0x51, 0x75, 0x65, 0x72, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x12, 0x19, 0x0a, 0x08, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x5f, 0x62, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, @@ -2740,7 +2734,7 @@ var file_seqproxyapi_v1_seq_proxy_api_proto_rawDesc = 
string([]byte{ 0x6e, 0x12, 0x38, 0x0a, 0x08, 0x63, 0x68, 0x69, 0x6c, 0x64, 0x72, 0x65, 0x6e, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x73, 0x65, 0x71, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x45, 0x78, 0x70, 0x6c, 0x61, 0x69, 0x6e, 0x45, 0x6e, 0x74, 0x72, - 0x79, 0x52, 0x08, 0x63, 0x68, 0x69, 0x6c, 0x64, 0x72, 0x65, 0x6e, 0x22, 0xf7, 0x01, 0x0a, 0x0d, + 0x79, 0x52, 0x08, 0x63, 0x68, 0x69, 0x6c, 0x64, 0x72, 0x65, 0x6e, 0x22, 0xd7, 0x01, 0x0a, 0x0d, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x31, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x73, 0x65, 0x71, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x65, @@ -2754,9 +2748,7 @@ var file_seqproxyapi_v1_seq_proxy_api_proto_rawDesc = string([]byte{ 0x70, 0x72, 0x6f, 0x78, 0x79, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x52, 0x05, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x12, 0x1b, 0x0a, 0x09, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6f, 0x66, 0x66, - 0x73, 0x65, 0x74, 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x64, 0x6f, 0x77, 0x6e, 0x73, 0x61, 0x6d, - 0x70, 0x6c, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x64, 0x6f, 0x77, 0x6e, 0x73, - 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x22, 0xe9, 0x02, 0x0a, 0x14, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, + 0x73, 0x65, 0x74, 0x49, 0x64, 0x22, 0xc9, 0x02, 0x0a, 0x14, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x78, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x31, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x73, 0x65, 0x71, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x53, @@ -2776,9 +2768,7 @@ var file_seqproxyapi_v1_seq_proxy_api_proto_rawDesc = string([]byte{ 0x73, 0x65, 0x71, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x61, 0x70, 
0x69, 0x2e, 0x76, 0x31, 0x2e, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x52, 0x05, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x12, 0x1b, 0x0a, 0x09, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x08, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, - 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x64, 0x6f, 0x77, 0x6e, - 0x73, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x64, 0x6f, - 0x77, 0x6e, 0x73, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x42, 0x07, 0x0a, 0x05, 0x5f, 0x68, 0x69, 0x73, + 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x49, 0x64, 0x42, 0x07, 0x0a, 0x05, 0x5f, 0x68, 0x69, 0x73, 0x74, 0x22, 0xb0, 0x01, 0x0a, 0x0e, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2d, 0x0a, 0x10, 0x70, 0x61, 0x72, 0x74, 0x69, 0x61, 0x6c, 0x5f, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x42, 0x02, diff --git a/pkg/seqproxyapi/v1/seq_proxy_api_vtproto.pb.go b/pkg/seqproxyapi/v1/seq_proxy_api_vtproto.pb.go index b45009b1a..3b4d8b697 100644 --- a/pkg/seqproxyapi/v1/seq_proxy_api_vtproto.pb.go +++ b/pkg/seqproxyapi/v1/seq_proxy_api_vtproto.pb.go @@ -170,6 +170,7 @@ func (m *SearchQuery) CloneVT() *SearchQuery { r.From = (*timestamppb.Timestamp)((*timestamppb1.Timestamp)(m.From).CloneVT()) r.To = (*timestamppb.Timestamp)((*timestamppb1.Timestamp)(m.To).CloneVT()) r.Explain = m.Explain + r.Downsample = m.Downsample if len(m.unknownFields) > 0 { r.unknownFields = make([]byte, len(m.unknownFields)) copy(r.unknownFields, m.unknownFields) @@ -262,7 +263,6 @@ func (m *SearchRequest) CloneVT() *SearchRequest { r.WithTotal = m.WithTotal r.Order = m.Order r.OffsetId = m.OffsetId - r.Downsample = m.Downsample if len(m.unknownFields) > 0 { r.unknownFields = make([]byte, len(m.unknownFields)) copy(r.unknownFields, m.unknownFields) @@ -286,7 +286,6 @@ func (m *ComplexSearchRequest) CloneVT() *ComplexSearchRequest { r.WithTotal = m.WithTotal r.Order = m.Order r.OffsetId = m.OffsetId - 
r.Downsample = m.Downsample if rhs := m.Aggs; rhs != nil { tmpContainer := make([]*AggQuery, len(rhs)) for k, v := range rhs { @@ -1079,6 +1078,9 @@ func (this *SearchQuery) EqualVT(that *SearchQuery) bool { if this.Explain != that.Explain { return false } + if this.Downsample != that.Downsample { + return false + } return string(this.unknownFields) == string(that.unknownFields) } @@ -1208,9 +1210,6 @@ func (this *SearchRequest) EqualVT(that *SearchRequest) bool { if this.OffsetId != that.OffsetId { return false } - if this.Downsample != that.Downsample { - return false - } return string(this.unknownFields) == string(that.unknownFields) } @@ -1265,9 +1264,6 @@ func (this *ComplexSearchRequest) EqualVT(that *ComplexSearchRequest) bool { if this.OffsetId != that.OffsetId { return false } - if this.Downsample != that.Downsample { - return false - } return string(this.unknownFields) == string(that.unknownFields) } @@ -3023,6 +3019,11 @@ func (m *SearchQuery) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if m.Downsample != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.Downsample)) + i-- + dAtA[i] = 0x28 + } if m.Explain { i-- if m.Explain { @@ -3264,11 +3265,6 @@ func (m *SearchRequest) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } - if m.Downsample != 0 { - i = protohelpers.EncodeVarint(dAtA, i, uint64(m.Downsample)) - i-- - dAtA[i] = 0x38 - } if len(m.OffsetId) > 0 { i -= len(m.OffsetId) copy(dAtA[i:], m.OffsetId) @@ -3344,11 +3340,6 @@ func (m *ComplexSearchRequest) MarshalToSizedBufferVT(dAtA []byte) (int, error) i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } - if m.Downsample != 0 { - i = protohelpers.EncodeVarint(dAtA, i, uint64(m.Downsample)) - i-- - dAtA[i] = 0x48 - } if len(m.OffsetId) > 0 { i -= len(m.OffsetId) copy(dAtA[i:], m.OffsetId) @@ -5301,6 +5292,11 @@ func (m *SearchQuery) 
MarshalToSizedBufferVTStrict(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if m.Downsample != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.Downsample)) + i-- + dAtA[i] = 0x28 + } if m.Explain { i-- if m.Explain { @@ -5542,11 +5538,6 @@ func (m *SearchRequest) MarshalToSizedBufferVTStrict(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } - if m.Downsample != 0 { - i = protohelpers.EncodeVarint(dAtA, i, uint64(m.Downsample)) - i-- - dAtA[i] = 0x38 - } if len(m.OffsetId) > 0 { i -= len(m.OffsetId) copy(dAtA[i:], m.OffsetId) @@ -5622,11 +5613,6 @@ func (m *ComplexSearchRequest) MarshalToSizedBufferVTStrict(dAtA []byte) (int, e i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } - if m.Downsample != 0 { - i = protohelpers.EncodeVarint(dAtA, i, uint64(m.Downsample)) - i-- - dAtA[i] = 0x48 - } if len(m.OffsetId) > 0 { i -= len(m.OffsetId) copy(dAtA[i:], m.OffsetId) @@ -7372,6 +7358,9 @@ func (m *SearchQuery) SizeVT() (n int) { if m.Explain { n += 2 } + if m.Downsample != 0 { + n += 1 + protohelpers.SizeOfVarint(uint64(m.Downsample)) + } n += len(m.unknownFields) return n } @@ -7468,9 +7457,6 @@ func (m *SearchRequest) SizeVT() (n int) { if l > 0 { n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) } - if m.Downsample != 0 { - n += 1 + protohelpers.SizeOfVarint(uint64(m.Downsample)) - } n += len(m.unknownFields) return n } @@ -7511,9 +7497,6 @@ func (m *ComplexSearchRequest) SizeVT() (n int) { if l > 0 { n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) } - if m.Downsample != 0 { - n += 1 + protohelpers.SizeOfVarint(uint64(m.Downsample)) - } n += len(m.unknownFields) return n } @@ -8994,6 +8977,25 @@ func (m *SearchQuery) UnmarshalVT(dAtA []byte) error { } } m.Explain = bool(v != 0) + case 5: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Downsample", wireType) + } + m.Downsample = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + 
return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Downsample |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) @@ -9647,25 +9649,6 @@ func (m *SearchRequest) UnmarshalVT(dAtA []byte) error { } m.OffsetId = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex - case 7: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Downsample", wireType) - } - m.Downsample = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return protohelpers.ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.Downsample |= uint32(b&0x7F) << shift - if b < 0x80 { - break - } - } default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) @@ -9932,25 +9915,6 @@ func (m *ComplexSearchRequest) UnmarshalVT(dAtA []byte) error { } m.OffsetId = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex - case 9: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Downsample", wireType) - } - m.Downsample = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return protohelpers.ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.Downsample |= uint32(b&0x7F) << shift - if b < 0x80 { - break - } - } default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) @@ -14494,6 +14458,25 @@ func (m *SearchQuery) UnmarshalVTUnsafe(dAtA []byte) error { } } m.Explain = bool(v != 0) + case 5: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Downsample", wireType) + } + m.Downsample = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Downsample |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx 
= preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) @@ -15171,25 +15154,6 @@ func (m *SearchRequest) UnmarshalVTUnsafe(dAtA []byte) error { } m.OffsetId = stringValue iNdEx = postIndex - case 7: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Downsample", wireType) - } - m.Downsample = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return protohelpers.ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.Downsample |= uint32(b&0x7F) << shift - if b < 0x80 { - break - } - } default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) @@ -15460,25 +15424,6 @@ func (m *ComplexSearchRequest) UnmarshalVTUnsafe(dAtA []byte) error { } m.OffsetId = stringValue iNdEx = postIndex - case 9: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Downsample", wireType) - } - m.Downsample = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return protohelpers.ErrIntOverflow - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.Downsample |= uint32(b&0x7F) << shift - if b < 0x80 { - break - } - } default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) diff --git a/pkg/storeapi/store_api.pb.go b/pkg/storeapi/store_api.pb.go index e5b74bbb6..80aabf744 100644 --- a/pkg/storeapi/store_api.pb.go +++ b/pkg/storeapi/store_api.pb.go @@ -447,7 +447,7 @@ type SearchRequest struct { Aggs []*AggQuery `protobuf:"bytes,12,rep,name=aggs,proto3" json:"aggs,omitempty"` Order Order `protobuf:"varint,13,opt,name=order,proto3,enum=api.Order" json:"order,omitempty"` OffsetId string `protobuf:"bytes,14,opt,name=offset_id,json=offsetId,proto3" json:"offset_id,omitempty"` - Downsample uint32 `protobuf:"varint,15,opt,name=downsample,proto3" json:"downsample,omitempty"` // If set, returns roughly 1 in N documents, statistically. 
+ Downsample uint32 `protobuf:"varint,15,opt,name=downsample,proto3" json:"downsample,omitempty"` // If set, returns roughly 1 in N documents on a probabilistic basis unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } diff --git a/proxyapi/grpc_search.go b/proxyapi/grpc_search.go index 59c172343..d5e658073 100644 --- a/proxyapi/grpc_search.go +++ b/proxyapi/grpc_search.go @@ -20,13 +20,12 @@ func (g *grpcV1) Search( } proxyReq := &seqproxyapi.ComplexSearchRequest{ - Query: req.Query, - Size: req.Size, - Offset: req.Offset, - OffsetId: req.OffsetId, - WithTotal: req.WithTotal, - Order: req.Order, - Downsample: req.Downsample, + Query: req.Query, + Size: req.Size, + Offset: req.Offset, + OffsetId: req.OffsetId, + WithTotal: req.WithTotal, + Order: req.Order, } sResp, err := g.doSearch(ctx, proxyReq, true, nil) if err != nil { diff --git a/proxyapi/grpc_v1.go b/proxyapi/grpc_v1.go index 0763e7c61..6946067bc 100644 --- a/proxyapi/grpc_v1.go +++ b/proxyapi/grpc_v1.go @@ -245,7 +245,7 @@ func (g *grpcV1) doSearch( WithTotal: req.WithTotal, ShouldFetch: shouldFetch, Order: req.Order.MustDocsOrder(), - Downsample: req.Downsample, + Downsample: req.Query.Downsample, } if len(req.Aggs) > 0 { diff --git a/tests/integration_tests/integration_test.go b/tests/integration_tests/integration_test.go index 67efa3252..db1f3a546 100644 --- a/tests/integration_tests/integration_test.go +++ b/tests/integration_tests/integration_test.go @@ -1371,21 +1371,21 @@ func (s *IntegrationTestSuite) TestDownsamplePropagation() { opts = append(opts, setup.WithDownsample(*tc.downsample)) } - qpr, _, _, err := env.Search(`service:a`, math.MaxInt32, opts...) - r.NoError(err, "store search with %s should succeed", tc.name) + resp := env.HTTPSearch(t, `service:a`, math.MaxInt32, opts...) 
+ r.Equal(seqproxyapi.ErrorCode_ERROR_CODE_NO, resp.Error.Code, "store search with %s should succeed", tc.name) if tc.wantAll { - r.Equal(totalDocs, len(qpr.IDs), "store search %s: should return all %d docs", tc.name, totalDocs) + r.Equal(totalDocs, len(resp.Docs), "store search %s: should return all %d docs", tc.name, totalDocs) } else { - r.Greater(len(qpr.IDs), 0, "store search %s: should return at least some results", tc.name) + r.Greater(len(resp.Docs), 0, "store search %s: should return at least some results", tc.name) // downsample=N: expect approximately total/N docs with ±3% tolerance. ds := int(*tc.downsample) delta := float64(totalDocs/ds) * tolerancePercent - r.InDelta(totalDocs/ds, len(qpr.IDs), delta, + r.InDelta(totalDocs/ds, len(resp.Docs), delta, "store search %s: should return ~%d docs", tc.name, totalDocs/ds) } - r.Equal(uint64(totalDocs), qpr.Total, + r.Equal(int64(totalDocs), resp.Total, "store search %s: Total should reflect full count (%d)", tc.name, totalDocs) } } diff --git a/tests/setup/env.go b/tests/setup/env.go index 73b4eea20..86d1703a8 100644 --- a/tests/setup/env.go +++ b/tests/setup/env.go @@ -8,17 +8,18 @@ import ( "net" "path/filepath" "runtime" + "testing" "time" - "go.uber.org/atomic" - "go.uber.org/zap" - "github.com/alecthomas/units" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/google/uuid" + "go.uber.org/atomic" + "go.uber.org/zap" + "google.golang.org/protobuf/types/known/timestamppb" "github.com/ozontech/seq-db/buildinfo" "github.com/ozontech/seq-db/consts" @@ -27,6 +28,7 @@ import ( "github.com/ozontech/seq-db/logger" "github.com/ozontech/seq-db/mappingprovider" "github.com/ozontech/seq-db/network/circuitbreaker" + "github.com/ozontech/seq-db/pkg/seqproxyapi/v1" "github.com/ozontech/seq-db/proxy/bulk" "github.com/ozontech/seq-db/proxy/search" "github.com/ozontech/seq-db/proxy/stores" @@ -345,7 +347,7 
@@ func MakeIngestors(cfg *TestingEnvConfig, hot, cold [][]string) []*Ingestor { API: proxyapi.APIConfig{ SearchTimeout: 10 * time.Minute, // long enough for debugging purposes with a debugger ExportTimeout: 10 * time.Minute, // the same (debugging purposes) - QueryRateLimit: 0, + QueryRateLimit: 10000, // todo: support no ratelimit if == 0 EsVersion: "test", GatewayAddr: grpcLis.Addr().String(), }, @@ -579,7 +581,7 @@ func WithDownsample(downsample uint32) SearchOption { } } -func (t *TestingEnv) Search(q string, size int, options ...SearchOption) (*seq.QPR, [][]byte, time.Duration, error) { +func (t *TestingEnv) buildRequest(q string, size int, options ...SearchOption) *search.SearchRequest { sr := &search.SearchRequest{ Explain: false, Q: []byte(q), @@ -591,10 +593,33 @@ func (t *TestingEnv) Search(q string, size int, options ...SearchOption) (*seq.Q ShouldFetch: true, Order: seq.DocsOrderDesc, } - for _, option := range options { option(sr) } + return sr +} + +func (t *TestingEnv) HTTPSearch(tt *testing.T, q string, size int, options ...SearchOption) *seqproxyapi.SearchResponse { + sr := t.buildRequest(q, size, options...) + + return SearchHTTP(tt, t.IngestorSearchAddr(), &seqproxyapi.SearchRequest{ + Query: &seqproxyapi.SearchQuery{ + Query: string(sr.Q), + From: timestamppb.New(sr.From.Time()), + To: timestamppb.New(sr.To.Time()), + Explain: sr.Explain, + Downsample: sr.Downsample, + }, + Size: int64(sr.Size), + Offset: int64(sr.Offset), + WithTotal: sr.WithTotal, + Order: seqproxyapi.Order(sr.Order), + OffsetId: sr.OffsetId, + }) +} + +func (t *TestingEnv) Search(q string, size int, options ...SearchOption) (*seq.QPR, [][]byte, time.Duration, error) { + sr := t.buildRequest(q, size, options...) 
var docs [][]byte qpr, docsStream, duration, err := t.Ingestor().SearchIngestor.Search(context.Background(), sr, nil) From 461352b86aa5e82e7d6037cc14a6ea69bf63b936 Mon Sep 17 00:00:00 2001 From: Evgenii Guguchkin Date: Thu, 25 Jun 2026 17:23:08 +0300 Subject: [PATCH 6/7] randomize initial sampler counter to prevent guaranteed inclusion of first doc per fraction --- frac/processor/search.go | 3 ++- frac/processor/search_params.go | 3 +++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/frac/processor/search.go b/frac/processor/search.go index 34cfe1a02..61b33a9de 100644 --- a/frac/processor/search.go +++ b/frac/processor/search.go @@ -4,6 +4,7 @@ import ( "context" "errors" "math" + "math/rand/v2" "sync" "time" @@ -335,7 +336,7 @@ func sampler(n uint32) func(in []node.LID) []node.LID { } } - var cnt uint32 + cnt := rand.Uint32N(n) return func(in []node.LID) []node.LID { i := 0 for _, lid := range in { diff --git a/frac/processor/search_params.go b/frac/processor/search_params.go index 4d7279d78..182c83977 100644 --- a/frac/processor/search_params.go +++ b/frac/processor/search_params.go @@ -76,6 +76,9 @@ func (p SearchParams) MarshalLogObject(enc zapcore.ObjectEncoder) error { return err } } + if p.Downsample > 1 { + enc.AddUint32("Downsample", p.Downsample) + } return nil } From e36221cebf27db9135710ccce4d83ae10adf1df5 Mon Sep 17 00:00:00 2001 From: Evgenii Guguchkin Date: Thu, 25 Jun 2026 19:00:20 +0300 Subject: [PATCH 7/7] make test more accurate --- frac/fraction_test.go | 41 +++++++++++++++++++++++++---------------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/frac/fraction_test.go b/frac/fraction_test.go index 33615f6b7..28c4d135d 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -1918,7 +1918,7 @@ func (s *FractionTestSuite) TestSearchDownsample() { bulkSize = 200 queryAll = "message:*" queryFiltered = "message:started" - eps = 0.01 + eps = 0.05 ) _, bulks, fromTime, toTime := generatesMessages(totalDocs, 
bulkSize) @@ -1947,12 +1947,18 @@ func (s *FractionTestSuite) TestSearchDownsample() { downsampleValues := []int{10, 20, 50, 100} assertSampled := func(q string, ds int, total int) { + actSum := 0 + actCnt := 0 query := s.query(q, append(baseOpts, withDownsample(uint32(ds)))...) - result, err := s.fraction.Search(context.Background(), *query) - s.Require().NoError(err, "search with downsample=%d should succeed", ds) - act := float64(result.IDs.Len()) - exp := math.Ceil(float64(total) / float64(ds)) - s.Require().InEpsilon(exp, act, eps, "sampled count (%d) should be ~ %d/%d (±%f%%)", int(act), total, ds, eps) + for range 100 { + result, err := s.fraction.Search(s.T().Context(), *query) + s.Require().NoError(err, "search with downsample=%d should succeed", ds) + actSum += result.IDs.Len() + actCnt++ + } + act := float64(actSum) / float64(actCnt) + exp := float64(total) / float64(ds) + s.Require().InEpsilon(exp, act, eps, "sampled count (%.2f) should be ~ %d/%d (±%f%%)", act, total, ds, eps) } for _, ds := range downsampleValues { @@ -1967,7 +1973,7 @@ func (s *FractionTestSuite) TestSearchDownsampleWithTotal() { const ( totalDocs = 1000 bulkSize = 200 - eps = 0.01 + eps = 0.05 ) _, bulks, fromTime, toTime := generatesMessages(totalDocs, bulkSize) @@ -1985,17 +1991,20 @@ func (s *FractionTestSuite) TestSearchDownsampleWithTotal() { withDownsample(uint32(ds)), withTotal(), ) - result, err := s.fraction.Search(context.Background(), *params) - s.Require().NoError(err, "search with downsample=%d failed", ds) - - act := float64(result.IDs.Len()) - exp := math.Ceil(float64(totalDocs) / float64(ds)) // with downsample=k, expect approximately totalDocs/k documents - s.Require().InEpsilon(exp, act, eps, - "sampled docs (%d) should be ~ %d/%d (±%0.2f)", - int(act), totalDocs, ds, eps) + actSum := 0 + actCnt := 0 + for range 100 { + result, err := s.fraction.Search(s.T().Context(), *params) + s.Require().NoError(err, "search with downsample=%d failed", ds) + 
s.Require().Equal(totalDocs, int(result.Total), "total should not be affected by downsample") + actSum += result.IDs.Len() + actCnt++ + } + act := float64(actSum) / float64(actCnt) + exp := float64(totalDocs) / float64(ds) // with downsample=k, expect approximately totalDocs/k documents + s.Require().InEpsilon(exp, act, eps, "sampled docs (%.2f) should be ~ %d/%d (±%0.2f)", act, totalDocs, ds, eps) - s.Require().Equal(totalDocs, int(result.Total), "total should not be affected by downsample") }) } }