diff --git a/api/insights/client.go b/api/insights/client.go index f74f48b9..93bfe5f7 100644 --- a/api/insights/client.go +++ b/api/insights/client.go @@ -31,6 +31,7 @@ func NewClient(appID, apiKey string, region algoliaInsights.Region) (*Client, er ApiKey: apiKey, UserAgent: userAgent, }, + Region: region, } client, err := algoliaInsights.NewClientWithConfig(clientConfig) if err != nil { diff --git a/api/insights/client_test.go b/api/insights/client_test.go new file mode 100644 index 00000000..2736086c --- /dev/null +++ b/api/insights/client_test.go @@ -0,0 +1,21 @@ +package insights + +import ( + "reflect" + "testing" + + algoliaInsights "github.com/algolia/algoliasearch-client-go/v4/algolia/insights" + "github.com/stretchr/testify/require" +) + +func TestNewClientSetsRequestedRegion(t *testing.T) { + client, err := NewClient("test-app-id", "test-api-key", algoliaInsights.DE) + require.NoError(t, err) + + cfg := client.GetConfiguration() + require.Equal(t, algoliaInsights.DE, cfg.Region) + require.NotEmpty(t, cfg.Hosts) + + host := reflect.ValueOf(cfg.Hosts[0]).FieldByName("host").String() + require.Equal(t, "insights.de.algolia.io", host) +} diff --git a/pkg/cmd/events/tail/tail.go b/pkg/cmd/events/tail/tail.go index 64053ab0..98836d2c 100644 --- a/pkg/cmd/events/tail/tail.go +++ b/pkg/cmd/events/tail/tail.go @@ -113,10 +113,15 @@ func runTailCmd(opts *TailOptions) error { fmt.Fprint(opts.IO.Out, "\nWaiting for events... Press Ctrl+C to stop.\n") } - c := time.Tick(Interval) - for t := range c { - utc := t.UTC() - events, err := client.GetEvents(utc.Add(-1*time.Second), utc, 1000) + ticker := time.NewTicker(Interval) + defer ticker.Stop() + + windowStart := time.Now().UTC().Add(-Interval) + seenRequestIDs := map[string]time.Time{} + + for { + windowEnd := time.Now().UTC() + events, err := client.GetEvents(windowStart, windowEnd, 1000) if err != nil { if strings.Contains(err.Error(), "The log processing region does not match") { cs := opts.IO.ColorScheme() @@ -127,9 +132,12 @@ func runTailCmd(opts *TailOptions) error { `, cs.FailureIcon(), opts.Region) return errors.New(errDetails) } + + return err } - for _, event := range events.Events { + pruneSeenRequestIDs(seenRequestIDs, windowStart.Add(-Interval)) + for _, event := range unseenEvents(events.Events, seenRequestIDs, windowEnd) { if p != nil { if err := p.Print(opts.IO, event); err != nil { return err @@ -140,9 +148,35 @@ func runTailCmd(opts *TailOptions) error { } } } + + windowStart = windowEnd.Add(-Interval) + <-ticker.C } +} - return nil +func unseenEvents(events []insights.EventWrapper, seenRequestIDs map[string]time.Time, seenAt time.Time) []insights.EventWrapper { + freshEvents := make([]insights.EventWrapper, 0, len(events)) + for _, event := range events { + requestID := event.RequestID + if requestID != "" { + if _, ok := seenRequestIDs[requestID]; ok { + continue + } + seenRequestIDs[requestID] = seenAt + } + + freshEvents = append(freshEvents, event) + } + + return freshEvents +} + +func pruneSeenRequestIDs(seenRequestIDs map[string]time.Time, cutoff time.Time) { + for requestID, timestamp := range seenRequestIDs { + if timestamp.Before(cutoff) { + delete(seenRequestIDs, requestID) + } + } } func printEvent(io *iostreams.IOStreams, event insights.EventWrapper) error { diff --git a/pkg/cmd/events/tail/tail_test.go b/pkg/cmd/events/tail/tail_test.go new file mode 100644 index 00000000..333a511b --- /dev/null +++ b/pkg/cmd/events/tail/tail_test.go @@ -0,0 +1,70 @@ +package tail + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/algolia/cli/api/insights" +) + +func TestUnseenEventsSkipsDuplicateRequestIDs(t *testing.T) { + now := time.Now().UTC() + seenAt := now.Add(10 * time.Second) + seenRequestIDs := map[string]time.Time{} + events := []insights.EventWrapper{ + { + RequestID: "req-1", + Event: insights.Event{ + EventName: "first", + Timestamp: insights.Timestamp{Time: now}, + }, + }, + { + RequestID: "req-1", + Event: insights.Event{ + EventName: "duplicate", + Timestamp: insights.Timestamp{Time: now.Add(time.Second)}, + }, + }, + { + RequestID: "req-2", + Event: insights.Event{ + EventName: "second", + Timestamp: insights.Timestamp{Time: now.Add(2 * time.Second)}, + }, + }, + } + + freshEvents := unseenEvents(events, seenRequestIDs, seenAt) + + require.Len(t, freshEvents, 2) + require.Equal(t, "first", freshEvents[0].Event.EventName) + require.Equal(t, "second", freshEvents[1].Event.EventName) + require.Equal(t, seenAt, seenRequestIDs["req-1"]) + require.Equal(t, seenAt, seenRequestIDs["req-2"]) +} + +func TestUnseenEventsKeepsEventsWithoutRequestID(t *testing.T) { + seenAt := time.Now().UTC() + freshEvents := unseenEvents([]insights.EventWrapper{ + {Event: insights.Event{EventName: "first"}}, + {Event: insights.Event{EventName: "second"}}, + }, map[string]time.Time{}, seenAt) + + require.Len(t, freshEvents, 2) +} + +func TestPruneSeenRequestIDsRemovesOldEntries(t *testing.T) { + now := time.Now().UTC() + seenRequestIDs := map[string]time.Time{ + "stale": now.Add(-2 * Interval), + "recent": now.Add(-Interval / 2), + } + + pruneSeenRequestIDs(seenRequestIDs, now.Add(-Interval)) + + require.NotContains(t, seenRequestIDs, "stale") + require.Contains(t, seenRequestIDs, "recent") +}