fix(client): handle SFU tag changes during reconnect#2149
fix(client): handle SFU tag changes during reconnect#2149
Conversation
|
📝 WalkthroughWalkthroughEnables dynamic tag resolution in event subscriptions and makes BasePeerConnection's Changes
Sequence DiagramsequenceDiagram
participant SFU as SFU Client
participant BasePeer as BasePeerConnection
participant Dispatcher as Dispatcher
participant Listener as Listener
SFU->>BasePeer: setSfuClient(newSfu)
BasePeer->>BasePeer: this.tag = newSfu.tag
Listener->>Dispatcher: on(tagSelector: () => string, handler)
Dispatcher->>Dispatcher: store handler in dynamic handlers
SFU->>BasePeer: emit event (payload)
BasePeer->>Dispatcher: dispatch(event, this.tag)
Dispatcher->>Dispatcher: emit static listeners (byTag & '*')
Dispatcher->>Dispatcher: for each dynamic handler
Dispatcher->>Listener: resolved = tagSelector()
alt resolved matches event tag (or wildcard)
Dispatcher->>Listener: call handler(payload)
else
Note over Dispatcher: skip handler
end
SFU->>BasePeer: setSfuClient(anotherSfu)
BasePeer->>BasePeer: this.tag = anotherSfu.tag
Note over Dispatcher: subsequent dispatch uses updated tagSelector results
Estimated Code Review Effort🎯 3 (Moderate) | ⏱️ ~25 minutes Poem
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/client/src/rtc/__tests__/Subscriber.test.ts`:
- Around line 265-302: The test currently reuses the same SubscriberOffer for
both dispatches so a stale listener could pass; create two distinct offers
(e.g., subscriberOfferOld with sdp 'offer-sdp' and subscriberOfferNew with sdp
'next-offer-sdp') and dispatch the first to 'test' with subscriberOfferOld and
the second to 'next-tag' with subscriberOfferNew, then assert
subscriber.negotiate was called once with subscriberOfferNew (reference symbols:
negotiate, SubscriberOffer.create, setSfuClient, dispatcher.dispatch,
SfuEvent.create).
In `@packages/client/src/rtc/Dispatcher.ts`:
- Around line 99-105: The dynamic tag selector call inside Dispatcher.dispatch
can throw and abort the whole loop; wrap the call to tagSelector() in a
try/catch so a selector error is caught and logged/ignored and does not stop
other listeners from receiving the event. Specifically, inside the block
iterating over dynamic (the destructured { tagSelector, listener }), replace the
direct call to tagSelector() with a guarded call that catches exceptions, skips
the failing selector (optionally log via existing logger or swallow), and only
proceeds to compare dynamicTag and call this.emitOne(payload, listener) when the
selector returned successfully.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (4)
packages/client/src/rtc/BasePeerConnection.tspackages/client/src/rtc/Dispatcher.tspackages/client/src/rtc/__tests__/Dispatcher.test.tspackages/client/src/rtc/__tests__/Subscriber.test.ts
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
packages/client/src/rtc/Dispatcher.ts (1)
82-83:⚠️ Potential issue | 🟠 Major
'*'dispatch behavior does not match its documented contract.Line 82 says
*should dispatch to every tag, but Line 96-Line 103 only notifies wildcard listeners ('*') and not listeners registered on specific tags.Proposed fix to make wildcard truly broadcast
const { byTag, dynamic } = handlers; - this.emit(payload, byTag.get(tag)); - if (tag !== '*') this.emit(payload, byTag.get('*')); + if (tag === '*') { + for (const listeners of byTag.values()) { + this.emit(payload, listeners); + } + } else { + this.emit(payload, byTag.get(tag)); + this.emit(payload, byTag.get('*')); + } for (const { tagSelector, listener } of dynamic) { const dynamicTag = tagSelector(); - if (dynamicTag === tag || (tag !== '*' && dynamicTag === '*')) { + if (tag === '*' || dynamicTag === tag || dynamicTag === '*') { this.emitOne(payload, listener); } }Also applies to: 96-103
♻️ Duplicate comments (1)
packages/client/src/rtc/Dispatcher.ts (1)
99-103:⚠️ Potential issue | 🟠 MajorGuard dynamic tag selector failures so one bad subscriber can't break dispatch.
At Line 100,
tagSelector()can throw and abort the remaining dispatch flow for this event.Proposed defensive fix
for (const { tagSelector, listener } of dynamic) { - const dynamicTag = tagSelector(); + let dynamicTag: string; + try { + dynamicTag = tagSelector(); + } catch (e) { + this.logger.warn('Dynamic tag selector failed with error', e); + continue; + } if (dynamicTag === tag || (tag !== '*' && dynamicTag === '*')) { this.emitOne(payload, listener); } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/client/src/rtc/Dispatcher.ts` around lines 99 - 103, In the dynamic subscriber loop inside Dispatcher (where it iterates the dynamic array and calls tagSelector()), wrap the tagSelector() invocation in a try/catch so exceptions from a subscriber’s selector are contained and do not abort the whole dispatch; on catch, optionally log or swallow the error and continue to the next subscriber, only calling this.emitOne(payload, listener) when tagSelector() completes successfully and matches the tag.
🧹 Nitpick comments (1)
packages/client/src/rtc/Dispatcher.ts (1)
113-121: Avoidanyin emit path to preserve discriminated-union safety.Using
payload: anyat Line 113 and Line 119 weakens compile-time guarantees in the dispatcher core.Type-safe tightening (minimal change)
+type AnySfuPayload = AllSfuEvents[SfuEventKinds]; + - emit = (payload: any, listeners: AnyListener[] = []) => { + emit = (payload: AnySfuPayload, listeners: AnyListener[] = []) => { for (const listener of listeners) { this.emitOne(payload, listener); } }; - private emitOne = (payload: any, listener: AnyListener) => { + private emitOne = (payload: AnySfuPayload, listener: AnyListener) => { try { listener(payload); } catch (e) { this.logger.warn('Listener failed with error', e); } };As per coding guidelines: "Use Dispatcher pattern for type-safe event handling with discriminated unions via oneofKind pattern."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/client/src/rtc/Dispatcher.ts` around lines 113 - 121, The emit path uses payload: any in emit and emitOne which breaks discriminated-union safety; change these signatures to use a generic/event union type (e.g., EventPayload or a generic P) and update AnyListener to accept that specific union/generic instead of any so the oneofKind discriminant is preserved; modify emit = <P extends EventPayload>(payload: P, listeners: Array<AnyListenerFor<P>>) and emitOne = <P extends EventPayload>(payload: P, listener: AnyListenerFor<P>) (or equivalent type-mapping you already have) and update callers to pass correctly-typed payloads so compile-time checks catch invalid events. Ensure AnyListenerFor/dispatcher listener mappings reference the discriminant field (oneofKind) so handlers get narrowed properly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@packages/client/src/rtc/Dispatcher.ts`:
- Around line 99-103: In the dynamic subscriber loop inside Dispatcher (where it
iterates the dynamic array and calls tagSelector()), wrap the tagSelector()
invocation in a try/catch so exceptions from a subscriber’s selector are
contained and do not abort the whole dispatch; on catch, optionally log or
swallow the error and continue to the next subscriber, only calling
this.emitOne(payload, listener) when tagSelector() completes successfully and
matches the tag.
---
Nitpick comments:
In `@packages/client/src/rtc/Dispatcher.ts`:
- Around line 113-121: The emit path uses payload: any in emit and emitOne which
breaks discriminated-union safety; change these signatures to use a
generic/event union type (e.g., EventPayload or a generic P) and update
AnyListener to accept that specific union/generic instead of any so the
oneofKind discriminant is preserved; modify emit = <P extends
EventPayload>(payload: P, listeners: Array<AnyListenerFor<P>>) and emitOne = <P
extends EventPayload>(payload: P, listener: AnyListenerFor<P>) (or equivalent
type-mapping you already have) and update callers to pass correctly-typed
payloads so compile-time checks catch invalid events. Ensure
AnyListenerFor/dispatcher listener mappings reference the discriminant field
(oneofKind) so handlers get narrowed properly.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
packages/client/src/rtc/Dispatcher.ts
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (1)
packages/client/src/rtc/Dispatcher.ts (1)
117-121:⚠️ Potential issue | 🟠 MajorGuard
tagSelector()failures so one dynamic selector cannot break dispatch.On Lines 117-121, an exception thrown by
tagSelector()bubbles out and can prevent remaining listeners from receiving the event.Proposed fix
emitDynamic = (payload: any, tag: string, dynamic: DynamicHandler[]) => { for (const { tagSelector, listener } of dynamic) { - const dynamicTag = tagSelector(); + let dynamicTag: string; + try { + dynamicTag = tagSelector(); + } catch (e) { + this.logger.warn('Dynamic tag selector failed with error', e); + continue; + } if (dynamicTag === tag || (tag !== '*' && dynamicTag === '*')) { this.emitOne(payload, listener); } } };🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/client/src/rtc/Dispatcher.ts` around lines 117 - 121, In emitDynamic, a thrown exception from tagSelector() can abort the loop and prevent other dynamics from receiving the event; wrap the call to tagSelector() in a try/catch inside the loop (in emitDynamic) so if tagSelector() throws you catch the error, optionally record/log it, and continue to the next dynamic entry without calling emitOne for the failing selector; ensure the catch does not suppress normal behavior for other entries and that emitOne(payload, listener) is only called when tagSelector() successfully returns a tag.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/client/src/rtc/Dispatcher.ts`:
- Around line 96-98: The behavior for dispatching the wildcard tag should fan
out to every tag bucket instead of only emitting the '*' bucket: in the dispatch
logic (the calls to emit, emitDynamic and the byTag map) replace the current
three-line sequence with a branch that, when tag === '*' iterates over byTag
(for example for (const bucket of byTag.values())) and calls this.emit(payload,
bucket) for each bucket, and otherwise keep the existing behavior
(this.emit(payload, byTag.get(tag)); if (tag !== '*') this.emit(payload,
byTag.get('*')); this.emitDynamic(payload, tag, dynamic)); ensure emitDynamic is
only called for concrete tags as before.
---
Duplicate comments:
In `@packages/client/src/rtc/Dispatcher.ts`:
- Around line 117-121: In emitDynamic, a thrown exception from tagSelector() can
abort the loop and prevent other dynamics from receiving the event; wrap the
call to tagSelector() in a try/catch inside the loop (in emitDynamic) so if
tagSelector() throws you catch the error, optionally record/log it, and continue
to the next dynamic entry without calling emitOne for the failing selector;
ensure the catch does not suppress normal behavior for other entries and that
emitOne(payload, listener) is only called when tagSelector() successfully
returns a tag.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
packages/client/src/rtc/Dispatcher.ts
| this.emit(payload, byTag.get(tag)); | ||
| if (tag !== '*') this.emit(payload, byTag.get('*')); | ||
| this.emitDynamic(payload, tag, dynamic); |
There was a problem hiding this comment.
tag='*' runtime behavior does not match the current doc wording.
On Lines 96-98, dispatch('*') emits wildcard listeners, not listeners registered under every specific tag. The JSDoc says “dispatch to every tag”, which is misleading unless behavior is changed to fan out all tag buckets.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@packages/client/src/rtc/Dispatcher.ts` around lines 96 - 98, The behavior for
dispatching the wildcard tag should fan out to every tag bucket instead of only
emitting the '*' bucket: in the dispatch logic (the calls to emit, emitDynamic
and the byTag map) replace the current three-line sequence with a branch that,
when tag === '*' iterates over byTag (for example for (const bucket of
byTag.values())) and calls this.emit(payload, bucket) for each bucket, and
otherwise keep the existing behavior (this.emit(payload, byTag.get(tag)); if
(tag !== '*') this.emit(payload, byTag.get('*')); this.emitDynamic(payload, tag,
dynamic)); ensure emitDynamic is only called for concrete tags as before.
💡 Overview
Fixes SFU signaling event routing across reconnect tag changes by using dynamic tag selectors in the dispatcher and peer subscriptions. This prevents FAST reconnect flows from ignoring new tagged SFU offers and ICE events.
Follow-up of: #2121
📝 Implementation notes
🎫 Ticket: https://linear.app/stream/issue/REACT-830/fast-reconnect-fails-due-to-ws-tag-filtering
Summary by CodeRabbit
New Features
Bug Fixes
Tests