feat(tenant-manager): add TenantDiscovery orchestrator for background workers#416

Open
jeffersonrodrigues92 wants to merge 3 commits into develop from feat/tenant-discovery-orchestrator

Conversation

@jeffersonrodrigues92 (Contributor)

Closes #415

Summary

Adds a TenantDiscovery orchestrator in commons/tenant-manager/discovery/ that combines:

  1. Bootstrap via GET /tenants/active?service={name} to populate an in-memory tenant set on start
  2. Real-time updates via Redis Pub/Sub TenantEventListener to keep the set current (add/remove/suspend)
  3. Simple API for background workers to iterate active tenants without polling

API

discovery, _ := tmdiscovery.NewTenantDiscovery(tmdiscovery.Config{
    ServiceName: "ledger",
    TMClient:    tenantClient,
    RedisConfig: tmredis.TenantPubSubRedisConfig{Host: "redis", Port: "6379"},
    Logger:      logger,
})
discovery.Start(ctx)

for _, tenantID := range discovery.ActiveTenantIDs() {
    // process tenant
}

Design decisions

  • sync.RWMutex + map[string]struct{} for tenant set (simple, fast)
  • Lightweight internal event handler instead of full EventDispatcher (avoids heavy TenantCache + TenantLoader dependencies)
  • Service-level events filtered by service_name in payload
  • Redis Pub/Sub failure is non-fatal at start (bootstrap-only mode with warning log)
  • Bootstrap failure is fatal (fail-fast — no tenants to process)
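
The sync.RWMutex + map[string]struct{} design above can be sketched in isolation. This is an illustrative standalone sketch, not the package's actual API; the snapshot-copy in activeTenantIDs is what lets workers iterate without holding the lock:

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// tenantSet is a minimal sketch of the concurrent tenant set described above.
type tenantSet struct {
	mu      sync.RWMutex
	tenants map[string]struct{}
}

func newTenantSet() *tenantSet {
	return &tenantSet{tenants: make(map[string]struct{})}
}

func (s *tenantSet) add(id string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.tenants[id] = struct{}{}
}

func (s *tenantSet) remove(id string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.tenants, id)
}

// activeTenantIDs returns a snapshot copy, so callers can range over it
// without holding the lock (snapshot isolation).
func (s *tenantSet) activeTenantIDs() []string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	ids := make([]string, 0, len(s.tenants))
	for id := range s.tenants {
		ids = append(ids, id)
	}
	return ids
}

func main() {
	s := newTenantSet()
	s.add("t1")
	s.add("t2")
	s.remove("t1")
	ids := s.activeTenantIDs()
	sort.Strings(ids)
	fmt.Println(ids) // [t2]
}
```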

Event routing

Event                          Action
tenant.service.associated      Add to set + OnTenantAdded callback
tenant.service.reactivated     Add to set + OnTenantAdded callback
tenant.service.disassociated   Remove from set + OnTenantRemoved callback
tenant.service.suspended       Remove from set + OnTenantRemoved callback
tenant.service.purged          Remove from set + OnTenantRemoved callback
tenant.suspended               Remove from set + OnTenantRemoved callback
tenant.deleted                 Remove from set + OnTenantRemoved callback
Others (created, updated, activated, credentials.rotated, connections.updated)   No-op
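
The routing table reduces to a single switch on the event type. A minimal sketch (the classifier function and its return convention are illustrative, not the package's actual code; event names are from the table):

```go
package main

import "fmt"

// routeEvent classifies an event type per the routing table: +1 means the
// tenant is added to the set, -1 means it is removed, 0 is a no-op.
func routeEvent(eventType string) int {
	switch eventType {
	case "tenant.service.associated", "tenant.service.reactivated":
		return +1
	case "tenant.service.disassociated", "tenant.service.suspended",
		"tenant.service.purged", "tenant.suspended", "tenant.deleted":
		return -1
	default: // created, updated, activated, credentials.rotated, connections.updated
		return 0
	}
}

func main() {
	fmt.Println(routeEvent("tenant.service.associated")) // 1
	fmt.Println(routeEvent("tenant.deleted"))            // -1
	fmt.Println(routeEvent("tenant.updated"))            // 0
}
```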

Test plan

  • 17 unit tests passing (TDD: RED → GREEN)
  • Lint: 0 issues
  • Validation, bootstrap success/failure, Redis unavailable, event routing, callbacks, snapshot isolation, close

jeffersonrodrigues92 and others added 3 commits March 27, 2026 14:45
… workers

Combines bootstrap via GET /tenants/active with real-time Pub/Sub updates in a single entry point. Workers call Start() once and use ActiveTenantIDs() to iterate tenants. Supports OnTenantAdded/OnTenantRemoved callbacks. Redis Pub/Sub failure is non-fatal (bootstrap-only mode).

X-Lerian-Ref: 0x1
@coderabbitai

coderabbitai bot commented Mar 28, 2026

Walkthrough

A new TenantDiscovery package has been introduced in commons/tenant-manager/discovery/. This package provides a background worker that maintains an in-memory set of active tenant IDs. On startup, it bootstraps tenant data via the tenant-manager client and optionally subscribes to Redis Pub/Sub for tenant lifecycle events. The implementation offers methods to retrieve active tenant snapshots, register event callbacks for tenant additions and removals, handle incoming lifecycle events, and gracefully close the listener. Configuration requires a service name, tenant-manager client implementation, and Redis configuration. The test suite validates constructor validation, bootstrap behavior, callback invocation, and event routing logic.

Sequence Diagram(s)

sequenceDiagram
    participant Caller
    participant TenantDiscovery
    participant TMClient
    participant Redis as Redis<br/>Pub/Sub
    participant Callback as Callback<br/>Handlers

    Caller->>TenantDiscovery: Start(ctx)
    
    rect rgba(100, 150, 200, 0.5)
    Note over TenantDiscovery,TMClient: Bootstrap Phase
    TenantDiscovery->>TMClient: GetActiveTenantsByService(ctx, serviceName)
    TMClient-->>TenantDiscovery: []*TenantSummary
    TenantDiscovery->>TenantDiscovery: Populate in-memory tenant set
    end
    
    rect rgba(150, 100, 200, 0.5)
    Note over TenantDiscovery,Redis: Redis Listener Setup
    TenantDiscovery->>Redis: Create Pub/Sub listener
    TenantDiscovery->>Redis: Start listening for events
    Redis-->>TenantDiscovery: Connected (or warning if unavailable)
    end
    
    TenantDiscovery-->>Caller: error or nil

    Caller->>TenantDiscovery: HandleEvent(ctx, event)
    
    rect rgba(200, 150, 100, 0.5)
    Note over TenantDiscovery,Callback: Event Processing
    TenantDiscovery->>TenantDiscovery: Parse event payload
    TenantDiscovery->>TenantDiscovery: Match service_name
    alt Service matches
        TenantDiscovery->>TenantDiscovery: Route to add/remove logic
        TenantDiscovery->>TenantDiscovery: Update tenant set
        TenantDiscovery->>Callback: Invoke OnTenantAdded or<br/>OnTenantRemoved
        Callback-->>TenantDiscovery: callback complete
    else Service mismatch or no-op event
        TenantDiscovery->>TenantDiscovery: Skip/ignore
    end
    end
    
    TenantDiscovery-->>Caller: error or nil
🚥 Pre-merge checks | ✅ 4
✅ Passed checks (4 passed)
  • Title check ✅ Passed — The title accurately describes the main feature addition: a new TenantDiscovery orchestrator for background workers to discover active tenants.
  • Description check ✅ Passed — The PR description comprehensively covers the summary, API design, design decisions, event routing table, and test results, matching the template requirements.
  • Linked Issues check ✅ Passed — All acceptance criteria from issue #415 are met: validation, bootstrap via GetActiveTenantsByService, TenantEventListener integration, concurrent-safe ActiveTenantIDs, callback support, Close cleanup, Redis failure tolerance, and 17 passing unit tests.
  • Out of Scope Changes check ✅ Passed — Both files (discovery.go and discovery_test.go) are directly in scope: the new TenantDiscovery implementation and unit tests validating the feature requirements from issue #415.



coderabbitai bot left a comment


Actionable comments posted: 5

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@commons/tenant-manager/discovery/discovery.go`:
- Around line 143-149: The callback fields (onAdded/onRemoved) are written
without synchronization and addTenant calls onAdded even for existing tenants;
fix by adding a mutex on TenantDiscovery (e.g., callbacksMu) and: set
onAdded/onRemoved under this lock in OnTenantAdded/OnTenantRemoved; when firing
callbacks (in Start/event listeners and in addTenant/removeTenant) acquire the
lock briefly to copy the callback to a local variable then release the lock and
call the copy (nil-safe). Also change addTenant to check if the tenant was
already present and only invoke onAdded when the insert is new to avoid
duplicate work; apply the same guarded pattern for removals.
- Around line 100-119: When tmredis.NewTenantPubSubRedisClient succeeds but
tmevent.NewTenantEventListener returns an error, call redisClient.Close (or the
appropriate Close/Shutdown method on the returned client) before returning to
avoid leaking connections; likewise, if listener.Start(ctx) returns an error,
call listener.Stop (or Shutdown) and then redisClient.Close and log any cleanup
errors using d.logger.WarnfCtx. Update the branches that handle listenerErr and
startErr to perform these cleanup calls (reference
tmredis.NewTenantPubSubRedisClient, tmevent.NewTenantEventListener,
listener.Start, redisClient.Close, listener.Stop and keep using d.HandleEvent
and d.logger.WarnfCtx for context).
- Around line 84-126: The Start method currently bootstraps via
d.cfg.TMClient.GetActiveTenantsByService before creating/starting the Redis
listener (tmevent.NewTenantEventListener and listener.Start), which creates a
missed-event window; fix by starting the subscriber first and ensuring events
received during bootstrap are applied: create and Start the listener (using
NewTenantEventListener and d.HandleEvent) before calling
GetActiveTenantsByService, buffer incoming events (e.g., into a temporary
channel or in-memory queue) while bootstrapping d.tenants, then after populating
d.tenants drain the buffer by invoking d.HandleEvent for each buffered event (or
alternatively perform a second GetActiveTenantsByService reconciliation after
listener.Start and merge results into d.tenants) so no tenant changes are
permanently lost.
- Around line 206-224: The handlers handleServiceAdd and handleServiceRemove
currently ignore JSON decode failures because matchesService collapses unmarshal
errors into false; change matchesService to surface errors (e.g., return (bool,
error)) or provide a separate validateServicePayload(ctx, evt) that returns an
error, then update handleServiceAdd and handleServiceRemove to call the new API,
and if an error is returned log it with event context and return the error to
the caller (HandleEvent) instead of silently treating it as a mismatch; apply
the same change pattern to the other service handlers referenced around lines
228-236 so malformed tenant.service.* payloads are propagated and not swallowed.
- Around line 97-124: Replace printf-style logger calls (d.logger.InfofCtx /
d.logger.WarnfCtx) with the structured logger API d.logger.Log(ctx, level, msg,
fields...) in the discovery bootstrap and event-listener setup (the calls
logging bootstrapped tenant count, redis pub/sub unavailable, failed to create
event listener, failed to start event listener, and event listener started).
Emit structured fields such as service: d.cfg.ServiceName, tenant_count:
len(summaries), error: <error variable> and event_type where relevant instead of
interpolating into the message string; do the same for the similar occurrences
referenced later (the calls around the other reported lines). Ensure you pass
the correct log level (Info/Warn) to Log and assign d.listener only after
successful Start.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 55d7b844-3ed5-4cdc-bb5b-d159cfb14d5d

📥 Commits

Reviewing files that changed from the base of the PR and between 66ba5cb and 39d4939.

📒 Files selected for processing (2)
  • commons/tenant-manager/discovery/discovery.go
  • commons/tenant-manager/discovery/discovery_test.go

Comment on lines +84 to +126
func (d *TenantDiscovery) Start(ctx context.Context) error {
    // Bootstrap from tenant-manager API
    summaries, err := d.cfg.TMClient.GetActiveTenantsByService(ctx, d.cfg.ServiceName)
    if err != nil {
        return fmt.Errorf("discovery.Start: bootstrap failed: %w", err)
    }

    d.mu.Lock()
    for _, s := range summaries {
        d.tenants[s.ID] = struct{}{}
    }
    d.mu.Unlock()

    d.logger.InfofCtx(ctx, "discovery: bootstrapped %d tenants for service %s", len(summaries), d.cfg.ServiceName)

    // Connect to Redis Pub/Sub for live updates
    redisClient, redisErr := tmredis.NewTenantPubSubRedisClient(ctx, d.cfg.RedisConfig)
    if redisErr != nil {
        d.logger.WarnfCtx(ctx, "discovery: redis pub/sub unavailable, running in bootstrap-only mode: %v", redisErr)
        return nil
    }

    listener, listenerErr := tmevent.NewTenantEventListener(
        redisClient,
        d.HandleEvent,
        tmevent.WithListenerLogger(d.cfg.Logger),
        tmevent.WithService(d.cfg.ServiceName),
    )
    if listenerErr != nil {
        d.logger.WarnfCtx(ctx, "discovery: failed to create event listener: %v", listenerErr)
        return nil
    }

    if startErr := listener.Start(ctx); startErr != nil {
        d.logger.WarnfCtx(ctx, "discovery: failed to start event listener: %v", startErr)
        return nil
    }

    d.listener = listener

    d.logger.InfofCtx(ctx, "discovery: event listener started for service %s", d.cfg.ServiceName)

    return nil

⚠️ Potential issue | 🟠 Major

Bootstrap before subscribing leaves a missed-event window.

A tenant change that lands after GetActiveTenantsByService returns but before listener.Start subscribes is permanently lost with Redis Pub/Sub, so this cache can stay stale until restart. Subscribe first and buffer until reconciliation, or run a second snapshot after the listener is live.
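
The subscribe-first-then-buffer approach can be sketched as follows. This is an illustrative standalone sketch with made-up names, not the package's code: events arriving while bootstrap is still in flight are queued, then replayed once the initial snapshot is in place.

```go
package main

import (
	"fmt"
	"sync"
)

// bufferingHandler queues events received before bootstrap completes and
// replays them afterwards, closing the missed-event window.
type bufferingHandler struct {
	mu       sync.Mutex
	ready    bool
	buffered []string
	apply    func(event string)
}

func (b *bufferingHandler) handle(event string) {
	b.mu.Lock()
	if !b.ready {
		// Bootstrap not finished yet: queue instead of applying.
		b.buffered = append(b.buffered, event)
		b.mu.Unlock()
		return
	}
	b.mu.Unlock()
	b.apply(event)
}

// markReady is called once the bootstrap snapshot is populated; it drains
// the queue in arrival order, then switches to the live path.
func (b *bufferingHandler) markReady() {
	b.mu.Lock()
	queued := b.buffered
	b.buffered = nil
	b.ready = true
	b.mu.Unlock()
	for _, e := range queued {
		b.apply(e)
	}
}

func main() {
	var applied []string
	h := &bufferingHandler{apply: func(e string) { applied = append(applied, e) }}
	h.handle("tenant.service.associated") // arrives during bootstrap: buffered
	h.markReady()                         // bootstrap done: buffer drained
	h.handle("tenant.deleted")            // live path: applied immediately
	fmt.Println(applied)                  // [tenant.service.associated tenant.deleted]
}
```

The alternative the comment mentions (a second GetActiveTenantsByService reconciliation after the listener is live) trades the buffer for one extra API call.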


Comment on lines +97 to +124
d.logger.InfofCtx(ctx, "discovery: bootstrapped %d tenants for service %s", len(summaries), d.cfg.ServiceName)

// Connect to Redis Pub/Sub for live updates
redisClient, redisErr := tmredis.NewTenantPubSubRedisClient(ctx, d.cfg.RedisConfig)
if redisErr != nil {
    d.logger.WarnfCtx(ctx, "discovery: redis pub/sub unavailable, running in bootstrap-only mode: %v", redisErr)
    return nil
}

listener, listenerErr := tmevent.NewTenantEventListener(
    redisClient,
    d.HandleEvent,
    tmevent.WithListenerLogger(d.cfg.Logger),
    tmevent.WithService(d.cfg.ServiceName),
)
if listenerErr != nil {
    d.logger.WarnfCtx(ctx, "discovery: failed to create event listener: %v", listenerErr)
    return nil
}

if startErr := listener.Start(ctx); startErr != nil {
    d.logger.WarnfCtx(ctx, "discovery: failed to start event listener: %v", startErr)
    return nil
}

d.listener = listener

d.logger.InfofCtx(ctx, "discovery: event listener started for service %s", d.cfg.ServiceName)

🛠️ Refactor suggestion | 🟠 Major

Use structured logging instead of InfofCtx/WarnfCtx.

These calls make fields like service, tenant_count, event_type, and error harder to query consistently. Please emit them through Log(ctx, level, msg, fields...) instead of printf-style helpers.
As per coding guidelines, "Use the structured log interface Log(ctx, level, msg, fields...) instead of printf-style methods".

Also applies to: 194-196


Comment on lines +100 to +119
redisClient, redisErr := tmredis.NewTenantPubSubRedisClient(ctx, d.cfg.RedisConfig)
if redisErr != nil {
    d.logger.WarnfCtx(ctx, "discovery: redis pub/sub unavailable, running in bootstrap-only mode: %v", redisErr)
    return nil
}

listener, listenerErr := tmevent.NewTenantEventListener(
    redisClient,
    d.HandleEvent,
    tmevent.WithListenerLogger(d.cfg.Logger),
    tmevent.WithService(d.cfg.ServiceName),
)
if listenerErr != nil {
    d.logger.WarnfCtx(ctx, "discovery: failed to create event listener: %v", listenerErr)
    return nil
}

if startErr := listener.Start(ctx); startErr != nil {
    d.logger.WarnfCtx(ctx, "discovery: failed to start event listener: %v", startErr)
    return nil

⚠️ Potential issue | 🟠 Major

Release Redis resources on listener setup failures.

After tmredis.NewTenantPubSubRedisClient succeeds, both the listenerErr and startErr branches return without cleaning up the newly created client/listener. Retried Start() calls will leak sockets/goroutines unless the client is closed on constructor failure and the listener is stopped on start failure.


Comment on lines +143 to +149
func (d *TenantDiscovery) OnTenantAdded(fn func(ctx context.Context, tenantID string)) {
    d.onAdded = fn
}

// OnTenantRemoved registers a callback invoked when a tenant is removed from the active set.
func (d *TenantDiscovery) OnTenantRemoved(fn func(ctx context.Context, tenantID string)) {
    d.onRemoved = fn

⚠️ Potential issue | 🔴 Critical

Guard callback state and fire onAdded only on real inserts.

OnTenantAdded/OnTenantRemoved write callback fields without synchronization while event handling reads them, so registering callbacks after Start() races with listener goroutines. addTenant also invokes onAdded even when the tenant was already present, which causes duplicate worker work on repeated tenant.service.associated / tenant.service.reactivated events.

Possible fix
 func (d *TenantDiscovery) OnTenantAdded(fn func(ctx context.Context, tenantID string)) {
-	d.onAdded = fn
+	d.mu.Lock()
+	d.onAdded = fn
+	d.mu.Unlock()
 }

 // OnTenantRemoved registers a callback invoked when a tenant is removed from the active set.
 func (d *TenantDiscovery) OnTenantRemoved(fn func(ctx context.Context, tenantID string)) {
-	d.onRemoved = fn
+	d.mu.Lock()
+	d.onRemoved = fn
+	d.mu.Unlock()
 }

 // addTenant adds a tenant to the active set and fires the onAdded callback.
 func (d *TenantDiscovery) addTenant(ctx context.Context, tenantID string) {
 	d.mu.Lock()
+	_, existed := d.tenants[tenantID]
 	d.tenants[tenantID] = struct{}{}
+	onAdded := d.onAdded
 	d.mu.Unlock()

-	if d.onAdded != nil {
-		d.onAdded(ctx, tenantID)
+	if !existed && onAdded != nil {
+		onAdded(ctx, tenantID)
 	}
 }

 // removeTenant removes a tenant from the active set and fires the onRemoved callback.
 func (d *TenantDiscovery) removeTenant(ctx context.Context, tenantID string) {
 	d.mu.Lock()
 	_, existed := d.tenants[tenantID]
 	delete(d.tenants, tenantID)
+	onRemoved := d.onRemoved
 	d.mu.Unlock()

-	if existed && d.onRemoved != nil {
-		d.onRemoved(ctx, tenantID)
+	if existed && onRemoved != nil {
+		onRemoved(ctx, tenantID)
 	}
 }

As per coding guidelines, "Keep behavior nil-safe and concurrency-safe by default in exported APIs".

Also applies to: 242-260


Comment on lines +206 to +224
func (d *TenantDiscovery) handleServiceAdd(ctx context.Context, evt tmevent.TenantLifecycleEvent) error {
    if !d.matchesService(evt) {
        return nil
    }

    d.addTenant(ctx, evt.TenantID)

    return nil
}

// handleServiceRemove processes service-level remove events (disassociated, suspended, purged).
func (d *TenantDiscovery) handleServiceRemove(ctx context.Context, evt tmevent.TenantLifecycleEvent) error {
    if !d.matchesService(evt) {
        return nil
    }

    d.removeTenant(ctx, evt.TenantID)

    return nil

⚠️ Potential issue | 🟠 Major

Do not silently drop malformed service payloads.

matchesService collapses JSON decode failures into false, so a bad tenant.service.* event is treated like a service mismatch and the in-memory set can drift with no signal. Since HandleEvent already returns error, propagate the unmarshal failure or at least log it with event context.

As per coding guidelines, "Do not swallow errors; return or handle errors with context".

Also applies to: 228-236
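
The suggested (bool, error) signature can be sketched as follows. The payload shape (a service_name field) is taken from the PR description; the function body is illustrative, not the actual implementation:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// matchesService surfaces JSON decode failures instead of collapsing them
// into "no match", so malformed tenant.service.* payloads reach the caller.
func matchesService(payload []byte, serviceName string) (bool, error) {
	var p struct {
		ServiceName string `json:"service_name"`
	}
	if err := json.Unmarshal(payload, &p); err != nil {
		return false, fmt.Errorf("decode service payload: %w", err)
	}
	return p.ServiceName == serviceName, nil
}

func main() {
	ok, err := matchesService([]byte(`{"service_name":"ledger"}`), "ledger")
	fmt.Println(ok, err) // true <nil>

	_, err = matchesService([]byte(`{not json`), "ledger")
	fmt.Println(err != nil) // true: malformed payload now surfaces an error
}
```

A handler can then distinguish "different service" (ignore silently) from "broken payload" (log with event context and return the error to HandleEvent).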

