Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/apis/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ components:
- type: array
items:
type: string
description: '"*" or an array of enabled topics.'
description: '"*" or an array of enabled topics. Topic strings can include "*" as a wildcard matching any run of characters. When available topics are configured, wildcard patterns must match at least one available topic.'
example: "*"

Filter:
Expand Down
4 changes: 4 additions & 0 deletions docs/content/features/topics.mdoc
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ curl '{% $OUTPOST_API_BASE_URL %}/tenants/<TENANT_ID>/destinations' \
}'
```

Destination topics can also use `*` inside a topic string as a wildcard that matches any run of characters. For example, `user.*` matches `user.created` and `user.profile.updated`, `*.created` matches `user.created` and `order.created`, and `order.*.completed` matches `order.payment.completed`.

When available topics are configured, wildcard patterns must match at least one available topic.

## Event Fanout

A single published event is independently delivered to every destination that matches its topic. Each delivery attempt is tracked separately.
Expand Down
31 changes: 31 additions & 0 deletions internal/apirouter/destination_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,37 @@ func TestAPI_Destinations(t *testing.T) {
require.Equal(t, http.StatusUnprocessableEntity, resp.Code)
})

t.Run("wildcard topic pattern matching configured topic is accepted", func(t *testing.T) {
h := newAPITest(t)
h.tenantStore.UpsertTenant(t.Context(), tf.Any(tf.WithID("t1")))

req := h.jsonReq(http.MethodPost, "/api/v1/tenants/t1/destinations", map[string]any{
"type": "webhook",
"topics": []string{"user.*"},
"config": map[string]string{"url": "https://example.com/hook"},
})
resp := h.do(h.withAPIKey(req))

require.Equal(t, http.StatusCreated, resp.Code)
var dest destregistry.DestinationDisplay
require.NoError(t, json.Unmarshal(resp.Body.Bytes(), &dest))
assert.Equal(t, models.Topics{"user.*"}, dest.Topics)
})

t.Run("wildcard topic pattern not matching configured topic returns 422", func(t *testing.T) {
h := newAPITest(t)
h.tenantStore.UpsertTenant(t.Context(), tf.Any(tf.WithID("t1")))

req := h.jsonReq(http.MethodPost, "/api/v1/tenants/t1/destinations", map[string]any{
"type": "webhook",
"topics": []string{"order.*"},
"config": map[string]string{"url": "https://example.com/hook"},
})
resp := h.do(h.withAPIKey(req))

require.Equal(t, http.StatusUnprocessableEntity, resp.Code)
})

t.Run("import timestamps", func(t *testing.T) {
t.Run("disabled_at preserved on create", func(t *testing.T) {
h := newAPITest(t)
Expand Down
61 changes: 60 additions & 1 deletion internal/models/entities.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,15 @@ func (t *Topics) MatchesAll() bool {
}

func (t *Topics) MatchTopic(eventTopic string) bool {
return eventTopic == "" || eventTopic == "*" || t.MatchesAll() || slices.Contains(*t, eventTopic)
if eventTopic == "" || eventTopic == "*" || t.MatchesAll() {
return true
}
for _, topic := range *t {
if matchTopicPattern(topic, eventTopic) {
return true
}
}
return false
}

func (t *Topics) Validate(availableTopics []string) error {
Expand All @@ -166,13 +174,64 @@ func (t *Topics) Validate(availableTopics []string) error {
if topic == "*" {
return ErrInvalidTopics
}
if strings.Contains(topic, "*") {
if !topicPatternMatchesAny(topic, availableTopics) {
return ErrInvalidTopics
}
continue
}
if !slices.Contains(availableTopics, topic) {
return ErrInvalidTopics
}
}
return nil
}

func topicPatternMatchesAny(pattern string, topics []string) bool {
for _, topic := range topics {
if matchTopicPattern(pattern, topic) {
return true
}
}
return false
}

func matchTopicPattern(pattern, topic string) bool {
if pattern == topic {
return true
}
if !strings.Contains(pattern, "*") {
return false
}

patternIndex, topicIndex := 0, 0
starIndex, starTopicIndex := -1, 0
for topicIndex < len(topic) {
if patternIndex < len(pattern) && pattern[patternIndex] == topic[topicIndex] {
patternIndex++
topicIndex++
continue
}
if patternIndex < len(pattern) && pattern[patternIndex] == '*' {
starIndex = patternIndex
starTopicIndex = topicIndex
patternIndex++
continue
}
if starIndex != -1 {
patternIndex = starIndex + 1
starTopicIndex++
topicIndex = starTopicIndex
continue
}
return false
}
for patternIndex < len(pattern) && pattern[patternIndex] == '*' {
patternIndex++
}
return patternIndex == len(pattern)
}

func TopicsFromString(s string) Topics {
return Topics(strings.Split(s, ","))
}
111 changes: 111 additions & 0 deletions internal/models/entities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,26 @@ func TestDestinationTopics_Validate(t *testing.T) {
availableTopics: testutil.TestTopics,
validated: true,
},
{
topics: []string{"user.*"},
availableTopics: testutil.TestTopics,
validated: true,
},
{
topics: []string{"order.*"},
availableTopics: testutil.TestTopics,
validated: false,
},
{
topics: []string{"user.created", "order.*"},
availableTopics: testutil.TestTopics,
validated: false,
},
{
topics: []string{"order.*"},
availableTopics: []string{"order.created", "user.created"},
validated: true,
},
{
topics: []string{"*"},
availableTopics: testutil.TestTopics,
Expand Down Expand Up @@ -218,6 +238,54 @@ func TestTopics_MatchTopic(t *testing.T) {
eventTopic: "user.created",
expected: true,
},
{
name: "prefix wildcard matches topic family",
topics: []string{"user.*"},
eventTopic: "user.created",
expected: true,
},
{
name: "prefix wildcard matches nested topic family",
topics: []string{"user.*"},
eventTopic: "user.profile.updated",
expected: true,
},
{
name: "suffix wildcard matches topic family",
topics: []string{"*.created"},
eventTopic: "order.created",
expected: true,
},
{
name: "middle wildcard matches topic family",
topics: []string{"order.*.completed"},
eventTopic: "order.payment.completed",
expected: true,
},
{
name: "wildcard can match empty run of characters",
topics: []string{"user.*.created"},
eventTopic: "user..created",
expected: true,
},
{
name: "prefix wildcard does not match different prefix",
topics: []string{"user.*"},
eventTopic: "order.created",
expected: false,
},
{
name: "suffix wildcard does not match different suffix",
topics: []string{"*.created"},
eventTopic: "user.deleted",
expected: false,
},
{
name: "middle wildcard requires suffix",
topics: []string{"order.*.completed"},
eventTopic: "order.payment.failed",
expected: false,
},
// No match
{
name: "no topic match",
Expand Down Expand Up @@ -256,6 +324,49 @@ func TestTopics_MatchTopic(t *testing.T) {
}
}

func BenchmarkTopics_MatchTopic(b *testing.B) {
topicsExact := models.Topics{
"user.created",
"user.deleted",
"user.updated",
"order.created",
"order.deleted",
"order.updated",
}
topicsPattern := models.Topics{
"user.*",
"*.deleted",
"order.*.completed",
"invoice.created",
"invoice.deleted",
"invoice.updated",
}

b.Run("exact match", func(b *testing.B) {
for range b.N {
_ = topicsExact.MatchTopic("order.updated")
}
})

b.Run("exact miss", func(b *testing.B) {
for range b.N {
_ = topicsExact.MatchTopic("payment.created")
}
})

b.Run("pattern match", func(b *testing.B) {
for range b.N {
_ = topicsPattern.MatchTopic("order.payment.completed")
}
})

b.Run("pattern miss", func(b *testing.B) {
for range b.N {
_ = topicsPattern.MatchTopic("order.payment.failed")
}
})
}

func TestFilter_MarshalBinary(t *testing.T) {
t.Parallel()

Expand Down
38 changes: 38 additions & 0 deletions internal/publishmq/eventhandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,44 @@ func TestEventHandler_HandleResult(t *testing.T) {
require.Len(t, result.DestinationIDs, 3)
})

t.Run("normal publish with wildcard destination topics", func(t *testing.T) {
wildcardTenant := models.Tenant{
ID: idgen.String(),
CreatedAt: time.Now(),
}
require.NoError(t, tenantStore.UpsertTenant(ctx, wildcardTenant))

destFactory := testutil.DestinationFactory
matchingDestinations := []models.Destination{
destFactory.Any(
destFactory.WithTenantID(wildcardTenant.ID),
destFactory.WithTopics([]string{"user.*"}),
),
destFactory.Any(
destFactory.WithTenantID(wildcardTenant.ID),
destFactory.WithTopics([]string{"*.created"}),
),
}
for _, dest := range matchingDestinations {
require.NoError(t, tenantStore.UpsertDestination(ctx, dest))
}
require.NoError(t, tenantStore.UpsertDestination(ctx, destFactory.Any(
destFactory.WithTenantID(wildcardTenant.ID),
destFactory.WithTopics([]string{"user.deleted*"}),
)))

event := testutil.EventFactory.AnyPointer(
testutil.EventFactory.WithTenantID(wildcardTenant.ID),
testutil.EventFactory.WithTopic("user.created"),
)

result, err := eventHandler.Handle(ctx, event)
require.NoError(t, err)
require.NotNil(t, result)
require.False(t, result.Duplicate)
require.ElementsMatch(t, []string{matchingDestinations[0].ID, matchingDestinations[1].ID}, result.DestinationIDs)
})

t.Run("no destinations matched", func(t *testing.T) {
event := testutil.EventFactory.AnyPointer(
testutil.EventFactory.WithTenantID(tenant.ID),
Expand Down
16 changes: 16 additions & 0 deletions internal/tenantstore/drivertest/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,22 @@ func testListDestination(t *testing.T, newHarness HarnessMaker) {
})
require.NoError(t, err)
require.Len(t, destinations, 3)

for _, topics := range [][]string{{"user.*"}, {"*.created"}, {"user.deleted*"}} {
require.NoError(t, store.UpsertDestination(ctx, testutil.DestinationFactory.Any(
testutil.DestinationFactory.WithTenantID(data.tenant.ID),
testutil.DestinationFactory.WithTopics(topics),
)))
}

// The filter topic is treated as a concrete published topic. Destinations
// subscribed with wildcard patterns are included when they would receive it.
destinations, err = store.ListDestination(ctx, driver.ListDestinationRequest{
TenantID: data.tenant.ID,
Topics: []string{"user.created"},
})
require.NoError(t, err)
require.Len(t, destinations, 5)
})
}

Expand Down
Loading