Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions api/seqproxyapi/v1/seq_proxy_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package seqproxyapi.v1;

import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";

import "google/api/annotations.proto";

// seq-db public api. Exposes APIs related to document querying.
Expand Down Expand Up @@ -167,6 +168,7 @@ message SearchQuery {
google.protobuf.Timestamp from = 2; // Lower bound for search (inclusive).
google.protobuf.Timestamp to = 3; // Upper bound for search (inclusive).
bool explain = 4; // Should request be explained (tracing will be provided with the result).
uint32 downsample = 5; // If set, returns roughly 1 in N documents on a probabilistic basis
}

// Aggregation function used in request.
Expand Down Expand Up @@ -219,14 +221,14 @@ message SearchRequest {
}

message ComplexSearchRequest {
SearchQuery query = 1; // Search query.
repeated AggQuery aggs = 2; // List of aggregation queries.
optional HistQuery hist = 3; // Histogram query.
int64 size = 4; // Maximum number of documents to return.
int64 offset = 5; // Search offset.
bool with_total = 6; // Should total number of documents be returned in response.
Order order = 7; // Document order ORDER_DESC/ORDER_ASC.
string offset_id = 8; // ID offset for pagination.
SearchQuery query = 1; // Search query.
repeated AggQuery aggs = 2; // List of aggregation queries.
optional HistQuery hist = 3; // Histogram query.
int64 size = 4; // Maximum number of documents to return.
int64 offset = 5; // Search offset.
bool with_total = 6; // Should total number of documents be returned in response.
Order order = 7; // Document order ORDER_DESC/ORDER_ASC.
string offset_id = 8; // ID offset for pagination.
}

