From 47092200c5a2e22a7867f97e062192b952dc3851 Mon Sep 17 00:00:00 2001 From: Dylan Tientcheu Date: Thu, 16 Apr 2026 15:15:20 +0200 Subject: [PATCH 1/3] fix: restore events tail streaming Wire the requested Insights region into the client config and poll overlapping event windows so tail reliably surfaces live events instead of timing out or skipping them. Made-with: Cursor --- api/insights/client.go | 1 + api/insights/client_test.go | 21 ++++++++++ pkg/cmd/events/tail/tail.go | 46 ++++++++++++++++++--- pkg/cmd/events/tail/tail_test.go | 68 ++++++++++++++++++++++++++++++++ 4 files changed, 131 insertions(+), 5 deletions(-) create mode 100644 api/insights/client_test.go create mode 100644 pkg/cmd/events/tail/tail_test.go diff --git a/api/insights/client.go b/api/insights/client.go index f74f48b9..93bfe5f7 100644 --- a/api/insights/client.go +++ b/api/insights/client.go @@ -31,6 +31,7 @@ func NewClient(appID, apiKey string, region algoliaInsights.Region) (*Client, er ApiKey: apiKey, UserAgent: userAgent, }, + Region: region, } client, err := algoliaInsights.NewClientWithConfig(clientConfig) if err != nil { diff --git a/api/insights/client_test.go b/api/insights/client_test.go new file mode 100644 index 00000000..2736086c --- /dev/null +++ b/api/insights/client_test.go @@ -0,0 +1,21 @@ +package insights + +import ( + "reflect" + "testing" + + algoliaInsights "github.com/algolia/algoliasearch-client-go/v4/algolia/insights" + "github.com/stretchr/testify/require" +) + +func TestNewClientSetsRequestedRegion(t *testing.T) { + client, err := NewClient("test-app-id", "test-api-key", algoliaInsights.DE) + require.NoError(t, err) + + cfg := client.GetConfiguration() + require.Equal(t, algoliaInsights.DE, cfg.Region) + require.NotEmpty(t, cfg.Hosts) + + host := reflect.ValueOf(cfg.Hosts[0]).FieldByName("host").String() + require.Equal(t, "insights.de.algolia.io", host) +} diff --git a/pkg/cmd/events/tail/tail.go b/pkg/cmd/events/tail/tail.go index 64053ab0..c4894f49 100644 --- a/pkg/cmd/events/tail/tail.go +++ b/pkg/cmd/events/tail/tail.go @@ -113,10 +113,15 @@ func runTailCmd(opts *TailOptions) error { fmt.Fprint(opts.IO.Out, "\nWaiting for events... Press Ctrl+C to stop.\n") } - c := time.Tick(Interval) - for t := range c { - utc := t.UTC() - events, err := client.GetEvents(utc.Add(-1*time.Second), utc, 1000) + ticker := time.NewTicker(Interval) + defer ticker.Stop() + + windowStart := time.Now().UTC().Add(-Interval) + seenRequestIDs := map[string]time.Time{} + + for { + windowEnd := time.Now().UTC() + events, err := client.GetEvents(windowStart, windowEnd, 1000) if err != nil { if strings.Contains(err.Error(), "The log processing region does not match") { cs := opts.IO.ColorScheme() @@ -127,9 +132,12 @@ func runTailCmd(opts *TailOptions) error { `, cs.FailureIcon(), opts.Region) return errors.New(errDetails) } + + return err } - for _, event := range events.Events { + pruneSeenRequestIDs(seenRequestIDs, windowStart.Add(-Interval)) + for _, event := range unseenEvents(events.Events, seenRequestIDs) { if p != nil { if err := p.Print(opts.IO, event); err != nil { return err @@ -140,11 +148,39 @@ func runTailCmd(opts *TailOptions) error { } } } + + windowStart = windowEnd.Add(-Interval) + <-ticker.C } return nil } +func unseenEvents(events []insights.EventWrapper, seenRequestIDs map[string]time.Time) []insights.EventWrapper { + freshEvents := make([]insights.EventWrapper, 0, len(events)) + for _, event := range events { + requestID := event.RequestID + if requestID != "" { + if _, ok := seenRequestIDs[requestID]; ok { + continue + } + seenRequestIDs[requestID] = event.Event.Timestamp.Time + } + + freshEvents = append(freshEvents, event) + } + + return freshEvents +} + +func pruneSeenRequestIDs(seenRequestIDs map[string]time.Time, cutoff time.Time) { + for requestID, timestamp := range seenRequestIDs { + if timestamp.Before(cutoff) { + delete(seenRequestIDs, requestID) + } + } +} + func printEvent(io *iostreams.IOStreams, event insights.EventWrapper) error { cs := io.ColorScheme() diff --git a/pkg/cmd/events/tail/tail_test.go b/pkg/cmd/events/tail/tail_test.go new file mode 100644 index 00000000..4058942d --- /dev/null +++ b/pkg/cmd/events/tail/tail_test.go @@ -0,0 +1,68 @@ +package tail + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/algolia/cli/api/insights" +) + +func TestUnseenEventsSkipsDuplicateRequestIDs(t *testing.T) { + now := time.Now().UTC() + seenRequestIDs := map[string]time.Time{} + events := []insights.EventWrapper{ + { + RequestID: "req-1", + Event: insights.Event{ + EventName: "first", + Timestamp: insights.Timestamp{Time: now}, + }, + }, + { + RequestID: "req-1", + Event: insights.Event{ + EventName: "duplicate", + Timestamp: insights.Timestamp{Time: now.Add(time.Second)}, + }, + }, + { + RequestID: "req-2", + Event: insights.Event{ + EventName: "second", + Timestamp: insights.Timestamp{Time: now.Add(2 * time.Second)}, + }, + }, + } + + freshEvents := unseenEvents(events, seenRequestIDs) + + require.Len(t, freshEvents, 2) + require.Equal(t, "first", freshEvents[0].Event.EventName) + require.Equal(t, "second", freshEvents[1].Event.EventName) + require.Contains(t, seenRequestIDs, "req-1") + require.Contains(t, seenRequestIDs, "req-2") +} + +func TestUnseenEventsKeepsEventsWithoutRequestID(t *testing.T) { + freshEvents := unseenEvents([]insights.EventWrapper{ + {Event: insights.Event{EventName: "first"}}, + {Event: insights.Event{EventName: "second"}}, + }, map[string]time.Time{}) + + require.Len(t, freshEvents, 2) +} + +func TestPruneSeenRequestIDsRemovesOldEntries(t *testing.T) { + now := time.Now().UTC() + seenRequestIDs := map[string]time.Time{ + "stale": now.Add(-2 * Interval), + "recent": now.Add(-Interval / 2), + } + + pruneSeenRequestIDs(seenRequestIDs, now.Add(-Interval)) + + require.NotContains(t, seenRequestIDs, "stale") + require.Contains(t, seenRequestIDs, "recent") +} From ce3bd409da32bff52cc077100ad3209c42d24c4d Mon Sep 17 00:00:00 2001 From: Dylan Tientcheu Date: Thu, 16 Apr 2026 15:37:34 +0200 Subject: [PATCH 2/3] fix: lint issue --- pkg/cmd/events/tail/tail.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/cmd/events/tail/tail.go b/pkg/cmd/events/tail/tail.go index c4894f49..e635889a 100644 --- a/pkg/cmd/events/tail/tail.go +++ b/pkg/cmd/events/tail/tail.go @@ -152,8 +152,6 @@ func runTailCmd(opts *TailOptions) error { windowStart = windowEnd.Add(-Interval) <-ticker.C } - - return nil } func unseenEvents(events []insights.EventWrapper, seenRequestIDs map[string]time.Time) []insights.EventWrapper { From 0e50c1d8781f379deac3cc79abda08a82db359e1 Mon Sep 17 00:00:00 2001 From: Dylan Tientcheu Date: Fri, 17 Apr 2026 17:19:53 +0200 Subject: [PATCH 3/3] fix: clock dedup issue. --- pkg/cmd/events/tail/tail.go | 6 +++--- pkg/cmd/events/tail/tail_test.go | 10 ++++++---- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/pkg/cmd/events/tail/tail.go b/pkg/cmd/events/tail/tail.go index e635889a..98836d2c 100644 --- a/pkg/cmd/events/tail/tail.go +++ b/pkg/cmd/events/tail/tail.go @@ -137,7 +137,7 @@ func runTailCmd(opts *TailOptions) error { } pruneSeenRequestIDs(seenRequestIDs, windowStart.Add(-Interval)) - for _, event := range unseenEvents(events.Events, seenRequestIDs) { + for _, event := range unseenEvents(events.Events, seenRequestIDs, windowEnd) { if p != nil { if err := p.Print(opts.IO, event); err != nil { return err @@ -154,7 +154,7 @@ func runTailCmd(opts *TailOptions) error { } } -func unseenEvents(events []insights.EventWrapper, seenRequestIDs map[string]time.Time) []insights.EventWrapper { +func unseenEvents(events []insights.EventWrapper, seenRequestIDs map[string]time.Time, seenAt time.Time) []insights.EventWrapper { freshEvents := make([]insights.EventWrapper, 0, len(events)) for _, event := range events { requestID := event.RequestID @@ -162,7 +162,7 @@ func unseenEvents(events []insights.EventWrapper, seenRequestIDs map[string]time if _, ok := seenRequestIDs[requestID]; ok { continue } - seenRequestIDs[requestID] = event.Event.Timestamp.Time + seenRequestIDs[requestID] = seenAt } freshEvents = append(freshEvents, event) diff --git a/pkg/cmd/events/tail/tail_test.go b/pkg/cmd/events/tail/tail_test.go index 4058942d..333a511b 100644 --- a/pkg/cmd/events/tail/tail_test.go +++ b/pkg/cmd/events/tail/tail_test.go @@ -11,6 +11,7 @@ import ( func TestUnseenEventsSkipsDuplicateRequestIDs(t *testing.T) { now := time.Now().UTC() + seenAt := now.Add(10 * time.Second) seenRequestIDs := map[string]time.Time{} events := []insights.EventWrapper{ { @@ -36,20 +37,21 @@ func TestUnseenEventsSkipsDuplicateRequestIDs(t *testing.T) { }, } - freshEvents := unseenEvents(events, seenRequestIDs) + freshEvents := unseenEvents(events, seenRequestIDs, seenAt) require.Len(t, freshEvents, 2) require.Equal(t, "first", freshEvents[0].Event.EventName) require.Equal(t, "second", freshEvents[1].Event.EventName) - require.Contains(t, seenRequestIDs, "req-1") - require.Contains(t, seenRequestIDs, "req-2") + require.Equal(t, seenAt, seenRequestIDs["req-1"]) + require.Equal(t, seenAt, seenRequestIDs["req-2"]) } func TestUnseenEventsKeepsEventsWithoutRequestID(t *testing.T) { + seenAt := time.Now().UTC() freshEvents := unseenEvents([]insights.EventWrapper{ {Event: insights.Event{EventName: "first"}}, {Event: insights.Event{EventName: "second"}}, - }, map[string]time.Time{}) + }, map[string]time.Time{}, seenAt) require.Len(t, freshEvents, 2) }