From 80e5ed16ff48a40fa52c3affb0c3041b2174691c Mon Sep 17 00:00:00 2001 From: IlyaasK <86218345+IlyaasK@users.noreply.github.com> Date: Wed, 24 Jun 2026 21:03:32 +0000 Subject: [PATCH] Optimize telemetry event publishing --- server/lib/events/event.go | 36 ++++++- server/lib/events/events_test.go | 107 +++++++++++++++++++ server/lib/events/eventstream.go | 20 +--- server/lib/events/eventstream_bench_test.go | 49 +++++++++ server/lib/events/ringbuffer.go | 92 ++++++++++++---- server/lib/telemetry/telemetry.go | 10 +- server/lib/telemetry/telemetry_bench_test.go | 37 +++++++ server/lib/telemetry/telemetry_test.go | 17 +++ 8 files changed, 326 insertions(+), 42 deletions(-) create mode 100644 server/lib/events/eventstream_bench_test.go create mode 100644 server/lib/telemetry/telemetry_bench_test.go diff --git a/server/lib/events/event.go b/server/lib/events/event.go index ab4f4270..307ef444 100644 --- a/server/lib/events/event.go +++ b/server/lib/events/event.go @@ -12,6 +12,15 @@ import ( // maxS2RecordBytes is the maximum record size for the S2 event pipeline (1 MB). const maxS2RecordBytes = 1_000_000 +// Below this conservative size estimate, JSON overhead cannot push a record +// near the S2 limit, so Publish can avoid a full marshal on the hot path. +const truncateMarshalThreshold = maxS2RecordBytes - 64*1024 + +const ( + jsonEnvelopeOverhead = 512 + jsonMetadataEntryOverhead = 16 +) + const ( Console = oapi.TelemetryEventCategory("console") Network = oapi.TelemetryEventCategory("network") @@ -97,8 +106,11 @@ type Envelope struct { // truncateIfNeeded marshals env and returns the (possibly truncated) envelope. // If the envelope still exceeds maxS2RecordBytes after nulling data (e.g. huge -// source.metadata), it is returned as-is, callers must handle nil data. +// source.metadata), it is returned as-is with data set to null. func truncateIfNeeded(env Envelope) (Envelope, []byte) { + if estimatedEnvelopeBytes(env) < truncateMarshalThreshold { + return env, nil + } data, err := json.Marshal(env) if err != nil { return env, nil @@ -113,7 +125,27 @@ func truncateIfNeeded(env Envelope) (Envelope, []byte) { return env, nil } if len(data) > maxS2RecordBytes { - slog.Warn("truncateIfNeeded: envelope exceeds limit even without data", "seq", env.Seq, "size", len(data)) + slog.Warn("truncateIfNeeded: envelope exceeds limit even without data", "size", len(data)) } return env, data } + +func estimatedEnvelopeBytes(env Envelope) int { + n := jsonEnvelopeOverhead + len(env.Event.Data) + n += maxEscapedJSONLen(env.Event.Type) + n += maxEscapedJSONLen(string(env.Event.Category)) + n += maxEscapedJSONLen(string(env.Event.Source.Kind)) + if env.Event.Source.Event != nil { + n += maxEscapedJSONLen(*env.Event.Source.Event) + } + if env.Event.Source.Metadata != nil { + for k, v := range *env.Event.Source.Metadata { + n += jsonMetadataEntryOverhead + maxEscapedJSONLen(k) + maxEscapedJSONLen(v) + } + } + return n +} + +func maxEscapedJSONLen(s string) int { + return len(s) * 6 +} diff --git a/server/lib/events/events_test.go b/server/lib/events/events_test.go index 86a5744f..2117f2a8 100644 --- a/server/lib/events/events_test.go +++ b/server/lib/events/events_test.go @@ -3,6 +3,7 @@ package events import ( "context" "encoding/json" + "strings" "sync" "testing" "time" @@ -118,6 +119,44 @@ func TestEventOmitEmpty(t *testing.T) { assert.NotContains(t, s, `"event"`) } +func TestTruncateIfNeededChecksLargeMetadata(t *testing.T) { + metadata := map[string]string{"large": strings.Repeat("x", maxS2RecordBytes)} + env := Envelope{ + Event: Event{ + Type: "console.log", + Category: Console, + Source: oapi.BrowserEventSource{Kind: oapi.Cdp, Metadata: &metadata}, + Data: json.RawMessage(`{"message":"hello"}`), + }, + } + + truncated, data := truncateIfNeeded(env) + + require.NotNil(t, data) + assert.True(t, truncated.Event.Truncated) + assert.Equal(t, json.RawMessage("null"), truncated.Event.Data) + assert.Greater(t, len(data), maxS2RecordBytes) +} + +func TestTruncateIfNeededChecksEscapedMetadata(t *testing.T) { + metadata := map[string]string{"escaped": strings.Repeat("<", maxS2RecordBytes/5)} + env := Envelope{ + Event: Event{ + Type: "console.log", + Category: Console, + Source: oapi.BrowserEventSource{Kind: oapi.Cdp, Metadata: &metadata}, + Data: json.RawMessage(`{"message":"hello"}`), + }, + } + + truncated, data := truncateIfNeeded(env) + + require.NotNil(t, data) + assert.True(t, truncated.Event.Truncated) + assert.Equal(t, json.RawMessage("null"), truncated.Event.Data) + assert.Greater(t, len(data), maxS2RecordBytes) +} + func mkEnv(seq uint64, ev Event) Envelope { return Envelope{Seq: seq, Event: ev} } @@ -133,6 +172,74 @@ func newTestRingBuffer(t *testing.T, capacity int) *ringBuffer { return rb } +func TestEventStreamPublishAssignsSeq(t *testing.T) { + es, err := NewEventStream(EventStreamConfig{RingCapacity: 10}) + require.NoError(t, err) + reader := es.NewReader(0) + + first := es.Publish(Envelope{Event: cdpEvent("console.log", Console)}) + second := es.Publish(Envelope{Event: cdpEvent("network.request", Network)}) + + assert.Equal(t, uint64(1), first.Seq) + assert.Equal(t, uint64(2), second.Seq) + assert.Equal(t, uint64(2), es.Seq()) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + assert.Equal(t, uint64(1), readEnvelope(t, reader, ctx).Seq) + assert.Equal(t, uint64(2), readEnvelope(t, reader, ctx).Seq) +} + +func TestEventStreamPublishTruncatesWithAssignedSeq(t *testing.T) { + es, err := NewEventStream(EventStreamConfig{RingCapacity: 10}) + require.NoError(t, err) + nextSeq := uint64(1_000_000_000_000_000_000) + es.ring.latestSeq = nextSeq - 1 + + env := Envelope{ + Event: Event{ + Type: "console.log", + Category: Console, + Source: oapi.BrowserEventSource{Kind: oapi.Cdp}, + Data: json.RawMessage(`""`), + }, + } + seqZeroLen := marshaledEnvelopeLen(t, env) + env.Seq = nextSeq + nextSeqLen := marshaledEnvelopeLen(t, env) + payloadLen := maxS2RecordBytes - seqZeroLen + payload := json.RawMessage(`"` + strings.Repeat("x", payloadLen) + `"`) + seqZeroEnv := Envelope{Event: Event{ + Type: "console.log", + Category: Console, + Source: oapi.BrowserEventSource{Kind: oapi.Cdp}, + Data: payload, + }} + nextSeqEnv := seqZeroEnv + nextSeqEnv.Seq = nextSeq + require.LessOrEqual(t, marshaledEnvelopeLen(t, seqZeroEnv), maxS2RecordBytes) + require.Greater(t, marshaledEnvelopeLen(t, nextSeqEnv), maxS2RecordBytes) + require.Greater(t, nextSeqLen-seqZeroLen, 0) + + published := es.Publish(Envelope{Event: Event{ + Type: "console.log", + Category: Console, + Source: oapi.BrowserEventSource{Kind: oapi.Cdp}, + Data: payload, + }}) + + assert.Equal(t, nextSeq, published.Seq) + assert.True(t, published.Event.Truncated) + assert.Equal(t, json.RawMessage("null"), published.Event.Data) +} + +func marshaledEnvelopeLen(t *testing.T, env Envelope) int { + t.Helper() + data, err := json.Marshal(env) + require.NoError(t, err) + return len(data) +} + // TestRingBuffer: publish 3 envelopes; reader reads all 3 in order func TestRingBuffer(t *testing.T) { rb := newTestRingBuffer(t, 10) diff --git a/server/lib/events/eventstream.go b/server/lib/events/eventstream.go index 371061d4..ceb53ba6 100644 --- a/server/lib/events/eventstream.go +++ b/server/lib/events/eventstream.go @@ -2,14 +2,11 @@ package events import ( "fmt" - "sync" ) -// EventStream is the process-lifetime event bus. It owns the ring buffer and -// sequence counter, which outlive individual capture sessions. +// EventStream is the process-lifetime event bus. Its ring buffer and sequence +// counter outlive individual capture sessions. type EventStream struct { - mu sync.Mutex - seq uint64 ring *ringBuffer } @@ -29,14 +26,7 @@ func NewEventStream(cfg EventStreamConfig) (*EventStream, error) { // Publish assigns a monotonically increasing seq to env, truncates oversized // payloads, and pushes it to the ring buffer. func (es *EventStream) Publish(env Envelope) Envelope { - es.mu.Lock() - es.seq++ - env.Seq = es.seq - es.mu.Unlock() - - env, _ = truncateIfNeeded(env) - es.ring.publish(env) - return env + return es.ring.publishNext(env) } // NewReader returns a Reader positioned after afterSeq. Pass 0 to start from @@ -47,7 +37,5 @@ func (es *EventStream) NewReader(afterSeq uint64) *Reader { // Seq returns the sequence number of the last published event. func (es *EventStream) Seq() uint64 { - es.mu.Lock() - defer es.mu.Unlock() - return es.seq + return es.ring.seq() } diff --git a/server/lib/events/eventstream_bench_test.go b/server/lib/events/eventstream_bench_test.go new file mode 100644 index 00000000..9f4ebc41 --- /dev/null +++ b/server/lib/events/eventstream_bench_test.go @@ -0,0 +1,49 @@ +package events + +import ( + "encoding/json" + "testing" + + oapi "github.com/kernel/kernel-images/server/lib/oapi" +) + +func benchmarkEvent() Event { + return Event{ + Ts: 123456789, + Type: "console.log", + Category: Console, + Source: oapi.BrowserEventSource{Kind: oapi.Cdp}, + Data: json.RawMessage(`{"message":"hello","level":"log"}`), + } +} + +func BenchmarkEventStreamPublish(b *testing.B) { + es, err := NewEventStream(EventStreamConfig{RingCapacity: 1024}) + if err != nil { + b.Fatal(err) + } + ev := benchmarkEvent() + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + es.Publish(Envelope{Event: ev}) + } +} + +func BenchmarkEventStreamPublishRead(b *testing.B) { + es, err := NewEventStream(EventStreamConfig{RingCapacity: 1024}) + if err != nil { + b.Fatal(err) + } + reader := es.NewReader(0) + ev := benchmarkEvent() + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + es.Publish(Envelope{Event: ev}) + res, ok := reader.TryRead() + if !ok || res.Envelope == nil { + b.Fatal("expected envelope") + } + } +} diff --git a/server/lib/events/ringbuffer.go b/server/lib/events/ringbuffer.go index e5dfec84..e41d1369 100644 --- a/server/lib/events/ringbuffer.go +++ b/server/lib/events/ringbuffer.go @@ -13,7 +13,8 @@ type ringBuffer struct { buf []Envelope cap uint64 latestSeq uint64 // highest envelope.Seq published - readerWake chan struct{} // closed-and-replaced on each Publish to wake blocked readers + readerWake chan struct{} // closed-and-replaced when blocked readers need a wakeup + waiters int } func newRingBuffer(capacity int) (*ringBuffer, error) { @@ -36,21 +37,56 @@ func (rb *ringBuffer) reset() { rb.buf[i] = Envelope{} } rb.latestSeq = 0 - old := rb.readerWake - rb.readerWake = make(chan struct{}) + var old chan struct{} + if rb.waiters > 0 { + old = rb.readerWake + rb.readerWake = make(chan struct{}) + } rb.mu.Unlock() - close(old) + if old != nil { + close(old) + } } -// publish adds an envelope to the ring, evicting the oldest on overflow. -func (rb *ringBuffer) publish(env Envelope) { - rb.mu.Lock() +func (rb *ringBuffer) publishLocked(env Envelope) chan struct{} { rb.buf[env.Seq%rb.cap] = env rb.latestSeq = env.Seq + if rb.waiters == 0 { + return nil + } old := rb.readerWake rb.readerWake = make(chan struct{}) + return old +} + +func (rb *ringBuffer) closeWake(old chan struct{}) { + if old != nil { + close(old) + } +} + +// publish adds an envelope to the ring, evicting the oldest on overflow. +func (rb *ringBuffer) publish(env Envelope) { + rb.mu.Lock() + old := rb.publishLocked(env) + rb.mu.Unlock() + rb.closeWake(old) +} + +func (rb *ringBuffer) publishNext(env Envelope) Envelope { + rb.mu.Lock() + env.Seq = rb.latestSeq + 1 + env, _ = truncateIfNeeded(env) + old := rb.publishLocked(env) rb.mu.Unlock() - close(old) + rb.closeWake(old) + return env +} + +func (rb *ringBuffer) seq() uint64 { + rb.mu.RLock() + defer rb.mu.RUnlock() + return rb.latestSeq } func (rb *ringBuffer) oldestSeq() uint64 { @@ -108,7 +144,7 @@ func (r *Reader) TryRead() (ReadResult, bool) { // Read blocks until the next envelope is available or ctx is cancelled. func (r *Reader) Read(ctx context.Context) (ReadResult, error) { for { - r.rb.mu.RLock() + r.rb.mu.Lock() wake := r.rb.readerWake latest := r.rb.latestSeq oldest := r.rb.oldestSeq() @@ -117,35 +153,49 @@ func (r *Reader) Read(ctx context.Context) (ReadResult, error) { // Buffer is empty (or was just reset). Reset reader position // so it starts from the beginning when new data arrives. r.nextSeq = 1 - r.rb.mu.RUnlock() - select { - case <-ctx.Done(): + r.rb.waiters++ + r.rb.mu.Unlock() + err := waitForWake(ctx, wake) + r.rb.mu.Lock() + r.rb.waiters-- + r.rb.mu.Unlock() + if err != nil { return ReadResult{}, ctx.Err() - case <-wake: - continue } + continue } if r.nextSeq < oldest { dropped := oldest - r.nextSeq r.nextSeq = oldest - r.rb.mu.RUnlock() + r.rb.mu.Unlock() return ReadResult{Dropped: dropped}, nil } if r.nextSeq <= latest { env := r.rb.buf[r.nextSeq%r.rb.cap] r.nextSeq++ - r.rb.mu.RUnlock() + r.rb.mu.Unlock() return ReadResult{Envelope: &env}, nil } - r.rb.mu.RUnlock() - - select { - case <-ctx.Done(): + r.rb.waiters++ + r.rb.mu.Unlock() + err := waitForWake(ctx, wake) + r.rb.mu.Lock() + r.rb.waiters-- + r.rb.mu.Unlock() + if err != nil { return ReadResult{}, ctx.Err() - case <-wake: } } } + +func waitForWake(ctx context.Context, wake <-chan struct{}) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-wake: + return nil + } +} diff --git a/server/lib/telemetry/telemetry.go b/server/lib/telemetry/telemetry.go index 576c8ecd..6c66e47c 100644 --- a/server/lib/telemetry/telemetry.go +++ b/server/lib/telemetry/telemetry.go @@ -31,6 +31,7 @@ type TelemetrySession struct { id string sessionStartSeq uint64 categories map[oapi.TelemetryEventCategory]struct{} + sessionMetadata *map[string]string appliedAt time.Time } @@ -68,6 +69,8 @@ func (s *TelemetrySession) Start(telemetrySessionID string, cfg TelemetryConfig) s.sessionStartSeq = s.es.Seq() s.appliedAt = time.Now() s.categories = categorySet(cfg.Categories) + metadata := map[string]string{"telemetry_session_id": telemetrySessionID} + s.sessionMetadata = &metadata } // publishLocked stamps telemetry_session_id into ev.Source.Metadata and forwards to the bus. @@ -77,10 +80,10 @@ func (s *TelemetrySession) publishLocked(ev events.Event) events.Envelope { ev.Ts = time.Now().UnixMicro() } if ev.Source.Metadata == nil { - m := make(map[string]string) - ev.Source.Metadata = &m + ev.Source.Metadata = s.sessionMetadata + } else { + (*ev.Source.Metadata)["telemetry_session_id"] = s.id } - (*ev.Source.Metadata)["telemetry_session_id"] = s.id return s.es.Publish(events.Envelope{Event: ev}) } @@ -176,5 +179,6 @@ func (s *TelemetrySession) Stop() { s.mu.Lock() defer s.mu.Unlock() s.id = "" + s.sessionMetadata = nil s.appliedAt = time.Time{} } diff --git a/server/lib/telemetry/telemetry_bench_test.go b/server/lib/telemetry/telemetry_bench_test.go new file mode 100644 index 00000000..a49f629e --- /dev/null +++ b/server/lib/telemetry/telemetry_bench_test.go @@ -0,0 +1,37 @@ +package telemetry + +import ( + "encoding/json" + "testing" + + "github.com/kernel/kernel-images/server/lib/events" + oapi "github.com/kernel/kernel-images/server/lib/oapi" +) + +func benchmarkTelemetryEvent() events.Event { + return events.Event{ + Ts: 123456789, + Type: "console.log", + Category: events.Console, + Source: oapi.BrowserEventSource{Kind: oapi.Cdp}, + Data: json.RawMessage(`{"message":"hello","level":"log"}`), + } +} + +func BenchmarkTelemetrySessionPublish(b *testing.B) { + es, err := events.NewEventStream(events.EventStreamConfig{RingCapacity: 1024}) + if err != nil { + b.Fatal(err) + } + ts := NewTelemetrySession(es) + ts.Start("bench-session", TelemetryConfig{Categories: events.UserCategories}) + ev := benchmarkTelemetryEvent() + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + if _, ok := ts.Publish(ev); !ok { + b.Fatal("expected event to publish") + } + } +} diff --git a/server/lib/telemetry/telemetry_test.go b/server/lib/telemetry/telemetry_test.go index 18d2929d..3de8e056 100644 --- a/server/lib/telemetry/telemetry_test.go +++ b/server/lib/telemetry/telemetry_test.go @@ -102,6 +102,23 @@ func TestTelemetrySession(t *testing.T) { assert.Equal(t, "ev.three", env.Event.Type) }) + t.Run("nil_metadata_events_keep_original_session_id", func(t *testing.T) { + ts := NewTelemetrySession(newTestEventStream(t, 100)) + reader := ts.NewReader(0) + + ts.Start("session-1", TelemetryConfig{Categories: events.UserCategories}) + ts.Publish(events.Event{Type: "page.navigation", Category: events.Page, Source: oapi.BrowserEventSource{Kind: oapi.Cdp}, Ts: 1}) + ts.Start("session-2", TelemetryConfig{Categories: events.UserCategories}) + ts.Publish(events.Event{Type: "page.navigation", Category: events.Page, Source: oapi.BrowserEventSource{Kind: oapi.Cdp}, Ts: 2}) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + first := readEnvelope(t, reader, ctx) + second := readEnvelope(t, reader, ctx) + assert.Equal(t, "session-1", telemetrySessionIDFromMetadata(t, first.Event.Source)) + assert.Equal(t, "session-2", telemetrySessionIDFromMetadata(t, second.Event.Source)) + }) + t.Run("publish_increments_seq", func(t *testing.T) { ts := newTestTelemetrySession(t) reader := ts.NewReader(0)