diff --git a/api/seqproxyapi/v1/seq_proxy_api.proto b/api/seqproxyapi/v1/seq_proxy_api.proto index 22ca2f1da..934de8589 100644 --- a/api/seqproxyapi/v1/seq_proxy_api.proto +++ b/api/seqproxyapi/v1/seq_proxy_api.proto @@ -6,6 +6,7 @@ package seqproxyapi.v1; import "google/protobuf/duration.proto"; import "google/protobuf/timestamp.proto"; + import "google/api/annotations.proto"; // seq-db public api. Exposes APIs related to document querying. @@ -167,6 +168,7 @@ message SearchQuery { google.protobuf.Timestamp from = 2; // Lower bound for search (inclusive). google.protobuf.Timestamp to = 3; // Upper bound for search (inclusive). bool explain = 4; // Should request be explained (tracing will be provided with the result). + uint32 downsample = 5; // If set, returns roughly 1 in N documents on a probabilistic basis } // Aggregation function used in request. @@ -219,14 +221,14 @@ message SearchRequest { } message ComplexSearchRequest { - SearchQuery query = 1; // Search query. - repeated AggQuery aggs = 2; // List of aggregation queries. - optional HistQuery hist = 3; // Histogram query. - int64 size = 4; // Maximum number of documents to return. - int64 offset = 5; // Search offset. - bool with_total = 6; // Should total number of documents be returned in response. - Order order = 7; // Document order ORDER_DESC/ORDER_ASC. - string offset_id = 8; // ID offset for pagination. + SearchQuery query = 1; // Search query. + repeated AggQuery aggs = 2; // List of aggregation queries. + optional HistQuery hist = 3; // Histogram query. + int64 size = 4; // Maximum number of documents to return. + int64 offset = 5; // Search offset. + bool with_total = 6; // Should total number of documents be returned in response. + Order order = 7; // Document order ORDER_DESC/ORDER_ASC. + string offset_id = 8; // ID offset for pagination. } message SearchResponse { diff --git a/api/storeapi/store_api.proto b/api/storeapi/store_api.proto index 43a343b81..d3b2e2b42 100644 --- a/api/storeapi/store_api.proto +++ b/api/storeapi/store_api.proto @@ -77,6 +77,7 @@ message SearchRequest { repeated AggQuery aggs = 12; Order order = 13; string offset_id = 14; + uint32 downsample = 15; // If set, returns roughly 1 in N documents on a probabilistic basis } message SearchResponse { diff --git a/frac/fraction_test.go b/frac/fraction_test.go index 244aeb99f..28c4d135d 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -1912,6 +1912,281 @@ func (s *FractionTestSuite) TestFractionInfo() { } } +func (s *FractionTestSuite) TestSearchDownsample() { + const ( + totalDocs = 5000 + bulkSize = 200 + queryAll = "message:*" + queryFiltered = "message:started" + eps = 0.05 + ) + + _, bulks, fromTime, toTime := generatesMessages(totalDocs, bulkSize) + s.insertDocuments(bulks...) + + baseOpts := []searchOption{ + withFrom(fromTime.Format(time.RFC3339Nano)), + withTo(toTime.Format(time.RFC3339Nano)), + } + + // Step 1: verify that all documents are indexed and searchable + allResult, err := s.fraction.Search(context.Background(), *s.query(queryAll, baseOpts...)) + s.Require().NoError(err, "search for all documents should succeed") + s.Require().Equal(totalDocs, allResult.IDs.Len(), "all %d documents should be found without downsample", totalDocs) + + // Step 2: find how many documents match the filtered query (message:started) + // This count serves as the baseline for downsample expectations. + filteredResult, err := s.fraction.Search(context.Background(), *s.query(queryFiltered, baseOpts...)) + s.Require().NoError(err, "search for filtered documents should succeed") + filteredDocCount := filteredResult.IDs.Len() + s.Require().Greater(filteredDocCount, 0, "at least one document should match %q", queryFiltered) + + // Step 3: verify downsample produces approximately expected document counts + // With downsample=k, each document has a 1/k probability of being included, + // so we expect approximately total/k documents with ±eps. + downsampleValues := []int{10, 20, 50, 100} + + assertSampled := func(q string, ds int, total int) { + actSum := 0 + actCnt := 0 + query := s.query(q, append(baseOpts, withDownsample(uint32(ds)))...) + for range 100 { + result, err := s.fraction.Search(s.T().Context(), *query) + s.Require().NoError(err, "search with downsample=%d should succeed", ds) + actSum += result.IDs.Len() + actCnt++ + } + act := float64(actSum) / float64(actCnt) + exp := float64(total) / float64(ds) + s.Require().InEpsilon(exp, act, eps, "sampled count (%.2f) should be ~ %d/%d (±%f%%)", act, total, ds, eps) + } + + for _, ds := range downsampleValues { + s.T().Run(fmt.Sprintf("downsample=%d", ds), func(t *testing.T) { + assertSampled(queryAll, ds, totalDocs) + assertSampled(queryFiltered, ds, filteredDocCount) + }) + } +} + +func (s *FractionTestSuite) TestSearchDownsampleWithTotal() { + const ( + totalDocs = 1000 + bulkSize = 200 + eps = 0.05 + ) + + _, bulks, fromTime, toTime := generatesMessages(totalDocs, bulkSize) + s.insertDocuments(bulks...) + + // downsample values to test: each should return ~1/ds of total documents + downsampleValues := []int{10, 20, 50, 100} + + for _, ds := range downsampleValues { + s.T().Run(fmt.Sprintf("downsample=%d", ds), func(t *testing.T) { + params := s.query( + "message:*", + withFrom(fromTime.Format(time.RFC3339Nano)), + withTo(toTime.Format(time.RFC3339Nano)), + withDownsample(uint32(ds)), + withTotal(), + ) + + actSum := 0 + actCnt := 0 + for range 100 { + result, err := s.fraction.Search(s.T().Context(), *params) + s.Require().NoError(err, "search with downsample=%d failed", ds) + s.Require().Equal(totalDocs, int(result.Total), "total should not be affected by downsample") + actSum += result.IDs.Len() + actCnt++ + } + act := float64(actSum) / float64(actCnt) + exp := float64(totalDocs) / float64(ds) // with downsample=k, expect approximately totalDocs/k documents + s.Require().InEpsilon(exp, act, eps, "sampled docs (%.2f) should be ~ %d/%d (±%0.2f)", act, totalDocs, ds, eps) + + }) + } +} + +func (s *FractionTestSuite) TestSearchDownsampleZeroAndOne() { + const ( + totalDocs = 5000 + bulkSize = 200 + queryAll = "message:*" + ) + + _, bulks, fromTime, toTime := generatesMessages(totalDocs, bulkSize) + s.insertDocuments(bulks...) + + baseOpts := []searchOption{ + withFrom(fromTime.Format(time.RFC3339Nano)), + withTo(toTime.Format(time.RFC3339Nano)), + } + + // searchAndAssertIDs is a local helper that runs a search with the given options + // and asserts that the result contains exactly totalDocs documents. + searchAndAssertIDs := func(name string, opts ...searchOption) { + s.T().Run(name, func(t *testing.T) { + params := s.query(queryAll, append(baseOpts, opts...)...) + qpr, err := s.fraction.Search(context.Background(), *params) + s.Require().NoError(err, "%s: search failed", name) + s.Require().NotNil(qpr, "%s: search result must not be nil", name) + s.Require().Equal(totalDocs, qpr.IDs.Len(), + "%s: expected %d documents, got %d", name, totalDocs, qpr.IDs.Len()) + }) + } + + // downsample=0 (default) — should return all documents + searchAndAssertIDs("downsample=0 (default)") + + // downsample=0 explicitly — should return all documents + searchAndAssertIDs("downsample=0 (explicit)", withDownsample(0)) + + // downsample=1 — should return all documents + searchAndAssertIDs("downsample=1", withDownsample(1)) +} + +func (s *FractionTestSuite) TestSearchDownsampleWithAggAndHist() { + const ( + totalDocs = 10000 + bulkSize = 200 + hist = 1000 + downsample = 3 + ) + + _, bulks, fromTime, toTime := generatesMessages(totalDocs, bulkSize) + s.insertDocuments(bulks...) + + commonOpts := []searchOption{ + withFrom(fromTime.Format(time.RFC3339Nano)), + withTo(toTime.Format(time.RFC3339Nano)), + withHist(uint64(hist)), + withAggQuery(processor.AggQuery{ + GroupBy: aggField("service"), + Func: seq.AggFuncCount, + }), + } + + s.T().Run("without downsample", func(t *testing.T) { + paramsNoDS := s.query("message:started", commonOpts...) + qprNoDS, err := s.fraction.Search(context.Background(), *paramsNoDS) + s.Require().NoError(err, "search without downsample failed") + s.Require().NotNil(qprNoDS, "search result must not be nil") + s.Require().Greater(len(qprNoDS.Aggs), 0, "should have aggregation results") + + // Verify the histogram has a reasonable number of buckets. + actualHist := len(qprNoDS.Histogram) + s.Require().Greater(actualHist, 0, "histogram should have at least one bucket") + s.Require().InEpsilon(totalDocs/hist, actualHist, 0.1, + "histogram buckets (%d) should be ~ cntDocs/hist=%d", + actualHist, totalDocs/hist) + + s.T().Run("with downsample", func(t *testing.T) { + paramsDS := s.query("message:started", append(commonOpts, withDownsample(downsample))...) + qprDS, err := s.fraction.Search(context.Background(), *paramsDS) + s.Require().NoError(err, "search with downsample=%d failed", downsample) + s.Require().NotNil(qprDS, "search result must not be nil") + assertSampledAggs(s, qprNoDS.Aggs, qprDS.Aggs, downsample) + assertSampledHist(s, qprNoDS.Histogram, qprDS.Histogram, downsample) + }) + }) + +} + +func assertSampledAggs(s *FractionTestSuite, expected, actual []seq.AggregatableSamples, ds uint32) { + const ( + distEps = 0.3 + totalEps = 0.05 + ) + + s.Require().Equal(len(expected), len(actual), + "number of aggregation groups: expected %d, got %d", + len(expected), len(actual)) + + for i := range expected { + // convert aggregations to token → Total maps + expMap := samplesToMap(expected[i].SamplesByBin) + actMap := samplesToMap(actual[i].SamplesByBin) + + // calculate totals and distributions + expTotal := sumMap(expMap) + actTotal := sumMap(actMap) + expDist := buildDistMap(expMap, expTotal) + actDist := buildDistMap(actMap, actTotal) + + assertDistEqual(s, expDist, actDist, distEps, "aggs") + assertTotalScaled(s, expTotal, actTotal, ds, totalEps, "aggs") + } +} + +func assertSampledHist(s *FractionTestSuite, expected, actual map[seq.MID]uint64, ds uint32) { + const ( + distEps = 0.3 + totalEps = 0.05 + ) + + expTotal := sumMap(expected) + actTotal := sumMap(actual) + expDist := buildDistMap(expected, expTotal) + actDist := buildDistMap(actual, actTotal) + + assertDistEqual(s, expDist, actDist, distEps, "histogram") + assertTotalScaled(s, expTotal, actTotal, ds, totalEps, "histogram") +} + +func sumMap[K comparable](m map[K]uint64) uint64 { + var sum uint64 + for _, v := range m { + sum += v + } + return sum +} + +func buildDistMap[K comparable](m map[K]uint64, total uint64) map[K]float64 { + dist := make(map[K]float64, len(m)) + if total == 0 { + return dist + } + for k, v := range m { + dist[k] = float64(v) / float64(total) + } + return dist +} + +func assertDistEqual[K comparable](s *FractionTestSuite, expDist, actDist map[K]float64, eps float64, label string) { + allKeys := make(map[K]struct{}) + for k := range expDist { + allKeys[k] = struct{}{} + } + for k := range actDist { + allKeys[k] = struct{}{} + } + + for k := range allKeys { + expVal := expDist[k] + actVal := actDist[k] + s.Assert().InEpsilon(expVal, actVal, eps, + "%s: distribution mismatch for key \"%v\": expected %.2f, got %.2f", + label, k, expVal, actVal) + } +} + +func assertTotalScaled(s *FractionTestSuite, expTotal, actTotal uint64, ds uint32, eps float64, label string) { + expScaled := float64(expTotal) / float64(ds) + s.Assert().InEpsilon(expScaled, float64(actTotal), eps, + "%s: total count mismatch: expected %.2f (scaled by ds=%d), got %d", + label, expScaled, ds, actTotal) +} + +func samplesToMap(samplesByBin map[seq.AggBin]*seq.SamplesContainer) map[string]uint64 { + res := make(map[string]uint64, len(samplesByBin)) + for bin, sample := range samplesByBin { + res[bin.Token] = uint64(sample.Total) + } + return res +} + type searchOption func(*processor.SearchParams) error func (s *FractionTestSuite) query(queryString string, options ...searchOption) *processor.SearchParams { @@ -1976,6 +2251,13 @@ func withHist(histInterval uint64) searchOption { } } +func withDownsample(k uint32) searchOption { + return func(sp *processor.SearchParams) error { + sp.Downsample = k + return nil + } +} + func aggField(field string) *parser.Literal { searchAll := []parser.Term{{ Kind: parser.TermSymbol, Data: "*", diff --git a/frac/processor/search.go b/frac/processor/search.go index 30ed4f45e..61b33a9de 100644 --- a/frac/processor/search.go +++ b/frac/processor/search.go @@ -4,6 +4,7 @@ import ( "context" "errors" "math" + "math/rand/v2" "sync" "time" @@ -43,9 +44,9 @@ type searchIndex interface { } type searchBuffers struct { - lids []node.LID mids []seq.MID rids []seq.RID + lids []node.LID } var searchBuffersPool = sync.Pool{ @@ -209,19 +210,20 @@ func iterateEvalTree( hasHist := params.HasHist() needScanAllRange := params.IsScanAllRequest() - var hist HistMap - if hasHist { - hist = NewHistMap(params.From, params.To, params.HistInterval) - } - var ( total int + hist HistMap lastID seq.ID ids seq.IDSources ) + if hasHist { + hist = NewHistMap(params.From, params.To, params.HistInterval) + } + buffers := searchBuffersPool.Get().(*searchBuffers) defer searchBuffersPool.Put(buffers) + mids := buffers.mids rids := buffers.rids @@ -229,98 +231,123 @@ func iterateEvalTree( timerEval := sw.Timer("eval_tree_next") timerMID := sw.Timer("get_mid") - timerUpdateHist := sw.Timer("update_hist") + timerHist := sw.Timer("update_hist") timerRID := sw.Timer("get_rid") timerAgg := sw.Timer("agg_node_count") + sample := sampler(params.Downsample) + var aggs []Aggregator for { if util.IsCancelled(ctx) { return total, ids, hist, aggs, ctx.Err() } - needMore := len(ids) < params.Limit - if !needMore && !needScanAllRange { + needIDs := params.Limit - len(ids) + if needIDs < 1 && !needScanAllRange { break } - needLIDs := params.Limit - len(ids) - if needScanAllRange { - needLIDs = math.MaxUint32 + + maxBatchSize := needIDs + if needScanAllRange || params.Downsample > 1 { + // if full range scan is required OR downsampling is active, + // we must fetch as many LIDs as possible in one batch. + maxBatchSize = math.MaxUint32 } timerEval.Start() - lidsSlice := batchedEvalTree(needLIDs) + lidsBatch := batchedEvalTree(maxBatchSize) timerEval.Stop() - if len(lidsSlice) == 0 { - break - } + total += len(lidsBatch) - needMIDs := min(params.Limit-len(ids), len(lidsSlice)) - if hasHist { - // need to fetch mids for all lids for hist - needMIDs = len(lidsSlice) + lidsBatch = sample(lidsBatch) + + if len(lidsBatch) == 0 { + break } - // Get MIDs - if needMIDs > 0 { + if hasHist || needIDs > 0 { timerMID.Start() - mids = idsIndex.GetMIDs(lidsSlice[:needMIDs], mids[:0]) + mids = idsIndex.GetMIDs(lidsBatch, mids[:0]) timerMID.Stop() - } - // Get RIDs - // compute number of ids we can get here, since some MIDs might have been filtered out - needIDs := min(params.Limit-len(ids), len(lidsSlice)) - if needIDs > 0 { - timerRID.Start() - rids = idsIndex.GetRIDs(lidsSlice[0:needIDs], rids[:0]) - timerRID.Stop() - } + if hasHist { + timerHist.Start() + hist.Update(mids) + timerHist.Stop() + } - // Fill IDs for search - for i := 0; i < needIDs; i++ { - id := seq.ID{MID: mids[i], RID: rids[i]} + if needIDs > 0 { + needLIDs := min(needIDs, len(lidsBatch)) - if i == 0 || lastID != id { // lids increase monotonically, it's enough to compare current id with the last one - ids = append(ids, seq.IDSource{ID: id}) - } - lastID = id - } + timerRID.Start() + rids = idsIndex.GetRIDs(lidsBatch[:needLIDs], rids[:0]) + timerRID.Stop() - // Update hist map - if hasHist { - timerUpdateHist.Start() - hist.Update(mids) - timerUpdateHist.Stop() + // fill IDs for search + for i := 0; i < needLIDs; i++ { + id := seq.ID{MID: mids[i], RID: rids[i]} + if i == 0 || lastID != id { // lids increase monotonically, it's enough to compare current id with the last one + ids = append(ids, seq.IDSource{ID: id}) + } + lastID = id + } + } } // Update aggregators if params.HasAgg() { - if aggs == nil { - var err error - aggs, err = aggSupplier() // sw timer is activated inside aggSupplier - if err != nil { - return total, ids, hist, nil, err - } + var err error + if aggs, err = updateAggs(aggs, lidsBatch, aggSupplier, timerAgg); err != nil { + return total, ids, hist, aggs, err } + } + } - timerAgg.Start() - for i := range aggs { - for _, lid := range lidsSlice { - if err := aggs[i].Next(lid); err != nil { - timerAgg.Stop() - return total, ids, hist, aggs, err - } - } + return total, ids, hist, aggs, nil +} + +func updateAggs(aggs []Aggregator, lidsSlice []node.LID, aggSupplier func() ([]Aggregator, error), timer *stopwatch.Timer) ([]Aggregator, error) { + if aggs == nil { + var err error + if aggs, err = aggSupplier(); err != nil { // sw timer is activated inside aggSupplier + return nil, err + } + } + + timer.Start() + defer timer.Stop() + + for i := range aggs { + for _, lid := range lidsSlice { + if err := aggs[i].Next(lid); err != nil { + return aggs, err } - timerAgg.Stop() } + } + return aggs, nil +} - total += len(lidsSlice) +func sampler(n uint32) func(in []node.LID) []node.LID { + if n <= 1 { // 0 or 1 -> no sample + return func(in []node.LID) []node.LID { + return in + } } - return total, ids, hist, aggs, nil + cnt := rand.Uint32N(n) + return func(in []node.LID) []node.LID { + i := 0 + for _, lid := range in { + if cnt%n == 0 { + in[i] = lid + i++ + } + cnt++ + } + return in[:i] + } } func tryConvertToBatchedTree(evalTree node.Node) (node.BatchedNode, bool) { diff --git a/frac/processor/search_params.go b/frac/processor/search_params.go index de2594b86..182c83977 100644 --- a/frac/processor/search_params.go +++ b/frac/processor/search_params.go @@ -47,6 +47,8 @@ type SearchParams struct { WithTotal bool Order seq.DocsOrder + + Downsample uint32 } func (p SearchParams) MarshalLogObject(enc zapcore.ObjectEncoder) error { @@ -74,6 +76,9 @@ func (p SearchParams) MarshalLogObject(enc zapcore.ObjectEncoder) error { return err } } + if p.Downsample > 1 { + enc.AddUint32("Downsample", p.Downsample) + } return nil } diff --git a/pkg/seqproxyapi/v1/marshaler_test.go b/pkg/seqproxyapi/v1/marshaler_test.go index 5322a5200..08d274623 100644 --- a/pkg/seqproxyapi/v1/marshaler_test.go +++ b/pkg/seqproxyapi/v1/marshaler_test.go @@ -142,6 +142,6 @@ func TestFetchAsyncSearchResultResponseMarshalJSON(t *testing.T) { Progress: 1, DiskUsage: 488, }, - `{"status":"AsyncSearchStatusCanceled","request":{"retention":"3600s","query":{"query":"message:some_message","from":"2025-07-01T05:20:00Z","to":"2025-08-01T05:20:00Z","explain":false},"aggs":[],"withDocs":true,"size":"100"},"response":{"docs":[{"id":"46e48be997010000-e70163d0fa7582e4","data":{"message":"some_message","level":3},"time":"2025-07-08T10:19:08.742Z"}],"hist":{}},"progress":1,"disk_usage":"488","started_at":"2025-07-25T12:25:57.672Z","expires_at":"2025-07-25T13:25:57.672Z","canceled_at":"2025-07-25T12:34:26.577Z"}`, + `{"status":"AsyncSearchStatusCanceled","request":{"retention":"3600s","query":{"query":"message:some_message","from":"2025-07-01T05:20:00Z","to":"2025-08-01T05:20:00Z","explain":false,"downsample":0},"aggs":[],"withDocs":true,"size":"100"},"response":{"docs":[{"id":"46e48be997010000-e70163d0fa7582e4","data":{"message":"some_message","level":3},"time":"2025-07-08T10:19:08.742Z"}],"hist":{}},"progress":1,"disk_usage":"488","started_at":"2025-07-25T12:25:57.672Z","expires_at":"2025-07-25T13:25:57.672Z","canceled_at":"2025-07-25T12:34:26.577Z"}`, ) } diff --git a/pkg/seqproxyapi/v1/seq_proxy_api.pb.go b/pkg/seqproxyapi/v1/seq_proxy_api.pb.go index eac9fd1cc..9eb79211a 100644 --- a/pkg/seqproxyapi/v1/seq_proxy_api.pb.go +++ b/pkg/seqproxyapi/v1/seq_proxy_api.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.5 -// protoc v5.29.3 +// protoc v7.35.0 // source: seqproxyapi/v1/seq_proxy_api.proto package seqproxyapi @@ -462,10 +462,11 @@ func (x *Histogram) GetBuckets() []*Histogram_Bucket { // General search query for requesting documents. type SearchQuery struct { state protoimpl.MessageState `protogen:"open.v1"` - Query string `protobuf:"bytes,1,opt,name=query,proto3" json:"query,omitempty"` // Search query. - From *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=from,proto3" json:"from,omitempty"` // Lower bound for search (inclusive). - To *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=to,proto3" json:"to,omitempty"` // Upper bound for search (inclusive). - Explain bool `protobuf:"varint,4,opt,name=explain,proto3" json:"explain,omitempty"` // Should request be explained (tracing will be provided with the result). + Query string `protobuf:"bytes,1,opt,name=query,proto3" json:"query,omitempty"` // Search query. + From *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=from,proto3" json:"from,omitempty"` // Lower bound for search (inclusive). + To *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=to,proto3" json:"to,omitempty"` // Upper bound for search (inclusive). + Explain bool `protobuf:"varint,4,opt,name=explain,proto3" json:"explain,omitempty"` // Should request be explained (tracing will be provided with the result). + Downsample uint32 `protobuf:"varint,5,opt,name=downsample,proto3" json:"downsample,omitempty"` // If set, returns roughly 1 in N documents on a probabilistic basis unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -528,6 +529,13 @@ func (x *SearchQuery) GetExplain() bool { return false } +func (x *SearchQuery) GetDownsample() uint32 { + if x != nil { + return x.Downsample + } + return 0 +} + // Aggregation query. Generally uses `field` and `group_by`, for details, refer to AggFunc definition. type AggQuery struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -2690,7 +2698,7 @@ var file_seqproxyapi_v1_seq_proxy_api_proto_rawDesc = string([]byte{ 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x64, 0x6f, 0x63, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x2a, 0x0a, 0x02, 0x74, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, - 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x02, 0x74, 0x73, 0x22, 0x99, 0x01, + 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x02, 0x74, 0x73, 0x22, 0xb9, 0x01, 0x0a, 0x0b, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x51, 0x75, 0x65, 0x72, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x12, 0x2e, 0x0a, 0x04, 0x66, 0x72, 0x6f, 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, @@ -2700,7 +2708,9 @@ var file_seqproxyapi_v1_seq_proxy_api_proto_rawDesc = string([]byte{ 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x02, 0x74, 0x6f, 0x12, 0x18, 0x0a, 0x07, 0x65, 0x78, 0x70, 0x6c, 0x61, 0x69, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, - 0x52, 0x07, 0x65, 0x78, 0x70, 0x6c, 0x61, 0x69, 0x6e, 0x22, 0xba, 0x01, 0x0a, 0x08, 0x41, 0x67, + 0x52, 0x07, 0x65, 0x78, 0x70, 0x6c, 0x61, 0x69, 0x6e, 0x12, 0x1e, 0x0a, 0x0a, 0x64, 0x6f, 0x77, + 0x6e, 0x73, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x64, + 0x6f, 0x77, 0x6e, 0x73, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x22, 0xba, 0x01, 0x0a, 0x08, 0x41, 0x67, 0x67, 0x51, 0x75, 0x65, 0x72, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x12, 0x19, 0x0a, 0x08, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x5f, 0x62, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, diff --git a/pkg/seqproxyapi/v1/seq_proxy_api_vtproto.pb.go b/pkg/seqproxyapi/v1/seq_proxy_api_vtproto.pb.go index 97f47d659..3b4d8b697 100644 --- a/pkg/seqproxyapi/v1/seq_proxy_api_vtproto.pb.go +++ b/pkg/seqproxyapi/v1/seq_proxy_api_vtproto.pb.go @@ -170,6 +170,7 @@ func (m *SearchQuery) CloneVT() *SearchQuery { r.From = (*timestamppb.Timestamp)((*timestamppb1.Timestamp)(m.From).CloneVT()) r.To = (*timestamppb.Timestamp)((*timestamppb1.Timestamp)(m.To).CloneVT()) r.Explain = m.Explain + r.Downsample = m.Downsample if len(m.unknownFields) > 0 { r.unknownFields = make([]byte, len(m.unknownFields)) copy(r.unknownFields, m.unknownFields) @@ -1077,6 +1078,9 @@ func (this *SearchQuery) EqualVT(that *SearchQuery) bool { if this.Explain != that.Explain { return false } + if this.Downsample != that.Downsample { + return false + } return string(this.unknownFields) == string(that.unknownFields) } @@ -3015,6 +3019,11 @@ func (m *SearchQuery) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if m.Downsample != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.Downsample)) + i-- + dAtA[i] = 0x28 + } if m.Explain { i-- if m.Explain { @@ -5283,6 +5292,11 @@ func (m *SearchQuery) MarshalToSizedBufferVTStrict(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if m.Downsample != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.Downsample)) + i-- + dAtA[i] = 0x28 + } if m.Explain { i-- if m.Explain { @@ -7344,6 +7358,9 @@ func (m *SearchQuery) SizeVT() (n int) { if m.Explain { n += 2 } + if m.Downsample != 0 { + n += 1 + protohelpers.SizeOfVarint(uint64(m.Downsample)) + } n += len(m.unknownFields) return n } @@ -8960,6 +8977,25 @@ func (m *SearchQuery) UnmarshalVT(dAtA []byte) error { } } m.Explain = bool(v != 0) + case 5: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Downsample", wireType) + } + m.Downsample = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Downsample |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) @@ -14422,6 +14458,25 @@ func (m *SearchQuery) UnmarshalVTUnsafe(dAtA []byte) error { } } m.Explain = bool(v != 0) + case 5: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Downsample", wireType) + } + m.Downsample = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Downsample |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) diff --git a/pkg/storeapi/store_api.pb.go b/pkg/storeapi/store_api.pb.go index 53e881042..80aabf744 100644 --- a/pkg/storeapi/store_api.pb.go +++ b/pkg/storeapi/store_api.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.5 -// protoc v5.29.3 +// protoc v7.35.0 // source: storeapi/store_api.proto package storeapi @@ -447,6 +447,7 @@ type SearchRequest struct { Aggs []*AggQuery `protobuf:"bytes,12,rep,name=aggs,proto3" json:"aggs,omitempty"` Order Order `protobuf:"varint,13,opt,name=order,proto3,enum=api.Order" json:"order,omitempty"` OffsetId string `protobuf:"bytes,14,opt,name=offset_id,json=offsetId,proto3" json:"offset_id,omitempty"` + Downsample uint32 `protobuf:"varint,15,opt,name=downsample,proto3" json:"downsample,omitempty"` // If set, returns roughly 1 in N documents on a probabilistic basis unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -574,6 +575,13 @@ func (x *SearchRequest) GetOffsetId() string { return "" } +func (x *SearchRequest) GetDownsample() uint32 { + if x != nil { + return x.Downsample + } + return 0 +} + type SearchResponse struct { state protoimpl.MessageState `protogen:"open.v1"` // Deprecated: Marked as deprecated in storeapi/store_api.proto. @@ -2172,7 +2180,7 @@ var file_storeapi_store_api_proto_rawDesc = string([]byte{ 0x0a, 0x09, 0x71, 0x75, 0x61, 0x6e, 0x74, 0x69, 0x6c, 0x65, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x01, 0x52, 0x09, 0x71, 0x75, 0x61, 0x6e, 0x74, 0x69, 0x6c, 0x65, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, - 0x69, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x4a, 0x04, 0x08, 0x02, 0x10, 0x03, 0x22, 0x85, + 0x69, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x4a, 0x04, 0x08, 0x02, 0x10, 0x03, 0x22, 0xa5, 0x03, 0x0a, 0x0d, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x66, 0x72, 0x6f, 0x6d, 0x18, 0x02, @@ -2197,7 +2205,9 @@ var file_storeapi_store_api_proto_rawDesc = string([]byte{ 0x18, 0x0d, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0a, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x52, 0x05, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x12, 0x1b, 0x0a, 0x09, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x0e, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6f, 0x66, - 0x66, 0x73, 0x65, 0x74, 0x49, 0x64, 0x22, 0xe6, 0x09, 0x0a, 0x0e, 0x53, 0x65, 0x61, 0x72, 0x63, + 0x66, 0x73, 0x65, 0x74, 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x64, 0x6f, 0x77, 0x6e, 0x73, 0x61, + 0x6d, 0x70, 0x6c, 0x65, 0x18, 0x0f, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x64, 0x6f, 0x77, 0x6e, + 0x73, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x22, 0xe6, 0x09, 0x0a, 0x0e, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x42, 0x02, 0x18, 0x01, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x3d, 0x0a, 0x0a, 0x69, 0x64, 0x5f, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x18, diff --git a/pkg/storeapi/store_api_vtproto.pb.go b/pkg/storeapi/store_api_vtproto.pb.go index 73a4cce31..fe3d5dc18 100644 --- a/pkg/storeapi/store_api_vtproto.pb.go +++ b/pkg/storeapi/store_api_vtproto.pb.go @@ -121,6 +121,7 @@ func (m *SearchRequest) CloneVT() *SearchRequest { r.AggregationFilter = m.AggregationFilter r.Order = m.Order r.OffsetId = m.OffsetId + r.Downsample = m.Downsample if rhs := m.Aggs; rhs != nil { tmpContainer := make([]*AggQuery, len(rhs)) for k, v := range rhs { @@ -845,6 +846,9 @@ func (this *SearchRequest) EqualVT(that *SearchRequest) bool { if this.OffsetId != that.OffsetId { return false } + if this.Downsample != that.Downsample { + return false + } return string(this.unknownFields) == string(that.unknownFields) } @@ -2296,6 +2300,11 @@ func (m *SearchRequest) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if m.Downsample != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.Downsample)) + i-- + dAtA[i] = 0x78 + } if len(m.OffsetId) > 0 { i -= len(m.OffsetId) copy(dAtA[i:], m.OffsetId) @@ -4108,6 +4117,11 @@ func (m *SearchRequest) MarshalToSizedBufferVTStrict(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if m.Downsample != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.Downsample)) + i-- + dAtA[i] = 0x78 + } if len(m.OffsetId) > 0 { i -= len(m.OffsetId) copy(dAtA[i:], m.OffsetId) @@ -5845,6 +5859,9 @@ func (m *SearchRequest) SizeVT() (n int) { if l > 0 { n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) } + if m.Downsample != 0 { + n += 1 + protohelpers.SizeOfVarint(uint64(m.Downsample)) + } n += len(m.unknownFields) return n } @@ -7229,6 +7246,25 @@ func (m *SearchRequest) UnmarshalVT(dAtA []byte) error { } m.OffsetId = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 15: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Downsample", wireType) + } + m.Downsample = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Downsample |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) @@ -12056,6 +12092,25 @@ func (m *SearchRequest) UnmarshalVTUnsafe(dAtA []byte) error { } m.OffsetId = stringValue iNdEx = postIndex + case 15: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Downsample", wireType) + } + m.Downsample = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Downsample |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) diff --git a/proxy/search/search_request.go b/proxy/search/search_request.go index 7aa468e24..5d6ba3fb3 100644 --- a/proxy/search/search_request.go +++ b/proxy/search/search_request.go @@ -28,21 +28,23 @@ type SearchRequest struct { WithTotal bool ShouldFetch bool Order seq.DocsOrder + Downsample uint32 } func (sr *SearchRequest) GetAPISearchRequest() *storeapi.SearchRequest { return &storeapi.SearchRequest{ - Query: util.ByteToStringUnsafe(sr.Q), - From: int64(seq.MIDToMillis(sr.From)), - To: int64(seq.MIDToMillis(sr.To)), - Size: int64(sr.Size), - Offset: int64(sr.Offset), - Interval: int64(seq.MIDToMillis(sr.Interval)), - OffsetId: sr.OffsetId, - Aggs: convertToAggsQuery(sr.AggQ), - Explain: sr.Explain, - WithTotal: sr.WithTotal, - Order: storeapi.MustProtoOrder(sr.Order), + Query: util.ByteToStringUnsafe(sr.Q), + From: int64(seq.MIDToMillis(sr.From)), + To: int64(seq.MIDToMillis(sr.To)), + Size: int64(sr.Size), + Offset: int64(sr.Offset), + Interval: int64(seq.MIDToMillis(sr.Interval)), + OffsetId: sr.OffsetId, + Aggs: convertToAggsQuery(sr.AggQ), + Explain: sr.Explain, + WithTotal: sr.WithTotal, + Order: storeapi.MustProtoOrder(sr.Order), + Downsample: sr.Downsample, } } diff --git a/proxyapi/grpc_v1.go b/proxyapi/grpc_v1.go index 79dd341ac..6946067bc 100644 --- a/proxyapi/grpc_v1.go +++ b/proxyapi/grpc_v1.go @@ -245,6 +245,7 @@ func (g *grpcV1) doSearch( WithTotal: req.WithTotal, ShouldFetch: shouldFetch, Order: req.Order.MustDocsOrder(), + Downsample: req.Query.Downsample, } if len(req.Aggs) > 0 { diff --git a/storeapi/grpc_search.go b/storeapi/grpc_search.go index e5eedd980..388312802 100644 --- a/storeapi/grpc_search.go +++ b/storeapi/grpc_search.go @@ -186,6 +186,7 @@ func (g *GrpcV1) doSearch( WithTotal: req.WithTotal, Order: req.Order.MustDocsOrder(), OffsetId: offsetId, + Downsample: req.Downsample, } searchTr := tr.NewChild("search iteratively") diff --git a/tests/integration_tests/integration_test.go b/tests/integration_tests/integration_test.go index d2abf8185..db1f3a546 100644 --- a/tests/integration_tests/integration_test.go +++ b/tests/integration_tests/integration_test.go @@ -1316,6 +1316,85 @@ func TestBigWithReplicasIntegration(t *testing.T) { suite.Run(t, dd) } +func (s *IntegrationTestSuite) TestDownsamplePropagation() { + t := s.T() + r := require.New(t) + + env := setup.NewTestingEnv(s.Config) + defer env.StopAll() + + const ( + docsPerBulk = 200 + tolerancePercent = 0.5 + ) + + // Setup data with status field for aggregations. + bulksNum := getBulkIterationsNum(env) + totalDocs := docsPerBulk * bulksNum + + origDocs := make([]string, docsPerBulk) + for j := 0; j < bulksNum; j++ { + baseIdx := j * docsPerBulk + for i := range origDocs { + origDocs[i] = fmt.Sprintf(`{"service":"a","id":%d,"status":%d}`, baseIdx+i, i%3) + } + setup.Bulk(t, env.IngestorBulkAddr(), origDocs) + } + + type testCase struct { + name string + downsample *uint32 // nil = option not passed + wantAll bool // expect all documents returned + } + + cases := []testCase{{ + name: "no downsample option", + downsample: nil, + wantAll: true, + }, { + name: "downsample=0", + downsample: ptr[uint32](0), + wantAll: true, + }, { + name: "downsample=1", + downsample: ptr[uint32](1), + wantAll: true, + }, { + name: "downsample=10", + downsample: ptr[uint32](10), + wantAll: false, + }} + + for _, tc := range cases { + opts := []setup.SearchOption{setup.NoFetch(), setup.WithTotal(true)} + if tc.downsample != nil { + opts = append(opts, setup.WithDownsample(*tc.downsample)) + } + + resp := env.HTTPSearch(t, `service:a`, math.MaxInt32, opts...) + r.Equal(seqproxyapi.ErrorCode_ERROR_CODE_NO, resp.Error.Code, "store search with %s should succeed", tc.name) + + if tc.wantAll { + r.Equal(totalDocs, len(resp.Docs), "store search %s: should return all %d docs", tc.name, totalDocs) + } else { + r.Greater(len(resp.Docs), 0, "store search %s: should return at least some results", tc.name) + // downsample=N: expect approximately total/N docs with ±3% tolerance. + ds := int(*tc.downsample) + delta := float64(totalDocs/ds) * tolerancePercent + r.InDelta(totalDocs/ds, len(resp.Docs), delta, + "store search %s: should return ~%d docs", tc.name, totalDocs/ds) + } + + r.Equal(int64(totalDocs), resp.Total, + "store search %s: Total should reflect full count (%d)", tc.name, totalDocs) + } +} + +// ptr returns a pointer to the given value. +func ptr[T any](v T) *T { + return &v +} + func (s *IntegrationTestSuite) TestDocuments() { n := 32 env, origDocs := s.envWithDummyDocs(n) diff --git a/tests/setup/env.go b/tests/setup/env.go index a42ba006e..86d1703a8 100644 --- a/tests/setup/env.go +++ b/tests/setup/env.go @@ -8,17 +8,18 @@ import ( "net" "path/filepath" "runtime" + "testing" "time" - "go.uber.org/atomic" - "go.uber.org/zap" - "github.com/alecthomas/units" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/google/uuid" + "go.uber.org/atomic" + "go.uber.org/zap" + "google.golang.org/protobuf/types/known/timestamppb" "github.com/ozontech/seq-db/buildinfo" "github.com/ozontech/seq-db/consts" @@ -27,6 +28,7 @@ import ( "github.com/ozontech/seq-db/logger" "github.com/ozontech/seq-db/mappingprovider" "github.com/ozontech/seq-db/network/circuitbreaker" + "github.com/ozontech/seq-db/pkg/seqproxyapi/v1" "github.com/ozontech/seq-db/proxy/bulk" "github.com/ozontech/seq-db/proxy/search" "github.com/ozontech/seq-db/proxy/stores" @@ -345,7 +347,7 @@ func MakeIngestors(cfg *TestingEnvConfig, hot, cold [][]string) []*Ingestor { API: proxyapi.APIConfig{ SearchTimeout: 10 * time.Minute, // long enough for debugging purposes with a debugger ExportTimeout: 10 * time.Minute, // the same (debugging purposes) - QueryRateLimit: 0, + QueryRateLimit: 10000, // todo: support no ratelimit if == 0 EsVersion: "test", GatewayAddr: grpcLis.Addr().String(), }, @@ -573,7 +575,13 @@ func WithOrder(o seq.DocsOrder) SearchOption { } } -func (t *TestingEnv) Search(q string, size int, options ...SearchOption) (*seq.QPR, [][]byte, time.Duration, error) { +func WithDownsample(downsample uint32) SearchOption { + return func(sr *search.SearchRequest) { + sr.Downsample = downsample + } +} + +func (t *TestingEnv) buildRequest(q string, size int, options ...SearchOption) *search.SearchRequest { sr := &search.SearchRequest{ Explain: false, Q: []byte(q), @@ -585,10 +593,33 @@ func (t *TestingEnv) Search(q string, size int, options ...SearchOption) (*seq.Q ShouldFetch: true, Order: seq.DocsOrderDesc, } - for _, option := range options { option(sr) } + return sr +} + +func (t *TestingEnv) HTTPSearch(tt *testing.T, q string, size int, options ...SearchOption) *seqproxyapi.SearchResponse { + sr := t.buildRequest(q, size, options...) + + return SearchHTTP(tt, t.IngestorSearchAddr(), &seqproxyapi.SearchRequest{ + Query: &seqproxyapi.SearchQuery{ + Query: string(sr.Q), + From: timestamppb.New(sr.From.Time()), + To: timestamppb.New(sr.To.Time()), + Explain: sr.Explain, + Downsample: sr.Downsample, + }, + Size: int64(sr.Size), + Offset: int64(sr.Offset), + WithTotal: sr.WithTotal, + Order: seqproxyapi.Order(sr.Order), + OffsetId: sr.OffsetId, + }) +} + +func (t *TestingEnv) Search(q string, size int, options ...SearchOption) (*seq.QPR, [][]byte, time.Duration, error) { + sr := t.buildRequest(q, size, options...) var docs [][]byte qpr, docsStream, duration, err := t.Ingestor().SearchIngestor.Search(context.Background(), sr, nil)