fix(channels): thread workspace identity through channel events to prevent cross-workspace persistence #2633
…event cross-workspace persistence

`ChannelMessageReceived` and `ChannelMessageProcessed` carried no workspace identity, so `ConversationPersistenceSubscriber` and `TelegramRemoteSubscriber` would write events into the wrong workspace when login/workspace changed while events were still in flight.

- Add `workspace_dir: PathBuf` to both `DomainEvent` variants with rustdoc describing the publisher/subscriber routing contract
- Populate from `ctx.workspace_dir` at all three publish sites in dispatch.rs
- Add early-return workspace guards in both persistence subscribers; mismatched events are silently dropped with a debug log
- Add 11 exhaustive tests covering matching workspace (positive controls), stale workspace drops, workspace-switch mid-conversation race, multiple stale workspaces, and correct-after-stale ordering

Closes tinyhumansai#2602
📝 Walkthrough
Threads workspace_dir through ChannelMessageReceived/ChannelMessageProcessed; publishers attach ctx.workspace_dir and workspace-scoped subscribers (Telegram, persistence) validate and drop events whose workspace_dir differs from their configured workspace.
Changes
Workspace-identity event contract and guards
Sequence Diagrams
sequenceDiagram
participant Dispatch as Channel Dispatch
participant EventBus as Event Bus
participant TelegramSub as Telegram Remote Subscriber
participant PersistSub as Conversation Persistence Subscriber
participant Store as Conversation Store
Dispatch->>EventBus: publish ChannelMessageReceived{workspace_dir: A}
par Telegram handling
EventBus->>TelegramSub: deliver ChannelMessageReceived{workspace_dir: A}
TelegramSub->>TelegramSub: extract & validate workspace_dir
alt matches
TelegramSub->>TelegramSub: set busy state
else stale
TelegramSub->>TelegramSub: debug log & drop
end
and Persistence handling
EventBus->>PersistSub: deliver ChannelMessageReceived{workspace_dir: A}
PersistSub->>PersistSub: extract & validate workspace_dir
alt matches
PersistSub->>Store: persist_channel_turn
else stale
PersistSub->>PersistSub: debug log & drop
end
end
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
…s 4 and 11 auth_store_session calls GET /auth/me internally; without waiting for it the session token may not be stored yet and composio_list_triggers / composio_enable_trigger fail with "no backend session token". Matches the pattern already used in Scenario 1.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@app/test/e2e/specs/mega-flow.spec.ts`:
- Around line 240-243: The waitForMockRequest('GET', '/auth/me', 10_000) call
can silently return undefined on timeout; change the test to explicitly assert
the returned value and fail fast with diagnostic output: call waitForMockRequest
and if it returns undefined throw an Error that includes a summary of recent
mock request logs and call dumpAccessibilityTree() (or include its output) to
aid debugging; apply the same explicit assertion+diagnostic pattern for the
other occurrence of waitForMockRequest in this spec so Composio RPCs never
proceed when /auth/me wasn't observed.
📒 Files selected for processing (1)
app/test/e2e/specs/mega-flow.spec.ts
M3gA-Mind left a comment
Walkthrough
This PR threads a workspace_dir: PathBuf field through two DomainEvent variants (ChannelMessageReceived and ChannelMessageProcessed) so that channel-event subscribers can reject events that originated from a different workspace. Both active subscribers — memory_conversations/bus.rs and channels/providers/telegram/bus.rs — gain an early-return guard that compares the event's workspace_dir against the subscriber's own binding and drops the event if they differ. The implementation is completed with 11 new workspace-identity tests, updated existing test constructions, and an E2E fix in mega-flow.spec.ts that adds a missing GET /auth/me wait so composio RPC calls no longer race against session-store completion.
Change Summary
| File | Change |
|---|---|
| src/core/event_bus/events.rs | Added workspace_dir: std::path::PathBuf field to ChannelMessageReceived and ChannelMessageProcessed; added module-level doc section |
| src/openhuman/channels/runtime/dispatch.rs | Populated workspace_dir at all 3 publish_global call sites from ctx.workspace_dir |
| src/openhuman/memory_conversations/bus.rs | Added workspace guard in both handled variants; 7 new tests + 4 updated event constructions |
| src/openhuman/channels/providers/telegram/bus.rs | Added workspace guard for ChannelMessageReceived; 4 new tests + 2 updated event constructions |
| src/core/event_bus/events_tests.rs | Updated 2 event constructions with workspace_dir |
| src/openhuman/channels/routes_tests.rs | Updated 1 event construction using the real tempdir path |
| app/test/e2e/specs/mega-flow.spec.ts | Added waitForMockRequest('GET', '/auth/me', 10_000) after deep-link auth in Scenarios 4 and 11 |
Per-File Analysis
src/core/event_bus/events.rs
The new workspace_dir field is correctly placed on both channel-message variants. Using std::path::PathBuf (fully qualified) keeps the events file free of a use import and matches the type used throughout the codebase for workspace paths. The module-level doc section clearly documents the guard contract, which is valuable given that DomainEvent is #[non_exhaustive].
src/openhuman/channels/runtime/dispatch.rs
All three publish_global call sites are updated. ctx.workspace_dir.as_ref().clone() is the correct idiom for Arc<PathBuf> → PathBuf. No publish site was missed.
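A minimal sketch of the `Arc<PathBuf>` to `PathBuf` conversion mentioned above. The `DispatchContext` struct and the `event_workspace` helper are hypothetical names used only for illustration; only the `as_ref().clone()` expression comes from the review.

```rust
use std::path::PathBuf;
use std::sync::Arc;

/// Hypothetical stand-in for the dispatch context; only the field used here.
pub struct DispatchContext {
    pub workspace_dir: Arc<PathBuf>,
}

/// Produce an owned `PathBuf` to embed in an event.
pub fn event_workspace(ctx: &DispatchContext) -> PathBuf {
    // `Arc::clone` would only bump the reference count and yield another
    // `Arc<PathBuf>`. `as_ref()` borrows the inner `PathBuf`, and `.clone()`
    // then copies the path itself, so the event owns its workspace identity.
    ctx.workspace_dir.as_ref().clone()
}
```

The copy costs one allocation per event, which matches the cost the PR description states for the publish path.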
src/openhuman/memory_conversations/bus.rs
The guard pattern (if *workspace_dir != self.workspace_dir { log::debug!(...); return; }) is idiomatic and consistent. The debug log includes both paths, enabling correlation in traces. Test coverage is thorough: 7 new cases cover matching workspace (persists), mismatching workspace (drops), and both event variants. The 4 updated existing constructions correctly include workspace_dir.
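The guard described above can be sketched roughly as follows. This is an illustration under assumptions, not the code from the PR: the event enum is trimmed to two fields, `PersistenceSubscriber` and its in-memory `persisted` list are hypothetical stand-ins for the real subscriber and the conversation store, and `eprintln!` replaces `log::debug!` so the sketch needs no external crate.

```rust
use std::path::PathBuf;

/// Trimmed-down stand-in for the real `DomainEvent` (hypothetical field set).
#[derive(Debug, Clone)]
pub enum DomainEvent {
    ChannelMessageReceived { workspace_dir: PathBuf, message_id: String },
    ChannelMessageProcessed { workspace_dir: PathBuf, message_id: String },
}

/// Hypothetical subscriber bound to a single workspace.
pub struct PersistenceSubscriber {
    workspace_dir: PathBuf,
    /// Stands in for the on-disk conversation store.
    persisted: Vec<String>,
}

impl PersistenceSubscriber {
    pub fn new(workspace_dir: PathBuf) -> Self {
        Self { workspace_dir, persisted: Vec::new() }
    }

    pub fn handle(&mut self, event: &DomainEvent) {
        match event {
            DomainEvent::ChannelMessageReceived { workspace_dir, message_id }
            | DomainEvent::ChannelMessageProcessed { workspace_dir, message_id } => {
                // Early-return guard: drop events published for another workspace
                // before any store write happens.
                if *workspace_dir != self.workspace_dir {
                    eprintln!(
                        "dropping stale event {}: event workspace {} != bound workspace {}",
                        message_id,
                        workspace_dir.display(),
                        self.workspace_dir.display()
                    );
                    return;
                }
                self.persisted.push(message_id.clone());
            }
        }
    }

    /// Message ids that passed the guard, in arrival order.
    pub fn persisted(&self) -> &[String] {
        &self.persisted
    }
}
```

Feeding a stale event followed by a matching one leaves only the matching message in `persisted`, which is the "correct-after-stale" ordering the tests are said to cover.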
Note: CONVERSATION_PERSISTENCE_HANDLE is a static OnceLock<SubscriptionHandle> — this is pre-existing design that prevents re-registration if the core process restarts within the same OS process (e.g. restart_core_process). The workspace guard added here does not change or worsen this behavior, but it is worth tracking as a follow-on.
src/openhuman/channels/providers/telegram/bus.rs
Guard is correctly applied using tracing::debug! (consistent with the existing logging style in this file). 4 new tests mirror the memory_conversations pattern. Both updated existing event constructions include the workspace_dir field.
src/core/event_bus/events_tests.rs and src/openhuman/channels/routes_tests.rs
Mechanical updates. routes_tests.rs correctly uses the actual tempdir.path().to_path_buf() rather than a placeholder, which is good hygiene.
app/test/e2e/specs/mega-flow.spec.ts
The root cause was correct: auth_store_session internally calls GET /auth/me before writing the session token to disk, so composio RPC calls issued immediately after POST /telegram/login-tokens/ could race. Adding await waitForMockRequest('GET', '/auth/me', 10_000) is the right fix and matches the pattern already used in Scenario 1.
Actionable Inline Comments
app/test/e2e/specs/mega-flow.spec.ts — Scenarios 4 and 11: assert the waitForMockRequest return value
Scenario 1 (the reference pattern) assigns and asserts the result:
    const me = await waitForMockRequest('GET', '/auth/me', 10_000);
    expect(me).toBeDefined();

Scenarios 4 and 11 fire-and-forget the call. If the wait times out and returns undefined, the test silently continues and the composio race condition will resurface in a harder-to-diagnose way. Prefer the asserting form so a timeout fails visibly at the wait site instead of at a downstream assertion.
Nitpicks
- `src/openhuman/memory_conversations/bus.rs` log: the debug message doesn't include `message_id` or `channel`; both are available in the destructure and would help correlate log lines to specific events in traces.
- `src/openhuman/channels/providers/telegram/bus.rs`: The `ChannelMessageProcessed` arm uses `..` to ignore `workspace_dir` (telegram bus only handles `ChannelMessageReceived`). A short comment noting that `ChannelMessageProcessed` is intentionally unguarded here would help future readers.
Summary
The implementation is correct and complete. All publish sites are updated, both active subscribers are guarded, workspace identity is tested at unit level, and the E2E race is fixed. The one actionable item is asserting the waitForMockRequest return value in the E2E fix — cosmetic but materially improves debuggability on timeout.
graycyrus left a comment
Clean PR — well done. Moving to approval queue.
This is a solid fix for the cross-workspace persistence race. The approach of threading workspace_dir through the event variants and having subscribers guard against mismatches is exactly right — no new abstractions, minimal allocation cost, and the existing PathBuf identity convention is maintained.
What I verified
| Check | Result |
|---|---|
| All 3 publish_global call sites in dispatch.rs thread ctx.workspace_dir | ✅ |
| Both affected subscribers (ConversationPersistenceSubscriber, TelegramRemoteSubscriber) have workspace guards | ✅ |
| No other handlers match ChannelMessageReceived/ChannelMessageProcessed without the diff updating them | ✅ |
| Publisher/subscriber contracts documented in events.rs module docs | ✅ |
| 11 new tests cover: positive control, stale drop, mid-conversation switch, multiple stale, correct-after-stale | ✅ |
| Issue #2602 acceptance criteria (repro gone, regression safety, diff coverage, event contract docs) | ✅ All met |
CodeRabbit overlap
CodeRabbit flagged the waitForMockRequest silent-timeout issue in the E2E test — that's a valid nit but orthogonal to the core fix. No additional findings from my review.
Nice work threading this through cleanly without over-engineering it. 👍
Resolves conflict in mega-flow.spec.ts: upstream used a slightly different comment on the /auth/me wait; accepted upstream wording.
…arios Addresses @coderabbitai: bare await calls could silently time out and return undefined, letting downstream composio assertions carry the error in a confusing way. Use expect(...).toBeDefined() so a timeout fails visibly at the wait site, consistent with the pattern in Scenario 1.
🧹 Nitpick comments (3)
app/test/e2e/specs/mega-flow.spec.ts (3)
242-242: ⚡ Quick win
Follow the Scenario 1 pattern and add failure diagnostics before Composio RPCs.
The comment at lines 240-241 explains this wait ensures session persistence before Composio RPCs. A timeout here would cause cryptic downstream failures. Capturing the result enables diagnostic logging that would speed debugging.
Refactor to match the established pattern

    - expect(await waitForMockRequest('GET', '/auth/me', 10_000)).toBeDefined();
    + const me = await waitForMockRequest('GET', '/auth/me', 10_000);
    + expect(me).toBeDefined();
    + if (!me) {
    +   console.log(`${LOG} Scenario 4: /auth/me timeout before Composio RPCs; recent requests:`, getRequestLog());
    + }

As per coding guidelines: "In E2E specs, assert both UI outcomes and backend/mock effects when relevant. Add failure diagnostics (request logs, dumpAccessibilityTree()) for faster debugging by agents."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@app/test/e2e/specs/mega-flow.spec.ts` at line 242, Capture the result of waitForMockRequest('GET', '/auth/me', 10_000) into a variable (e.g., const authMeReq = await waitForMockRequest(...)) instead of asserting directly, and if it is undefined or times out, log diagnostic details (mock request logs, the response body/headers if available) and call dumpAccessibilityTree() before proceeding to any Composio RPC calls; update the assertion to expect(authMeReq).toBeDefined() so failures produce those diagnostics and speed debugging (reference: waitForMockRequest and dumpAccessibilityTree()).
202-202: ⚡ Quick win
Follow the Scenario 1 pattern for consistency and add failure diagnostics.
The inline assertion prevents adding diagnostic output on timeout. Scenario 1 (lines 148-150) demonstrates the preferred pattern: assign the result, assert it's defined, then log. This approach enables conditional diagnostic logging if the wait times out.
Refactor to match the established pattern

    - expect(await waitForMockRequest('GET', '/auth/me', 10_000)).toBeDefined();
    + const me = await waitForMockRequest('GET', '/auth/me', 10_000);
    + expect(me).toBeDefined();
    + if (!me) {
    +   console.log(`${LOG} Scenario 3: /auth/me timeout after Gmail login; recent requests:`, getRequestLog());
    + }

As per coding guidelines: "In E2E specs, assert both UI outcomes and backend/mock effects when relevant. Add failure diagnostics (request logs, dumpAccessibilityTree()) for faster debugging by agents."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@app/test/e2e/specs/mega-flow.spec.ts` at line 202, Replace the inline assertion that directly expects waitForMockRequest('GET', '/auth/me', 10_000) with the Scenario 1 pattern: first assign the awaited result of waitForMockRequest('GET', '/auth/me', 10_000) to a variable (e.g., authRequest), then assert authRequest is defined, and on the failure branch log diagnostic info (e.g., request logs and call dumpAccessibilityTree()) to aid debugging; ensure you reference the waitForMockRequest call and add the conditional diagnostics after the undefined check so timeouts produce useful output.
569-569: ⚡ Quick win
Follow the Scenario 1 pattern and add failure diagnostics before webhook setup.
The comment at line 568 indicates this wait ensures session persistence before proceeding. Capturing the result enables diagnostic output that would clarify timeout failures in this complex Composio + webhook scenario.
Refactor to match the established pattern

    - expect(await waitForMockRequest('GET', '/auth/me', 10_000)).toBeDefined();
    + const me = await waitForMockRequest('GET', '/auth/me', 10_000);
    + expect(me).toBeDefined();
    + if (!me) {
    +   console.log(`${LOG} Scenario 11: /auth/me timeout before Composio+webhook setup; recent requests:`, getRequestLog());
    + }

As per coding guidelines: "In E2E specs, assert both UI outcomes and backend/mock effects when relevant. Add failure diagnostics (request logs, dumpAccessibilityTree()) for faster debugging by agents."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@app/test/e2e/specs/mega-flow.spec.ts` at line 569, Capture the result of the waitForMockRequest call (the GET '/auth/me' call) instead of discarding it, then assert it and emit failure diagnostics before the webhook setup: if the awaited value is undefined/null or the expect fails, log the returned mock request object (or its request/response details) and call dumpAccessibilityTree() to help debug UI state; update the line using waitForMockRequest('GET', '/auth/me', 10_000) to follow the Scenario 1 pattern of storing the result, asserting it with expect(...).toBeDefined(), and printing/logging the request details and dumpAccessibilityTree() on failure.
📒 Files selected for processing (2)
- .claude/memory.md
- app/test/e2e/specs/mega-flow.spec.ts
✅ Files skipped from review due to trivial changes (1)
- .claude/memory.md
…rkspace-identity guard

Upstream refactored ConversationPersistenceSubscriber to use Arc<RwLock<PathBuf>> + workspace_dir_snapshot() so the binding can be updated across core restarts (solving the OnceLock re-registration issue). The merge combines both approaches:

- workspace_dir_snapshot() resolves the subscriber's current binding
- event workspace_dir is compared against it; mismatches are dropped with a debug log (our PR's guard)
- persist_channel_turn uses the subscriber's own snapshot as the path
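The merged design can be sketched roughly as below. This is an assumption-laden illustration, not the upstream code: `ConversationPersistence`, `rebind`, and `accept` are hypothetical names, and only `workspace_dir_snapshot()` and the `Arc<RwLock<PathBuf>>` binding are taken from the commit message.

```rust
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};

/// Hypothetical subscriber whose workspace binding can change after registration.
pub struct ConversationPersistence {
    workspace_dir: Arc<RwLock<PathBuf>>,
}

impl ConversationPersistence {
    pub fn new(dir: PathBuf) -> Self {
        Self { workspace_dir: Arc::new(RwLock::new(dir)) }
    }

    /// Point the subscriber at a new workspace, for example after a core restart.
    pub fn rebind(&self, dir: PathBuf) {
        // `expect` panics on a poisoned lock; a real subscriber may prefer to recover.
        *self.workspace_dir.write().expect("workspace lock poisoned") = dir;
    }

    /// Copy the current binding out so the lock is not held during I/O.
    pub fn workspace_dir_snapshot(&self) -> PathBuf {
        self.workspace_dir.read().expect("workspace lock poisoned").clone()
    }

    /// Return the path to persist under, or `None` when the event is stale.
    pub fn accept(&self, event_workspace: &Path) -> Option<PathBuf> {
        let current = self.workspace_dir_snapshot();
        if event_workspace != current.as_path() {
            return None; // mismatched workspace: drop before any write
        }
        // Persist under the subscriber's own snapshot, not the event's path.
        Some(current)
    }
}
```

Taking one snapshot per event means the comparison and the write path use the same value even if `rebind` runs concurrently.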
The mock server logs /auth/me on receipt (before the response is sent), so waitForMockRequest returns while the core is still awaiting the response. Without a brief pause, composio_list_triggers / composio_enable_trigger race against auth_store_session finishing and see "no backend session token". 500ms matches the pattern already used in Scenario 3 for a similar post-deep-link settle. Also assert /telegram/login-tokens/ wait in both scenarios so a timeout fails visibly at the wait site.
Pre-existing CI failures, not caused by this PR
The two failing checks ( … See upstream main run 26410626968 for confirmation. This PR does not touch …
…workspace-identity
…o race fix Upstream implemented waitForCoreSessionToken() which polls the core RPC directly to confirm the JWT has been flushed to disk — more reliable than our browser.pause(500) or the /auth/me mock-log signal. Keeping our expect(...).toBeDefined() assertions on the /telegram/login-tokens/ waits.
Summary
- Add `workspace_dir: PathBuf` to `DomainEvent::ChannelMessageReceived` and `ChannelMessageProcessed` so publishers carry workspace identity at event creation time.
- Thread `ctx.workspace_dir` into all three `publish_global` call sites in `channels/runtime/dispatch.rs`.
- Add workspace guards to `ConversationPersistenceSubscriber` and `TelegramRemoteSubscriber`; mismatched-workspace events are silently dropped with a `debug` log, preventing cross-workspace JSONL writes.

Problem
`ConversationPersistenceSubscriber` (and `TelegramRemoteSubscriber`) captured `workspace_dir` at registration time with no way to tell whether an in-flight event came from the current workspace or a stale one. When login/workspace switched while `ChannelMessageReceived` / `ChannelMessageProcessed` events were still being processed, the subscriber wrote them into the wrong workspace's JSONL store.

Solution
Thread explicit workspace identity through the two affected `DomainEvent` variants so every subscriber can make a local decision to accept or reject. The `PathBuf` type was chosen because workspace identity is already represented as a filesystem path throughout the codebase, so no new abstraction is needed. Mismatched events are dropped before any store write occurs.

Submission Checklist
- … `diff-cover`) meet the gate enforced by `.github/workflows/coverage.yml`. Run `pnpm test:coverage` and `pnpm test:rust` locally; PRs below 80% on changed lines will not merge.
- … `## Related`: no matrix feature IDs map to the channel event workspace routing fix.
- … `Closes #NNN` in the `## Related` section

Impact
- `PathBuf::clone()` at publish is one allocation per event on an already-async broadcast path.

Related
Closes #2602
AI Authored PR Metadata (required for Codex/Linear PRs)
Linear Issue
Commit & Branch
- Branch: `fix/channel-events-workspace-identity`
- Commit SHA: `8c13e6d5211416174333e3682b2354165c0671db`

Validation Run
- `pnpm --filter openhuman-app format:check`
- `pnpm typecheck`: 4 pre-existing iOS module errors on `upstream/main` (confirmed present before these changes by stashing and re-running)
- `cargo test -p openhuman --lib memory_conversations::bus` (13 passed), `cargo test -p openhuman --lib channels::providers::telegram::bus` (6 passed), full `cargo test -p openhuman --lib` (9492 passed)
- `cargo fmt --all -- --check` clean, `cargo check --manifest-path Cargo.toml` clean

Validation Blocked
- `pnpm build` / `pnpm compile`
- `TS2307: Cannot find module 'qrcode.react'` (and 3 other iOS experimental deps)
- Pre-existing on `upstream/main`; confirmed by stashing changes and reproducing identically. Not introduced by this PR.

Behavior Changes
- Events are dropped when their `workspace_dir` does not match the subscriber's binding, instead of silently writing to the wrong workspace store.

Parity Contract
Duplicate / Superseded PR Handling