Skip to content

fix(client): handle SFU tag changes during reconnect#2149

Open
oliverlaz wants to merge 3 commits intomainfrom
fix/fast-reconnect-dynamic-dispatcher-tag
Open

fix(client): handle SFU tag changes during reconnect#2149
oliverlaz wants to merge 3 commits intomainfrom
fix/fast-reconnect-dynamic-dispatcher-tag

Conversation

@oliverlaz
Copy link
Member

@oliverlaz oliverlaz commented Mar 3, 2026

💡 Overview

Fixes SFU signaling event routing across reconnect tag changes by using dynamic tag selectors in the dispatcher and peer subscriptions. This prevents FAST reconnect flows from ignoring new tagged SFU offers and ICE events.

Follow-up of: #2121

📝 Implementation notes

  • Update Dispatcher.on to support tag as string or tag selector function
  • Use hybrid listener storage: static tag map plus dynamic selector list
  • Update BasePeerConnection to subscribe with dynamic tag selectors and refresh tag on setSfuClient
  • Add or extend tests for dynamic tag routing, dynamic unsubscribe, and subscriber offer handling after tag changes

🎫 Ticket: https://linear.app/stream/issue/REACT-830/fast-reconnect-fails-due-to-ws-tag-filtering

Summary by CodeRabbit

  • New Features

    • Dispatcher supports dynamic tag selectors so listeners can resolve routing at dispatch time.
    • Peer connection propagates and updates its current SFU identifier for aligned event routing.
  • Bug Fixes

    • Subscriptions are cleared on detach to avoid stale handlers.
  • Tests

    • Added tests for dynamic tag routing, unsubscribe behavior, and identifier-transition edge cases.

@changeset-bot
Copy link

changeset-bot bot commented Mar 3, 2026

⚠️ No Changeset found

Latest commit: 69c1dd0

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Mar 3, 2026

📝 Walkthrough

Walkthrough

Enables dynamic tag resolution in event subscriptions and makes BasePeerConnection's tag and subscriptions writable so a peer connection can adopt a new SFU client's tag and register handlers that resolve tags at dispatch time. Tests added for dynamic selectors and unsubscribe/negotiation behavior.

Changes

Cohort / File(s) Summary
BasePeerConnection mutability
packages/client/src/rtc/BasePeerConnection.ts
Removed readonly from tag and subscriptions. setSfuClient updates this.tag. Event registration uses a lazy getTag so handlers resolve tag at dispatch; detach clears subscriptions by reassigning the array.
Dispatcher dynamic-tag listeners
packages/client/src/rtc/Dispatcher.ts
Added `ListenerTag = string
Tests — dynamic behavior
packages/client/src/rtc/__tests__/Dispatcher.test.ts, packages/client/src/rtc/__tests__/Subscriber.test.ts
Added tests validating dynamic tag selector routing, unsubscribing dynamic listeners, and that a SubscriberOffer triggers a single negotiation when SFU client tag changes between events.

Sequence Diagram

sequenceDiagram
    participant SFU as SFU Client
    participant BasePeer as BasePeerConnection
    participant Dispatcher as Dispatcher
    participant Listener as Listener

    SFU->>BasePeer: setSfuClient(newSfu)
    BasePeer->>BasePeer: this.tag = newSfu.tag

    Listener->>Dispatcher: on(tagSelector: () => string, handler)
    Dispatcher->>Dispatcher: store handler in dynamic handlers

    SFU->>BasePeer: emit event (payload)
    BasePeer->>Dispatcher: dispatch(event, this.tag)
    Dispatcher->>Dispatcher: emit static listeners (byTag & '*')
    Dispatcher->>Dispatcher: for each dynamic handler
    Dispatcher->>Listener: resolved = tagSelector()
    alt resolved matches event tag (or wildcard)
        Dispatcher->>Listener: call handler(payload)
    else
        Note over Dispatcher: skip handler
    end

    SFU->>BasePeer: setSfuClient(anotherSfu)
    BasePeer->>BasePeer: this.tag = anotherSfu.tag
    Note over Dispatcher: subsequent dispatch uses updated tagSelector results
Loading

Estimated Code Review Effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

🐰 I twitch my whiskers, tags anew,

handlers wander, finding who.
When selectors hop and listeners spring,
events find ears and start to sing,
a joyful dispatch — hop!

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly summarizes the main change: handling SFU tag changes during reconnect by enabling dynamic tag resolution in the dispatcher and peer connections.
Description check ✅ Passed The description covers all required sections: Overview explains the fix and its impact, Implementation notes detail key changes, and ticket/docs links are provided as per template.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings (stacked PR)
  • 📝 Generate docstrings (commit on current branch)
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch fix/fast-reconnect-dynamic-dispatcher-tag

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@oliverlaz oliverlaz requested a review from jdimovska March 3, 2026 16:45
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/client/src/rtc/__tests__/Subscriber.test.ts`:
- Around line 265-302: The test currently reuses the same SubscriberOffer for
both dispatches so a stale listener could pass; create two distinct offers
(e.g., subscriberOfferOld with sdp 'offer-sdp' and subscriberOfferNew with sdp
'next-offer-sdp') and dispatch the first to 'test' with subscriberOfferOld and
the second to 'next-tag' with subscriberOfferNew, then assert
subscriber.negotiate was called once with subscriberOfferNew (reference symbols:
negotiate, SubscriberOffer.create, setSfuClient, dispatcher.dispatch,
SfuEvent.create).

