diff --git a/docs/apis/openapi.yaml b/docs/apis/openapi.yaml index 0832890e..3238a79f 100644 --- a/docs/apis/openapi.yaml +++ b/docs/apis/openapi.yaml @@ -194,7 +194,7 @@ components: - type: array items: type: string - description: '"*" or an array of enabled topics.' + description: '"*" or an array of enabled topics. Topic strings can include "*" as a wildcard matching any run of characters. When available topics are configured, wildcard patterns must match at least one available topic.' example: "*" Filter: diff --git a/docs/content/features/topics.mdoc b/docs/content/features/topics.mdoc index 2bae1af7..3a862238 100644 --- a/docs/content/features/topics.mdoc +++ b/docs/content/features/topics.mdoc @@ -64,6 +64,10 @@ curl '{% $OUTPOST_API_BASE_URL %}/tenants//destinations' \ }' ``` +Destination topics can also use `*` inside a topic string as a wildcard that matches any run of characters. For example, `user.*` matches `user.created` and `user.profile.updated`, `*.created` matches `user.created` and `order.created`, and `order.*.completed` matches `order.payment.completed`. + +When available topics are configured, wildcard patterns must match at least one available topic. + ## Event Fanout A single published event is independently delivered to every destination that matches its topic. Each delivery attempt is tracked separately. diff --git a/internal/apirouter/destination_handlers_test.go b/internal/apirouter/destination_handlers_test.go index abaa973f..4bbb81a5 100644 --- a/internal/apirouter/destination_handlers_test.go +++ b/internal/apirouter/destination_handlers_test.go @@ -94,6 +94,37 @@ func TestAPI_Destinations(t *testing.T) { require.Equal(t, http.StatusUnprocessableEntity, resp.Code) }) + t.Run("wildcard topic pattern matching configured topic is accepted", func(t *testing.T) { + h := newAPITest(t) + h.tenantStore.UpsertTenant(t.Context(), tf.Any(tf.WithID("t1"))) + + req := h.jsonReq(http.MethodPost, "/api/v1/tenants/t1/destinations", map[string]any{ + "type": "webhook", + "topics": []string{"user.*"}, + "config": map[string]string{"url": "https://example.com/hook"}, + }) + resp := h.do(h.withAPIKey(req)) + + require.Equal(t, http.StatusCreated, resp.Code) + var dest destregistry.DestinationDisplay + require.NoError(t, json.Unmarshal(resp.Body.Bytes(), &dest)) + assert.Equal(t, models.Topics{"user.*"}, dest.Topics) + }) + + t.Run("wildcard topic pattern not matching configured topic returns 422", func(t *testing.T) { + h := newAPITest(t) + h.tenantStore.UpsertTenant(t.Context(), tf.Any(tf.WithID("t1"))) + + req := h.jsonReq(http.MethodPost, "/api/v1/tenants/t1/destinations", map[string]any{ + "type": "webhook", + "topics": []string{"order.*"}, + "config": map[string]string{"url": "https://example.com/hook"}, + }) + resp := h.do(h.withAPIKey(req)) + + require.Equal(t, http.StatusUnprocessableEntity, resp.Code) + }) + t.Run("import timestamps", func(t *testing.T) { t.Run("disabled_at preserved on create", func(t *testing.T) { h := newAPITest(t) diff --git a/internal/models/entities.go b/internal/models/entities.go index e577d868..39ce26a1 100644 --- a/internal/models/entities.go +++ b/internal/models/entities.go @@ -148,7 +148,15 @@ func (t *Topics) MatchesAll() bool { } func (t *Topics) MatchTopic(eventTopic string) bool { - return eventTopic == "" || eventTopic == "*" || t.MatchesAll() || slices.Contains(*t, eventTopic) + if eventTopic == "" || eventTopic == "*" || t.MatchesAll() { + return true + } + for _, topic := range *t { + if matchTopicPattern(topic, eventTopic) { + return true + } + } + return false } func (t *Topics) Validate(availableTopics []string) error { @@ -166,6 +174,12 @@ func (t *Topics) Validate(availableTopics []string) error { if topic == "*" { return ErrInvalidTopics } + if strings.Contains(topic, "*") { + if !topicPatternMatchesAny(topic, availableTopics) { + return ErrInvalidTopics + } + continue + } if !slices.Contains(availableTopics, topic) { return ErrInvalidTopics } @@ -173,6 +187,51 @@ func (t *Topics) Validate(availableTopics []string) error { return nil } +func topicPatternMatchesAny(pattern string, topics []string) bool { + for _, topic := range topics { + if matchTopicPattern(pattern, topic) { + return true + } + } + return false +} + +func matchTopicPattern(pattern, topic string) bool { + if pattern == topic { + return true + } + if !strings.Contains(pattern, "*") { + return false + } + + patternIndex, topicIndex := 0, 0 + starIndex, starTopicIndex := -1, 0 + for topicIndex < len(topic) { + if patternIndex < len(pattern) && pattern[patternIndex] == topic[topicIndex] { + patternIndex++ + topicIndex++ + continue + } + if patternIndex < len(pattern) && pattern[patternIndex] == '*' { + starIndex = patternIndex + starTopicIndex = topicIndex + patternIndex++ + continue + } + if starIndex != -1 { + patternIndex = starIndex + 1 + starTopicIndex++ + topicIndex = starTopicIndex + continue + } + return false + } + for patternIndex < len(pattern) && pattern[patternIndex] == '*' { + patternIndex++ + } + return patternIndex == len(pattern) +} + func TopicsFromString(s string) Topics { return Topics(strings.Split(s, ",")) } diff --git a/internal/models/entities_test.go b/internal/models/entities_test.go index cff0e674..f02dc188 100644 --- a/internal/models/entities_test.go +++ b/internal/models/entities_test.go @@ -30,6 +30,26 @@ func TestDestinationTopics_Validate(t *testing.T) { availableTopics: testutil.TestTopics, validated: true, }, + { + topics: []string{"user.*"}, + availableTopics: testutil.TestTopics, + validated: true, + }, + { + topics: []string{"order.*"}, + availableTopics: testutil.TestTopics, + validated: false, + }, + { + topics: []string{"user.created", "order.*"}, + availableTopics: testutil.TestTopics, + validated: false, + }, + { + topics: []string{"order.*"}, + availableTopics: []string{"order.created", "user.created"}, + validated: true, + }, { topics: []string{"*"}, availableTopics: testutil.TestTopics, @@ -218,6 +238,54 @@ func TestTopics_MatchTopic(t *testing.T) { eventTopic: "user.created", expected: true, }, + { + name: "prefix wildcard matches topic family", + topics: []string{"user.*"}, + eventTopic: "user.created", + expected: true, + }, + { + name: "prefix wildcard matches nested topic family", + topics: []string{"user.*"}, + eventTopic: "user.profile.updated", + expected: true, + }, + { + name: "suffix wildcard matches topic family", + topics: []string{"*.created"}, + eventTopic: "order.created", + expected: true, + }, + { + name: "middle wildcard matches topic family", + topics: []string{"order.*.completed"}, + eventTopic: "order.payment.completed", + expected: true, + }, + { + name: "wildcard can match empty run of characters", + topics: []string{"user.*.created"}, + eventTopic: "user..created", + expected: true, + }, + { + name: "prefix wildcard does not match different prefix", + topics: []string{"user.*"}, + eventTopic: "order.created", + expected: false, + }, + { + name: "suffix wildcard does not match different suffix", + topics: []string{"*.created"}, + eventTopic: "user.deleted", + expected: false, + }, + { + name: "middle wildcard requires suffix", + topics: []string{"order.*.completed"}, + eventTopic: "order.payment.failed", + expected: false, + }, // No match { name: "no topic match", @@ -256,6 +324,49 @@ func TestTopics_MatchTopic(t *testing.T) { } } +func BenchmarkTopics_MatchTopic(b *testing.B) { + topicsExact := models.Topics{ + "user.created", + "user.deleted", + "user.updated", + "order.created", + "order.deleted", + "order.updated", + } + topicsPattern := models.Topics{ + "user.*", + "*.deleted", + "order.*.completed", + "invoice.created", + "invoice.deleted", + "invoice.updated", + } + + b.Run("exact match", func(b *testing.B) { + for range b.N { + _ = topicsExact.MatchTopic("order.updated") + } + }) + + b.Run("exact miss", func(b *testing.B) { + for range b.N { + _ = topicsExact.MatchTopic("payment.created") + } + }) + + b.Run("pattern match", func(b *testing.B) { + for range b.N { + _ = topicsPattern.MatchTopic("order.payment.completed") + } + }) + + b.Run("pattern miss", func(b *testing.B) { + for range b.N { + _ = topicsPattern.MatchTopic("order.payment.failed") + } + }) +} + func TestFilter_MarshalBinary(t *testing.T) { t.Parallel() diff --git a/internal/publishmq/eventhandler_test.go b/internal/publishmq/eventhandler_test.go index 02cd47ad..7fc9fa45 100644 --- a/internal/publishmq/eventhandler_test.go +++ b/internal/publishmq/eventhandler_test.go @@ -262,6 +262,44 @@ func TestEventHandler_HandleResult(t *testing.T) { require.Len(t, result.DestinationIDs, 3) }) + t.Run("normal publish with wildcard destination topics", func(t *testing.T) { + wildcardTenant := models.Tenant{ + ID: idgen.String(), + CreatedAt: time.Now(), + } + require.NoError(t, tenantStore.UpsertTenant(ctx, wildcardTenant)) + + destFactory := testutil.DestinationFactory + matchingDestinations := []models.Destination{ + destFactory.Any( + destFactory.WithTenantID(wildcardTenant.ID), + destFactory.WithTopics([]string{"user.*"}), + ), + destFactory.Any( + destFactory.WithTenantID(wildcardTenant.ID), + destFactory.WithTopics([]string{"*.created"}), + ), + } + for _, dest := range matchingDestinations { + require.NoError(t, tenantStore.UpsertDestination(ctx, dest)) + } + require.NoError(t, tenantStore.UpsertDestination(ctx, destFactory.Any( + destFactory.WithTenantID(wildcardTenant.ID), + destFactory.WithTopics([]string{"user.deleted*"}), + ))) + + event := testutil.EventFactory.AnyPointer( + testutil.EventFactory.WithTenantID(wildcardTenant.ID), + testutil.EventFactory.WithTopic("user.created"), + ) + + result, err := eventHandler.Handle(ctx, event) + require.NoError(t, err) + require.NotNil(t, result) + require.False(t, result.Duplicate) + require.ElementsMatch(t, []string{matchingDestinations[0].ID, matchingDestinations[1].ID}, result.DestinationIDs) + }) + t.Run("no destinations matched", func(t *testing.T) { event := testutil.EventFactory.AnyPointer( testutil.EventFactory.WithTenantID(tenant.ID), diff --git a/internal/tenantstore/drivertest/list.go b/internal/tenantstore/drivertest/list.go index 326197f1..5866a5c5 100644 --- a/internal/tenantstore/drivertest/list.go +++ b/internal/tenantstore/drivertest/list.go @@ -319,6 +319,22 @@ func testListDestination(t *testing.T, newHarness HarnessMaker) { }) require.NoError(t, err) require.Len(t, destinations, 3) + + for _, topics := range [][]string{{"user.*"}, {"*.created"}, {"user.deleted*"}} { + require.NoError(t, store.UpsertDestination(ctx, testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithTenantID(data.tenant.ID), + testutil.DestinationFactory.WithTopics(topics), + ))) + } + + // The filter topic is treated as a concrete published topic. Destinations + // subscribed with wildcard patterns are included when they would receive it. + destinations, err = store.ListDestination(ctx, driver.ListDestinationRequest{ + TenantID: data.tenant.ID, + Topics: []string{"user.created"}, + }) + require.NoError(t, err) + require.Len(t, destinations, 5) }) } diff --git a/internal/tenantstore/drivertest/match.go b/internal/tenantstore/drivertest/match.go index 39b49a09..1a9d9a97 100644 --- a/internal/tenantstore/drivertest/match.go +++ b/internal/tenantstore/drivertest/match.go @@ -133,6 +133,75 @@ func testMatch(t *testing.T, newHarness HarnessMaker) { }) }) + t.Run("MatchByWildcardTopic", func(t *testing.T) { + ctx := context.Background() + h, err := newHarness(ctx, t) + require.NoError(t, err) + t.Cleanup(h.Close) + + store, err := h.MakeDriver(ctx) + require.NoError(t, err) + + tenant := models.Tenant{ID: idgen.String()} + require.NoError(t, store.UpsertTenant(ctx, tenant)) + + destUserFamily := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithID("dest_user_family"), + testutil.DestinationFactory.WithTenantID(tenant.ID), + testutil.DestinationFactory.WithTopics([]string{"user.*"}), + ) + destCreatedFamily := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithID("dest_created_family"), + testutil.DestinationFactory.WithTenantID(tenant.ID), + testutil.DestinationFactory.WithTopics([]string{"*.created"}), + ) + destOrderCompletedFamily := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithID("dest_order_completed_family"), + testutil.DestinationFactory.WithTenantID(tenant.ID), + testutil.DestinationFactory.WithTopics([]string{"order.*.completed"}), + ) + destExact := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithID("dest_exact"), + testutil.DestinationFactory.WithTenantID(tenant.ID), + testutil.DestinationFactory.WithTopics([]string{"user.created"}), + ) + + require.NoError(t, store.CreateDestination(ctx, destUserFamily)) + require.NoError(t, store.CreateDestination(ctx, destCreatedFamily)) + require.NoError(t, store.CreateDestination(ctx, destOrderCompletedFamily)) + require.NoError(t, store.CreateDestination(ctx, destExact)) + + t.Run("matches prefix and suffix wildcard subscriptions", func(t *testing.T) { + event := testutil.EventFactory.Any( + testutil.EventFactory.WithTenantID(tenant.ID), + testutil.EventFactory.WithTopic("user.created"), + ) + matched, err := store.MatchEvent(ctx, event) + require.NoError(t, err) + assert.ElementsMatch(t, []string{"dest_user_family", "dest_created_family", "dest_exact"}, matched) + }) + + t.Run("matches separator agnostic middle wildcard subscription", func(t *testing.T) { + event := testutil.EventFactory.Any( + testutil.EventFactory.WithTenantID(tenant.ID), + testutil.EventFactory.WithTopic("order.payment.completed"), + ) + matched, err := store.MatchEvent(ctx, event) + require.NoError(t, err) + assert.ElementsMatch(t, []string{"dest_order_completed_family"}, matched) + }) + + t.Run("does not overmatch unrelated topic", func(t *testing.T) { + event := testutil.EventFactory.Any( + testutil.EventFactory.WithTenantID(tenant.ID), + testutil.EventFactory.WithTopic("order.payment.failed"), + ) + matched, err := store.MatchEvent(ctx, event) + require.NoError(t, err) + assert.Empty(t, matched) + }) + }) + t.Run("MatchEventWithFilter", func(t *testing.T) { ctx := context.Background() h, err := newHarness(ctx, t) diff --git a/internal/tenantstore/memtenantstore/memtenantstore.go b/internal/tenantstore/memtenantstore/memtenantstore.go index 07f5e3d8..fa438afb 100644 --- a/internal/tenantstore/memtenantstore/memtenantstore.go +++ b/internal/tenantstore/memtenantstore/memtenantstore.go @@ -478,7 +478,7 @@ func matchDestFilter(filter *destinationFilter, dest models.Destination) bool { return false } for _, topic := range filter.Topics { - if !slices.Contains(dest.Topics, topic) { + if !dest.Topics.MatchTopic(topic) { return false } } diff --git a/internal/tenantstore/redistenantstore/serialization.go b/internal/tenantstore/redistenantstore/serialization.go index f0007760..d4ebc3b8 100644 --- a/internal/tenantstore/redistenantstore/serialization.go +++ b/internal/tenantstore/redistenantstore/serialization.go @@ -368,14 +368,7 @@ func matchDestinationFilter(filter *destinationFilter, summary destinationSummar return false } for _, topic := range filter.Topics { - found := false - for _, st := range summary.Topics { - if st == topic { - found = true - break - } - } - if !found { + if !summary.Topics.MatchTopic(topic) { return false } }