feat(tenant-manager): add TenantDiscovery orchestrator for background workers #416

jeffersonrodrigues92 wants to merge 3 commits into develop from
Conversation
… workers

Combines bootstrap via GET /tenants/active with real-time Pub/Sub updates in a single entry point. Workers call Start() once and use ActiveTenantIDs() to iterate tenants. Supports OnTenantAdded/OnTenantRemoved callbacks. Redis Pub/Sub failure is non-fatal (bootstrap-only mode).

X-Lerian-Ref: 0x1
Walkthrough

Sequence diagram(s):

```mermaid
sequenceDiagram
    participant Caller
    participant TenantDiscovery
    participant TMClient
    participant Redis as Redis<br/>Pub/Sub
    participant Callback as Callback<br/>Handlers

    Caller->>TenantDiscovery: Start(ctx)
    rect rgba(100, 150, 200, 0.5)
        Note over TenantDiscovery,TMClient: Bootstrap Phase
        TenantDiscovery->>TMClient: GetActiveTenantsByService(ctx, serviceName)
        TMClient-->>TenantDiscovery: []*TenantSummary
        TenantDiscovery->>TenantDiscovery: Populate in-memory tenant set
    end
    rect rgba(150, 100, 200, 0.5)
        Note over TenantDiscovery,Redis: Redis Listener Setup
        TenantDiscovery->>Redis: Create Pub/Sub listener
        TenantDiscovery->>Redis: Start listening for events
        Redis-->>TenantDiscovery: Connected (or warning if unavailable)
    end
    TenantDiscovery-->>Caller: error or nil

    Caller->>TenantDiscovery: HandleEvent(ctx, event)
    rect rgba(200, 150, 100, 0.5)
        Note over TenantDiscovery,Callback: Event Processing
        TenantDiscovery->>TenantDiscovery: Parse event payload
        TenantDiscovery->>TenantDiscovery: Match service_name
        alt Service matches
            TenantDiscovery->>TenantDiscovery: Route to add/remove logic
            TenantDiscovery->>TenantDiscovery: Update tenant set
            TenantDiscovery->>Callback: Invoke OnTenantAdded or<br/>OnTenantRemoved
            Callback-->>TenantDiscovery: callback complete
        else Service mismatch or no-op event
            TenantDiscovery->>TenantDiscovery: Skip/ignore
        end
    end
    TenantDiscovery-->>Caller: error or nil
```
🚥 Pre-merge checks: ✅ 4 passed
Actionable comments posted: 5
ℹ️ Review info
⚙️ Run configuration
- Configuration used: Organization UI
- Review profile: ASSERTIVE
- Plan: Pro
- Run ID: 55d7b844-3ed5-4cdc-bb5b-d159cfb14d5d
📒 Files selected for processing (2)
- commons/tenant-manager/discovery/discovery.go
- commons/tenant-manager/discovery/discovery_test.go
```go
func (d *TenantDiscovery) Start(ctx context.Context) error {
	// Bootstrap from tenant-manager API
	summaries, err := d.cfg.TMClient.GetActiveTenantsByService(ctx, d.cfg.ServiceName)
	if err != nil {
		return fmt.Errorf("discovery.Start: bootstrap failed: %w", err)
	}

	d.mu.Lock()
	for _, s := range summaries {
		d.tenants[s.ID] = struct{}{}
	}
	d.mu.Unlock()

	d.logger.InfofCtx(ctx, "discovery: bootstrapped %d tenants for service %s", len(summaries), d.cfg.ServiceName)

	// Connect to Redis Pub/Sub for live updates
	redisClient, redisErr := tmredis.NewTenantPubSubRedisClient(ctx, d.cfg.RedisConfig)
	if redisErr != nil {
		d.logger.WarnfCtx(ctx, "discovery: redis pub/sub unavailable, running in bootstrap-only mode: %v", redisErr)
		return nil
	}

	listener, listenerErr := tmevent.NewTenantEventListener(
		redisClient,
		d.HandleEvent,
		tmevent.WithListenerLogger(d.cfg.Logger),
		tmevent.WithService(d.cfg.ServiceName),
	)
	if listenerErr != nil {
		d.logger.WarnfCtx(ctx, "discovery: failed to create event listener: %v", listenerErr)
		return nil
	}

	if startErr := listener.Start(ctx); startErr != nil {
		d.logger.WarnfCtx(ctx, "discovery: failed to start event listener: %v", startErr)
		return nil
	}

	d.listener = listener

	d.logger.InfofCtx(ctx, "discovery: event listener started for service %s", d.cfg.ServiceName)

	return nil
}
```
Bootstrap before subscribing leaves a missed-event window.
A tenant change that lands after GetActiveTenantsByService returns but before listener.Start subscribes is permanently lost with Redis Pub/Sub, so this cache can stay stale until restart. Subscribe first and buffer until reconciliation, or run a second snapshot after the listener is live.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@commons/tenant-manager/discovery/discovery.go` around lines 84 - 126, The
Start method currently bootstraps via d.cfg.TMClient.GetActiveTenantsByService
before creating/starting the Redis listener (tmevent.NewTenantEventListener and
listener.Start), which creates a missed-event window; fix by starting the
subscriber first and ensuring events received during bootstrap are applied:
create and Start the listener (using NewTenantEventListener and d.HandleEvent)
before calling GetActiveTenantsByService, buffer incoming events (e.g., into a
temporary channel or in-memory queue) while bootstrapping d.tenants, then after
populating d.tenants drain the buffer by invoking d.HandleEvent for each
buffered event (or alternatively perform a second GetActiveTenantsByService
reconciliation after listener.Start and merge results into d.tenants) so no
tenant changes are permanently lost.
🛠️ Refactor suggestion | 🟠 Major
Use structured logging instead of InfofCtx/WarnfCtx.
These calls make fields like service, tenant_count, event_type, and error harder to query consistently. Please emit them through Log(ctx, level, msg, fields...) instead of printf-style helpers.
As per coding guidelines, "Use the structured log interface Log(ctx, level, msg, fields...) instead of printf-style methods".
Also applies to: 194-196
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@commons/tenant-manager/discovery/discovery.go` around lines 97 - 124, Replace
printf-style logger calls (d.logger.InfofCtx / d.logger.WarnfCtx) with the
structured logger API d.logger.Log(ctx, level, msg, fields...) in the discovery
bootstrap and event-listener setup (the calls logging bootstrapped tenant count,
redis pub/sub unavailable, failed to create event listener, failed to start
event listener, and event listener started). Emit structured fields such as
service: d.cfg.ServiceName, tenant_count: len(summaries), error: <error
variable> and event_type where relevant instead of interpolating into the
message string; do the same for the similar occurrences referenced later (the
calls around the other reported lines). Ensure you pass the correct log level
(Info/Warn) to Log and assign d.listener only after successful Start.
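To illustrate the call-shape change, here is a hedged sketch of the before/after. The `Level`, `Field`, and `Logger` types are invented for the example (the commons logger's actual signatures may differ), and the `"ledger-worker"` service name is hypothetical; only the shift from interpolated strings to queryable fields is the point.

```go
package main

import (
	"context"
	"fmt"
)

type Level string

const (
	LevelInfo Level = "info"
	LevelWarn Level = "warn"
)

// Field is a structured key/value pair; the real logger's field type
// may differ — this only illustrates the Log(ctx, level, msg, fields...) shape.
type Field struct {
	Key   string
	Value any
}

// formatLog renders one structured entry as "level=... msg=... k=v ..." text.
func formatLog(level Level, msg string, fields ...Field) string {
	out := fmt.Sprintf("level=%s msg=%q", level, msg)
	for _, f := range fields {
		out += fmt.Sprintf(" %s=%v", f.Key, f.Value)
	}
	return out
}

type Logger struct{}

func (Logger) Log(_ context.Context, level Level, msg string, fields ...Field) {
	fmt.Println(formatLog(level, msg, fields...))
}

func main() {
	ctx := context.Background()
	logger := Logger{}

	// Before (printf-style, fields baked into the message string):
	//   d.logger.InfofCtx(ctx, "discovery: bootstrapped %d tenants for service %s", n, svc)
	// After (fields emitted separately, so they stay queryable):
	logger.Log(ctx, LevelInfo, "discovery: bootstrapped tenants",
		Field{"service", "ledger-worker"},
		Field{"tenant_count", 42},
	)
	logger.Log(ctx, LevelWarn, "discovery: redis pub/sub unavailable, bootstrap-only mode",
		Field{"service", "ledger-worker"},
		Field{"error", "dial tcp: connection refused"},
	)
}
```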
Release Redis resources on listener setup failures.
After tmredis.NewTenantPubSubRedisClient succeeds, both the listenerErr and startErr branches return without cleaning up the newly created client/listener. Retried Start() calls will leak sockets/goroutines unless the client is closed on constructor failure and the listener is stopped on start failure.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@commons/tenant-manager/discovery/discovery.go` around lines 100 - 119, When
tmredis.NewTenantPubSubRedisClient succeeds but tmevent.NewTenantEventListener
returns an error, call redisClient.Close (or the appropriate Close/Shutdown
method on the returned client) before returning to avoid leaking connections;
likewise, if listener.Start(ctx) returns an error, call listener.Stop (or
Shutdown) and then redisClient.Close and log any cleanup errors using
d.logger.WarnfCtx. Update the branches that handle listenerErr and startErr to
perform these cleanup calls (reference tmredis.NewTenantPubSubRedisClient,
tmevent.NewTenantEventListener, listener.Start, redisClient.Close, listener.Stop
and keep using d.HandleEvent and d.logger.WarnfCtx for context).
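A sketch of the cleanup shape the comment asks for, with stub types standing in for the real Redis client and listener; the `Close`/`Stop` method names are assumptions about the tenant-manager types, so check them against the actual API.

```go
package main

import (
	"errors"
	"fmt"
)

var errSubscribe = errors.New("subscribe failed")

// Stubs standing in for the tenant-manager Redis client and event listener.
type redisClient struct{ closed bool }

func (c *redisClient) Close() error { c.closed = true; return nil }

type listener struct{ startErr error }

func (l *listener) Start() error { return l.startErr }
func (l *listener) Stop()        {}

// setupListener releases the Redis client on every failure path so that
// retried Start() calls do not leak sockets or goroutines.
func setupListener(c *redisClient, l *listener, newListenerErr error) error {
	if newListenerErr != nil {
		if cerr := c.Close(); cerr != nil {
			fmt.Println("warn: cleanup failed:", cerr) // log, do not mask the original error
		}
		return fmt.Errorf("create listener: %w", newListenerErr)
	}
	if err := l.Start(); err != nil {
		l.Stop() // stop anything the failed Start may have spawned
		if cerr := c.Close(); cerr != nil {
			fmt.Println("warn: cleanup failed:", cerr)
		}
		return fmt.Errorf("start listener: %w", err)
	}
	return nil
}

func main() {
	c := &redisClient{}
	err := setupListener(c, &listener{startErr: errSubscribe}, nil)
	fmt.Println(err != nil, c.closed) // client closed despite the start failure
}
```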
```go
func (d *TenantDiscovery) OnTenantAdded(fn func(ctx context.Context, tenantID string)) {
	d.onAdded = fn
}

// OnTenantRemoved registers a callback invoked when a tenant is removed from the active set.
func (d *TenantDiscovery) OnTenantRemoved(fn func(ctx context.Context, tenantID string)) {
	d.onRemoved = fn
}
```
Guard callback state and fire onAdded only on real inserts.
OnTenantAdded/OnTenantRemoved write callback fields without synchronization while event handling reads them, so registering callbacks after Start() races with listener goroutines. addTenant also invokes onAdded even when the tenant was already present, which causes duplicate worker work on repeated tenant.service.associated / tenant.service.reactivated events.
Possible fix:

```diff
 func (d *TenantDiscovery) OnTenantAdded(fn func(ctx context.Context, tenantID string)) {
-	d.onAdded = fn
+	d.mu.Lock()
+	d.onAdded = fn
+	d.mu.Unlock()
 }

 // OnTenantRemoved registers a callback invoked when a tenant is removed from the active set.
 func (d *TenantDiscovery) OnTenantRemoved(fn func(ctx context.Context, tenantID string)) {
-	d.onRemoved = fn
+	d.mu.Lock()
+	d.onRemoved = fn
+	d.mu.Unlock()
 }

 // addTenant adds a tenant to the active set and fires the onAdded callback.
 func (d *TenantDiscovery) addTenant(ctx context.Context, tenantID string) {
 	d.mu.Lock()
+	_, existed := d.tenants[tenantID]
 	d.tenants[tenantID] = struct{}{}
+	onAdded := d.onAdded
 	d.mu.Unlock()
-	if d.onAdded != nil {
-		d.onAdded(ctx, tenantID)
+	if !existed && onAdded != nil {
+		onAdded(ctx, tenantID)
 	}
 }

 // removeTenant removes a tenant from the active set and fires the onRemoved callback.
 func (d *TenantDiscovery) removeTenant(ctx context.Context, tenantID string) {
 	d.mu.Lock()
 	_, existed := d.tenants[tenantID]
 	delete(d.tenants, tenantID)
+	onRemoved := d.onRemoved
 	d.mu.Unlock()
-	if existed && d.onRemoved != nil {
-		d.onRemoved(ctx, tenantID)
+	if existed && onRemoved != nil {
+		onRemoved(ctx, tenantID)
 	}
 }
```

As per coding guidelines, "Keep behavior nil-safe and concurrency-safe by default in exported APIs".
Also applies to: 242-260
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@commons/tenant-manager/discovery/discovery.go` around lines 143 - 149, The
callback fields (onAdded/onRemoved) are written without synchronization and
addTenant calls onAdded even for existing tenants; fix by adding a mutex on
TenantDiscovery (e.g., callbacksMu) and: set onAdded/onRemoved under this lock
in OnTenantAdded/OnTenantRemoved; when firing callbacks (in Start/event
listeners and in addTenant/removeTenant) acquire the lock briefly to copy the
callback to a local variable then release the lock and call the copy (nil-safe).
Also change addTenant to check if the tenant was already present and only invoke
onAdded when the insert is new to avoid duplicate work; apply the same guarded
pattern for removals.
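The guarded pattern above can be exercised with a small self-contained check. This sketch uses a simplified `discovery` type (context parameters dropped for brevity) to show that the copy-under-lock callback plus the `existed` check dedupe repeated associate/reactivate events.

```go
package main

import (
	"fmt"
	"sync"
)

// discovery is a minimal stand-in for TenantDiscovery showing the
// guarded-callback pattern: copy the callback under the lock, call the
// copy outside it, and fire onAdded only for genuinely new tenants.
type discovery struct {
	mu      sync.Mutex
	tenants map[string]struct{}
	onAdded func(tenantID string)
}

func (d *discovery) OnTenantAdded(fn func(tenantID string)) {
	d.mu.Lock()
	d.onAdded = fn
	d.mu.Unlock()
}

func (d *discovery) addTenant(tenantID string) {
	d.mu.Lock()
	_, existed := d.tenants[tenantID]
	d.tenants[tenantID] = struct{}{}
	cb := d.onAdded // copy under lock
	d.mu.Unlock()
	if !existed && cb != nil {
		cb(tenantID) // invoked outside the lock, nil-safe
	}
}

func main() {
	d := &discovery{tenants: map[string]struct{}{}}
	calls := 0
	d.OnTenantAdded(func(string) { calls++ })
	d.addTenant("t1")
	d.addTenant("t1") // duplicate associated/reactivated event: no second callback
	fmt.Println(calls)
}
```

Running a variant of this under `go test -race` with concurrent registration and event handling is what would actually surface the original data race.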
```go
func (d *TenantDiscovery) handleServiceAdd(ctx context.Context, evt tmevent.TenantLifecycleEvent) error {
	if !d.matchesService(evt) {
		return nil
	}

	d.addTenant(ctx, evt.TenantID)

	return nil
}

// handleServiceRemove processes service-level remove events (disassociated, suspended, purged).
func (d *TenantDiscovery) handleServiceRemove(ctx context.Context, evt tmevent.TenantLifecycleEvent) error {
	if !d.matchesService(evt) {
		return nil
	}

	d.removeTenant(ctx, evt.TenantID)

	return nil
}
```
Do not silently drop malformed service payloads.
matchesService collapses JSON decode failures into false, so a bad tenant.service.* event is treated like a service mismatch and the in-memory set can drift with no signal. Since HandleEvent already returns error, propagate the unmarshal failure or at least log it with event context.
As per coding guidelines, "Do not swallow errors; return or handle errors with context".
Also applies to: 228-236
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@commons/tenant-manager/discovery/discovery.go` around lines 206 - 224, The
handlers handleServiceAdd and handleServiceRemove currently ignore JSON decode
failures because matchesService collapses unmarshal errors into false; change
matchesService to surface errors (e.g., return (bool, error)) or provide a
separate validateServicePayload(ctx, evt) that returns an error, then update
handleServiceAdd and handleServiceRemove to call the new API, and if an error is
returned log it with event context and return the error to the caller
(HandleEvent) instead of silently treating it as a mismatch; apply the same
change pattern to the other service handlers referenced around lines 228-236 so
malformed tenant.service.* payloads are propagated and not swallowed.
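A sketch of the `(bool, error)` variant the prompt proposes. The `lifecycleEvent` and `servicePayload` types are simplified stand-ins (the real event type and its payload field names may differ), but the `service_name` key comes from the PR summary.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// lifecycleEvent is a stand-in for tmevent.TenantLifecycleEvent.
type lifecycleEvent struct {
	Type    string
	Payload []byte
}

type servicePayload struct {
	ServiceName string `json:"service_name"`
}

// matchesService surfaces decode failures instead of collapsing them into
// false, so the caller (HandleEvent) can propagate them with event context
// rather than silently treating a malformed payload as a service mismatch.
func matchesService(evt lifecycleEvent, want string) (bool, error) {
	var p servicePayload
	if err := json.Unmarshal(evt.Payload, &p); err != nil {
		return false, fmt.Errorf("decode %s payload: %w", evt.Type, err)
	}
	return p.ServiceName == want, nil
}

func main() {
	good := lifecycleEvent{Type: "tenant.service.associated", Payload: []byte(`{"service_name":"ledger"}`)}
	bad := lifecycleEvent{Type: "tenant.service.associated", Payload: []byte(`{not json`)}

	ok, err := matchesService(good, "ledger")
	fmt.Println(ok, err == nil)

	_, err = matchesService(bad, "ledger")
	fmt.Println(err != nil) // caller logs and returns this instead of skipping
}
```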
Closes #415
Summary

Adds a TenantDiscovery orchestrator in commons/tenant-manager/discovery/ that combines:
- GET /tenants/active?service={name} to populate an in-memory tenant set on start
- TenantEventListener to keep the set current (add/remove/suspend)

API

Design decisions
- sync.RWMutex + map[string]struct{} for the tenant set (simple, fast)
- EventDispatcher (avoids the heavy TenantCache + TenantLoader dependencies)
- service_name in payload

Event routing
- Add: tenant.service.associated, tenant.service.reactivated
- Remove: tenant.service.disassociated, tenant.service.suspended, tenant.service.purged, tenant.suspended, tenant.deleted

Test plan