In `@packages/client/src/rtc/Dispatcher.ts`:
- Around line 99-105: The dynamic tag selector call inside Dispatcher.dispatch
can throw and abort the whole loop; wrap the call to tagSelector() in a
try/catch so a selector error is caught and logged/ignored and does not stop
other listeners from receiving the event. Specifically, inside the block
iterating over dynamic (the destructured { tagSelector, listener }), replace the
direct call to tagSelector() with a guarded call that catches exceptions, skips
the failing selector (optionally log via existing logger or swallow), and only
proceeds to compare dynamicTag and call this.emitOne(payload, listener) when the
selector returned successfully.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 6d4e2d7 and 8354974.

📒 Files selected for processing (4)
  • packages/client/src/rtc/BasePeerConnection.ts
  • packages/client/src/rtc/Dispatcher.ts
  • packages/client/src/rtc/__tests__/Dispatcher.test.ts
  • packages/client/src/rtc/__tests__/Subscriber.test.ts

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
packages/client/src/rtc/Dispatcher.ts (1)

82-83: ⚠️ Potential issue | 🟠 Major

'*' dispatch behavior does not match its documented contract.

Line 82 says * should dispatch to every tag, but Line 96-Line 103 only notifies wildcard listeners ('*') and not listeners registered on specific tags.

Proposed fix to make wildcard truly broadcast
     const { byTag, dynamic } = handlers;