message SearchResponse {
Expand Down
1 change: 1 addition & 0 deletions api/storeapi/store_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ message SearchRequest {
repeated AggQuery aggs = 12;
Order order = 13;
string offset_id = 14;
uint32 downsample = 15; // If set, returns roughly 1 in N documents on a probabilistic basis
}

message SearchResponse {
Expand Down
282 changes: 282 additions & 0 deletions frac/fraction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1912,6 +1912,281 @@ func (s *FractionTestSuite) TestFractionInfo() {
}
}

func (s *FractionTestSuite) TestSearchDownsample() {
const (
totalDocs = 5000
bulkSize = 200
queryAll = "message:*"
queryFiltered = "message:started"
eps = 0.05
)

_, bulks, fromTime, toTime := generatesMessages(totalDocs, bulkSize)
s.insertDocuments(bulks...)

baseOpts := []searchOption{
withFrom(fromTime.Format(time.RFC3339Nano)),
withTo(toTime.Format(time.RFC3339Nano)),
}

// Step 1: verify that all documents are indexed and searchable
allResult, err := s.fraction.Search(context.Background(), *s.query(queryAll, baseOpts...))
s.Require().NoError(err, "search for all documents should succeed")
s.Require().Equal(totalDocs, allResult.IDs.Len(), "all %d documents should be found without downsample", totalDocs)

// Step 2: find how many documents match the filtered query (message:started)
// This count serves as the baseline for downsample expectations.
filteredResult, err := s.fraction.Search(context.Background(), *s.query(queryFiltered, baseOpts...))
s.Require().NoError(err, "search for filtered documents should succeed")
filteredDocCount := filteredResult.IDs.Len()
s.Require().Greater(filteredDocCount, 0, "at least one document should match %q", queryFiltered)

// Step 3: verify downsample produces approximately expected document counts
// With downsample=k, each document has a 1/k probability of being included,
// so we expect approximately total/k documents with ±eps.
downsampleValues := []int{10, 20, 50, 100}

assertSampled := func(q string, ds int, total int) {
actSum := 0
actCnt := 0
query := s.query(q, append(baseOpts, withDownsample(uint32(ds)))...)
for range 100 {
result, err := s.fraction.Search(s.T().Context(), *query)
s.Require().NoError(err, "search with downsample=%d should succeed", ds)
actSum += result.IDs.Len()
actCnt++
}
act := float64(actSum) / float64(actCnt)
exp := float64(total) / float64(ds)
s.Require().InEpsilon(exp, act, eps, "sampled count (%.2f) should be ~ %d/%d (±%f%%)", act, total, ds, eps)
}

for _, ds := range downsampleValues {
s.T().Run(fmt.Sprintf("downsample=%d", ds), func(t *testing.T) {
assertSampled(queryAll, ds, totalDocs)
assertSampled(queryFiltered, ds, filteredDocCount)
})
}
}

func (s *FractionTestSuite) TestSearchDownsampleWithTotal() {
const (
totalDocs = 1000
bulkSize = 200
eps = 0.05
)

_, bulks, fromTime, toTime := generatesMessages(totalDocs, bulkSize)
s.insertDocuments(bulks...)

// downsample values to test: each should return ~1/ds of total documents
downsampleValues := []int{10, 20, 50, 100}

for _, ds := range downsampleValues {
s.T().Run(fmt.Sprintf("downsample=%d", ds), func(t *testing.T) {
params := s.query(
"message:*",
withFrom(fromTime.Format(time.RFC3339Nano)),
withTo(toTime.Format(time.RFC3339Nano)),
withDownsample(uint32(ds)),
withTotal(),
)

actSum := 0
actCnt := 0
for range 100 {
result, err := s.fraction.Search(s.T().Context(), *params)
s.Require().NoError(err, "search with downsample=%d failed", ds)
s.Require().Equal(totalDocs, int(result.Total), "total should not be affected by downsample")
actSum += result.IDs.Len()
actCnt++
}
act := float64(actSum) / float64(actCnt)
exp := float64(totalDocs) / float64(ds) // with downsample=k, expect approximately totalDocs/k documents
s.Require().InEpsilon(exp, act, eps, "sampled docs (%.2f) should be ~ %d/%d (±%0.2f)", act, totalDocs, ds, eps)

})
}
}

func (s *FractionTestSuite) TestSearchDownsampleZeroAndOne() {
const (
totalDocs = 5000
bulkSize = 200
queryAll = "message:*"
)

_, bulks, fromTime, toTime := generatesMessages(totalDocs, bulkSize)
s.insertDocuments(bulks...)

baseOpts := []searchOption{
withFrom(fromTime.Format(time.RFC3339Nano)),
withTo(toTime.Format(time.RFC3339Nano)),
}

// searchAndAssertIDs is a local helper that runs a search with the given options
// and asserts that the result contains exactly totalDocs documents.
searchAndAssertIDs := func(name string, opts ...searchOption) {
s.T().Run(name, func(t *testing.T) {
params := s.query(queryAll, append(baseOpts, opts...)...)
qpr, err := s.fraction.Search(context.Background(), *params)
s.Require().NoError(err, "%s: search failed", name)
s.Require().NotNil(qpr, "%s: search result must not be nil", name)
s.Require().Equal(totalDocs, qpr.IDs.Len(),
"%s: expected %d documents, got %d", name, totalDocs, qpr.IDs.Len())
})
}

// downsample=0 (default) — should return all documents
searchAndAssertIDs("downsample=0 (default)")

// downsample=0 explicitly — should return all documents
searchAndAssertIDs("downsample=0 (explicit)", withDownsample(0))

// downsample=1 — should return all documents
searchAndAssertIDs("downsample=1", withDownsample(1))
}

func (s *FractionTestSuite) TestSearchDownsampleWithAggAndHist() {
Comment thread
eguguchkin marked this conversation as resolved.
const (
totalDocs = 10000
bulkSize = 200
hist = 1000
downsample = 3
)

_, bulks, fromTime, toTime := generatesMessages(totalDocs, bulkSize)
s.insertDocuments(bulks...)

commonOpts := []searchOption{
withFrom(fromTime.Format(time.RFC3339Nano)),
withTo(toTime.Format(time.RFC3339Nano)),
withHist(uint64(hist)),
withAggQuery(processor.AggQuery{
GroupBy: aggField("service"),
Func: seq.AggFuncCount,
}),
}

s.T().Run("without downsample", func(t *testing.T) {
paramsNoDS := s.query("message:started", commonOpts...)
qprNoDS, err := s.fraction.Search(context.Background(), *paramsNoDS)
s.Require().NoError(err, "search without downsample failed")
s.Require().NotNil(qprNoDS, "search result must not be nil")
s.Require().Greater(len(qprNoDS.Aggs), 0, "should have aggregation results")

// Verify the histogram has a reasonable number of buckets.
actualHist := len(qprNoDS.Histogram)
s.Require().Greater(actualHist, 0, "histogram should have at least one bucket")
s.Require().InEpsilon(totalDocs/hist, actualHist, 0.1,
"histogram buckets (%d) should be ~ cntDocs/hist=%d",
actualHist, totalDocs/hist)

s.T().Run("with downsample", func(t *testing.T) {
paramsDS := s.query("message:started", append(commonOpts, withDownsample(downsample))...)
qprDS, err := s.fraction.Search(context.Background(), *paramsDS)
s.Require().NoError(err, "search with downsample=%d failed", downsample)
s.Require().NotNil(qprDS, "search result must not be nil")
assertSampledAggs(s, qprNoDS.Aggs, qprDS.Aggs, downsample)
assertSampledHist(s, qprNoDS.Histogram, qprDS.Histogram, downsample)
})
})

}

func assertSampledAggs(s *FractionTestSuite, expected, actual []seq.AggregatableSamples, ds uint32) {
const (
distEps = 0.3
totalEps = 0.05
)

s.Require().Equal(len(expected), len(actual),
"number of aggregation groups: expected %d, got %d",
len(expected), len(actual))

for i := range expected {
// convert aggregations to token → Total maps
expMap := samplesToMap(expected[i].SamplesByBin)
actMap := samplesToMap(actual[i].SamplesByBin)

// calculate totals and distributions
expTotal := sumMap(expMap)
actTotal := sumMap(actMap)
expDist := buildDistMap(expMap, expTotal)
actDist := buildDistMap(actMap, actTotal)

assertDistEqual(s, expDist, actDist, distEps, "aggs")
assertTotalScaled(s, expTotal, actTotal, ds, totalEps, "aggs")
}
}

func assertSampledHist(s *FractionTestSuite, expected, actual map[seq.MID]uint64, ds uint32) {
const (
distEps = 0.3
totalEps = 0.05
)

expTotal := sumMap(expected)
actTotal := sumMap(actual)
expDist := buildDistMap(expected, expTotal)
actDist := buildDistMap(actual, actTotal)

assertDistEqual(s, expDist, actDist, distEps, "histogram")
assertTotalScaled(s, expTotal, actTotal, ds, totalEps, "histogram")
}

func sumMap[K comparable](m map[K]uint64) uint64 {
var sum uint64
for _, v := range m {
sum += v
}
return sum
}

func buildDistMap[K comparable](m map[K]uint64, total uint64) map[K]float64 {
dist := make(map[K]float64, len(m))
if total == 0 {
return dist
}
for k, v := range m {
dist[k] = float64(v) / float64(total)
}
return dist
}

func assertDistEqual[K comparable](s *FractionTestSuite, expDist, actDist map[K]float64, eps float64, label string) {
allKeys := make(map[K]struct{})
for k := range expDist {
allKeys[k] = struct{}{}
}
for k := range actDist {
allKeys[k] = struct{}{}
}

for k := range allKeys {
expVal := expDist[k]
actVal := actDist[k]
s.Assert().InEpsilon(expVal, actVal, eps,
"%s: distribution mismatch for key \"%v\": expected %.2f, got %.2f",
label, k, expVal, actVal)
}
}

func assertTotalScaled(s *FractionTestSuite, expTotal, actTotal uint64, ds uint32, eps float64, label string) {
expScaled := float64(expTotal) / float64(ds)
s.Assert().InEpsilon(expScaled, float64(actTotal), eps,
"%s: total count mismatch: expected %.2f (scaled by ds=%d), got %d",
label, expScaled, ds, actTotal)
}

func samplesToMap(samplesByBin map[seq.AggBin]*seq.SamplesContainer) map[string]uint64 {
res := make(map[string]uint64, len(samplesByBin))
for bin, sample := range samplesByBin {
res[bin.Token] = uint64(sample.Total)
}
return res
}

type searchOption func(*processor.SearchParams) error

func (s *FractionTestSuite) query(queryString string, options ...searchOption) *processor.SearchParams {
Expand Down Expand Up @@ -1976,6 +2251,13 @@ func withHist(histInterval uint64) searchOption {
}
}

func withDownsample(k uint32) searchOption {
return func(sp *processor.SearchParams) error {
sp.Downsample = k
return nil
}
}

func aggField(field string) *parser.Literal {
searchAll := []parser.Term{{
Kind: parser.TermSymbol, Data: "*",
Expand Down
Loading
Loading