-    this.emit(payload, byTag.get(tag));
-    if (tag !== '*') this.emit(payload, byTag.get('*'));
+    if (tag === '*') {
+      for (const listeners of byTag.values()) {
+        this.emit(payload, listeners);
+      }
+    } else {
+      this.emit(payload, byTag.get(tag));
+      this.emit(payload, byTag.get('*'));
+    }

     for (const { tagSelector, listener } of dynamic) {
       const dynamicTag = tagSelector();
-      if (dynamicTag === tag || (tag !== '*' && dynamicTag === '*')) {
+      if (tag === '*' || dynamicTag === tag || dynamicTag === '*') {
         this.emitOne(payload, listener);
       }
     }

Also applies to: 96-103

♻️ Duplicate comments (1)
packages/client/src/rtc/Dispatcher.ts (1)

99-103: ⚠️ Potential issue | 🟠 Major

Guard dynamic tag selector failures so one bad subscriber can't break dispatch.

At Line 100, tagSelector() can throw and abort the remaining dispatch flow for this event.

Proposed defensive fix
     for (const { tagSelector, listener } of dynamic) {
-      const dynamicTag = tagSelector();
+      let dynamicTag: string;
+      try {
+        dynamicTag = tagSelector();
+      } catch (e) {
+        this.logger.warn('Dynamic tag selector failed with error', e);
+        continue;
+      }
       if (dynamicTag === tag || (tag !== '*' && dynamicTag === '*')) {
         this.emitOne(payload, listener);
       }
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/client/src/rtc/Dispatcher.ts` around lines 99 - 103, In the dynamic
subscriber loop inside Dispatcher (where it iterates the dynamic array and calls
tagSelector()), wrap the tagSelector() invocation in a try/catch so exceptions
from a subscriber’s selector are contained and do not abort the whole dispatch;
on catch, optionally log or swallow the error and continue to the next
subscriber, only calling this.emitOne(payload, listener) when tagSelector()
completes successfully and matches the tag.
🧹 Nitpick comments (1)
packages/client/src/rtc/Dispatcher.ts (1)

113-121: Avoid any in emit path to preserve discriminated-union safety.

Using payload: any at Line 113 and Line 119 weakens compile-time guarantees in the dispatcher core.

Type-safe tightening (minimal change)
+type AnySfuPayload = AllSfuEvents[SfuEventKinds];
+
-  emit = (payload: any, listeners: AnyListener[] = []) => {
+  emit = (payload: AnySfuPayload, listeners: AnyListener[] = []) => {
     for (const listener of listeners) {
       this.emitOne(payload, listener);
     }
   };

-  private emitOne = (payload: any, listener: AnyListener) => {
+  private emitOne = (payload: AnySfuPayload, listener: AnyListener) => {
     try {
       listener(payload);
     } catch (e) {
       this.logger.warn('Listener failed with error', e);
     }
   };

As per coding guidelines: "Use Dispatcher pattern for type-safe event handling with discriminated unions via oneofKind pattern."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/client/src/rtc/Dispatcher.ts` around lines 113 - 121, The emit path
uses payload: any in emit and emitOne which breaks discriminated-union safety;
change these signatures to use a generic/event union type (e.g., EventPayload or
a generic P) and update AnyListener to accept that specific union/generic
instead of any so the oneofKind discriminant is preserved; modify emit = <P
extends EventPayload>(payload: P, listeners: Array<AnyListenerFor<P>>) and
emitOne = <P extends EventPayload>(payload: P, listener: AnyListenerFor<P>) (or
equivalent type-mapping you already have) and update callers to pass
correctly-typed payloads so compile-time checks catch invalid events. Ensure
AnyListenerFor/dispatcher listener mappings reference the discriminant field
(oneofKind) so handlers get narrowed properly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@packages/client/src/rtc/Dispatcher.ts`:
- Around line 99-103: In the dynamic subscriber loop inside Dispatcher (where it
iterates the dynamic array and calls tagSelector()), wrap the tagSelector()
invocation in a try/catch so exceptions from a subscriber’s selector are
contained and do not abort the whole dispatch; on catch, optionally log or
swallow the error and continue to the next subscriber, only calling
this.emitOne(payload, listener) when tagSelector() completes successfully and
matches the tag.

---

Nitpick comments:
In `@packages/client/src/rtc/Dispatcher.ts`:
- Around line 113-121: The emit path uses payload: any in emit and emitOne which
breaks discriminated-union safety; change these signatures to use a
generic/event union type (e.g., EventPayload or a generic P) and update
AnyListener to accept that specific union/generic instead of any so the
oneofKind discriminant is preserved; modify emit = <P extends
EventPayload>(payload: P, listeners: Array<AnyListenerFor<P>>) and emitOne = <P
extends EventPayload>(payload: P, listener: AnyListenerFor<P>) (or equivalent
type-mapping you already have) and update callers to pass correctly-typed
payloads so compile-time checks catch invalid events. Ensure
AnyListenerFor/dispatcher listener mappings reference the discriminant field
(oneofKind) so handlers get narrowed properly.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 8354974 and 20998f8.

📒 Files selected for processing (1)
  • packages/client/src/rtc/Dispatcher.ts

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
packages/client/src/rtc/Dispatcher.ts (1)

117-121: ⚠️ Potential issue | 🟠 Major

Guard tagSelector() failures so one dynamic selector cannot break dispatch.

On Lines 117-121, an exception thrown by tagSelector() bubbles out and can prevent remaining listeners from receiving the event.

Proposed fix
   emitDynamic = (payload: any, tag: string, dynamic: DynamicHandler[]) => {
     for (const { tagSelector, listener } of dynamic) {
-      const dynamicTag = tagSelector();
+      let dynamicTag: string;
+      try {
+        dynamicTag = tagSelector();
+      } catch (e) {
+        this.logger.warn('Dynamic tag selector failed with error', e);
+        continue;
+      }
       if (dynamicTag === tag || (tag !== '*' && dynamicTag === '*')) {
         this.emitOne(payload, listener);
       }
     }
   };
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/client/src/rtc/Dispatcher.ts` around lines 117 - 121, In
emitDynamic, a thrown exception from tagSelector() can abort the loop and
prevent other dynamics from receiving the event; wrap the call to tagSelector()
in a try/catch inside the loop (in emitDynamic) so if tagSelector() throws you
catch the error, optionally record/log it, and continue to the next dynamic
entry without calling emitOne for the failing selector; ensure the catch does
not suppress normal behavior for other entries and that emitOne(payload,
listener) is only called when tagSelector() successfully returns a tag.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/client/src/rtc/Dispatcher.ts`:
- Around line 96-98: The behavior for dispatching the wildcard tag should fan
out to every tag bucket instead of only emitting the '*' bucket: in the dispatch
logic (the calls to emit, emitDynamic and the byTag map) replace the current
three-line sequence with a branch that, when tag === '*' iterates over byTag
(for example for (const bucket of byTag.values())) and calls this.emit(payload,
bucket) for each bucket, and otherwise keep the existing behavior
(this.emit(payload, byTag.get(tag)); if (tag !== '*') this.emit(payload,
byTag.get('*')); this.emitDynamic(payload, tag, dynamic)); ensure emitDynamic is
only called for concrete tags as before.

---

Duplicate comments:
In `@packages/client/src/rtc/Dispatcher.ts`:
- Around line 117-121: In emitDynamic, a thrown exception from tagSelector() can
abort the loop and prevent other dynamics from receiving the event; wrap the
call to tagSelector() in a try/catch inside the loop (in emitDynamic) so if
tagSelector() throws you catch the error, optionally record/log it, and continue
to the next dynamic entry without calling emitOne for the failing selector;
ensure the catch does not suppress normal behavior for other entries and that
emitOne(payload, listener) is only called when tagSelector() successfully
returns a tag.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 20998f8 and 69c1dd0.

📒 Files selected for processing (1)
  • packages/client/src/rtc/Dispatcher.ts

Comment on lines +96 to +98
this.emit(payload, byTag.get(tag));
if (tag !== '*') this.emit(payload, byTag.get('*'));
this.emitDynamic(payload, tag, dynamic);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

tag='*' runtime behavior does not match the current doc wording.

On Lines 96-98, dispatch('*') emits wildcard listeners, not listeners registered under every specific tag. The JSDoc says “dispatch to every tag”, which is misleading unless behavior is changed to fan out all tag buckets.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/client/src/rtc/Dispatcher.ts` around lines 96 - 98, The behavior for
dispatching the wildcard tag should fan out to every tag bucket instead of only
emitting the '*' bucket: in the dispatch logic (the calls to emit, emitDynamic
and the byTag map) replace the current three-line sequence with a branch that,
when tag === '*' iterates over byTag (for example for (const bucket of
byTag.values())) and calls this.emit(payload, bucket) for each bucket, and
otherwise keep the existing behavior (this.emit(payload, byTag.get(tag)); if
(tag !== '*') this.emit(payload, byTag.get('*')); this.emitDynamic(payload, tag,
dynamic)); ensure emitDynamic is only called for concrete tags as before.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants