diff --git a/app/src/ai/mcp/file_mcp_watcher.rs b/app/src/ai/mcp/file_mcp_watcher.rs index 003188d58..98d83f0e3 100644 --- a/app/src/ai/mcp/file_mcp_watcher.rs +++ b/app/src/ai/mcp/file_mcp_watcher.rs @@ -11,9 +11,10 @@ use std::io::ErrorKind; use std::path::{Path, PathBuf}; use std::pin::Pin; use std::sync::LazyLock; +use std::time::Duration; use warp_core::safe_warn; use warpui::{Entity, ModelContext, ModelHandle, SingletonEntity}; -use watcher::HomeDirectoryWatcherEvent; +use watcher::{BulkFilesystemWatcher, BulkFilesystemWatcherEvent, HomeDirectoryWatcherEvent}; use crate::ai::mcp::{ home_config_file_path, parsing::normalize_codex_toml_to_json, MCPProvider, @@ -32,6 +33,7 @@ static ENV_VAR_REGEX: LazyLock = /// `.warp/.mcp.json`), capturing the parent directory component. static HOME_SUBDIR_REGEX: LazyLock = LazyLock::new(|| Regex::new(r"^([^/]+)/[^/]+$").expect("Regex is valid")); +const PROJECT_FILE_MCP_WATCHER_DEBOUNCE_MILLIS: u64 = 500; /// Returns the subdirectory under the home directory that needs its own [`DirectoryWatcher`], /// inferred from the provider's home config path. Matches paths that are exactly one directory @@ -122,12 +124,18 @@ impl RepositorySubscriber for FileMCPSubscriber { /// [`FileMCPWatcherEvent`]s. pub struct FileMCPWatcher { file_mcp_tx: Sender, + /// Non-recursive watcher for project-scoped MCP config parent directories. + project_watcher: ModelHandle, /// Watcher handles for home provider subdirectories (e.g. `~/.codex`), keyed by subdir path. /// Used to cleanup watchers when the subdir is deleted at runtime. home_provider_watchers: HashMap, SubscriberId)>, /// Set of project repository root paths we are already watching for file-based MCP configs. - /// Used purely for deduplication — we never tear down project watchers during the session. + /// Used purely for deduplication — we never tear down project root watches during the session. project_repo_watchers: HashSet, + /// Project-scoped MCP config paths keyed to the logical roots and providers they affect. + project_config_targets_by_path: HashMap>, + /// Non-recursive project config parent directories currently registered with `project_watcher`. + project_roots_by_watch_dir: HashMap>, /// Tracks how many provider config files remain to be parsed for each cloud environment repo. /// When the count reaches zero, a `CloudEnvironmentScanComplete` event is emitted. cloud_env_pending: HashMap, @@ -136,6 +144,12 @@ pub struct FileMCPWatcher { impl FileMCPWatcher { pub fn new(ctx: &mut ModelContext) -> Self { let (file_mcp_tx, file_mcp_rx) = async_channel::unbounded::(); + let project_watcher = ctx.add_model(|ctx| { + BulkFilesystemWatcher::new( + Duration::from_millis(PROJECT_FILE_MCP_WATCHER_DEBOUNCE_MILLIS), + ctx, + ) + }); ctx.spawn_stream_local( file_mcp_rx, @@ -144,10 +158,12 @@ impl FileMCPWatcher { }, |_, _| {}, ); + ctx.subscribe_to_model(&project_watcher, |me, event, ctx| { + me.handle_project_watcher_event(event, ctx); + }); // Subscribe to changes to detected repositories. ctx.subscribe_to_model(&DetectedRepositories::handle(ctx), { - let file_mcp_tx = file_mcp_tx.clone(); move |me, event, ctx| { let DetectedRepositoriesEvent::DetectedGitRepo { repository, source } = event; // Register MCP servers for repos the user actively navigated to, and for @@ -164,7 +180,7 @@ impl FileMCPWatcher { providers_in_scope(repo_path.clone(), repo_path.clone()).count(); me.cloud_env_pending.insert(repo_path.clone(), count); } - me.register_repo_for_file_mcp_watching(repo_path, ctx, file_mcp_tx.clone()); + me.register_repo_for_file_mcp_watching(repo_path, ctx); } } }); @@ -221,54 +237,150 @@ impl FileMCPWatcher { Self { file_mcp_tx, + project_watcher, home_provider_watchers, project_repo_watchers: HashSet::new(), + project_config_targets_by_path: HashMap::new(), + project_roots_by_watch_dir: HashMap::new(), cloud_env_pending: HashMap::new(), } } - /// Register a project repo for file-based MCP watching via DirectoryWatcher. + /// Register a project repo for file-based MCP watching. + /// + /// Project-scoped MCP config files live at a small set of known paths relative to the repo + /// root. Watch those config parent directories non-recursively instead of subscribing to + /// recursive repository updates, which can otherwise consume one inotify watch per subdirectory + /// in large ignored trees such as `node_modules`. fn register_repo_for_file_mcp_watching( &mut self, repo_path: PathBuf, ctx: &mut ModelContext, - file_mcp_tx: Sender, ) { if self.project_repo_watchers.contains(&repo_path) { return; } + self.project_repo_watchers.insert(repo_path.clone()); - let Some(repo_handle) = - DetectedRepositories::as_ref(ctx).get_watched_repo_for_path(&repo_path, ctx) - else { + for (provider, config_path) in providers_in_scope(repo_path.clone(), repo_path.clone()) { + insert_project_config_target( + &mut self.project_config_targets_by_path, + config_path.clone(), + repo_path.clone(), + provider, + ); + Self::spawn_config_parse(config_path, repo_path.clone(), provider, ctx); + } + + for watch_dir in project_config_parent_dirs(repo_path.clone()) { + self.register_project_watch_dir(repo_path.clone(), watch_dir, ctx); + } + } + + fn register_project_watch_dir( + &mut self, + repo_path: PathBuf, + watch_dir: PathBuf, + ctx: &mut ModelContext, + ) { + if !watch_dir.is_dir() { return; - }; + } - let start = repo_handle.update(ctx, |repo, ctx| { - repo.start_watching( - Box::new(FileMCPSubscriber { - stored_dir: repo_path.clone(), - message_tx: file_mcp_tx, - }), - ctx, - ) - }); - let subscriber_id = start.subscriber_id; - // Store optimistically; removed in the error callback below if registration fails. - self.project_repo_watchers.insert(repo_path.clone()); + let roots = self + .project_roots_by_watch_dir + .entry(watch_dir.clone()) + .or_default(); + let should_register = roots.is_empty(); + roots.insert(repo_path); + + if should_register { + self.project_watcher.update(ctx, |watcher, _ctx| { + use notify_debouncer_full::notify::{RecursiveMode, WatchFilter}; + + std::mem::drop(watcher.register_path( + &watch_dir, + WatchFilter::accept_all(), + RecursiveMode::NonRecursive, + )); + }); + } + } - ctx.spawn(start.registration_future, move |me, res, ctx| { - if let Err(err) = res { - log::warn!( - "Failed to start watching {repo_path} for file-based MCP servers: {err}", - repo_path = repo_path.display(), - ); - me.project_repo_watchers.remove(&repo_path); - repo_handle.update(ctx, |repo, ctx| { - repo.stop_watching(subscriber_id, ctx); - }); + fn handle_project_watcher_event( + &mut self, + event: &BulkFilesystemWatcherEvent, + ctx: &mut ModelContext, + ) { + let added_or_updated = event.added_or_updated_set(); + let moved_to: HashSet<_> = event.moved.keys().cloned().collect(); + let moved_from: HashSet<_> = event.moved.values().cloned().collect(); + + let config_targets: Vec<_> = self + .project_config_targets_by_path + .iter() + .flat_map(|(config_path, targets)| { + targets.iter().map(move |(root_path, provider)| { + (config_path.clone(), root_path.clone(), *provider) + }) + }) + .collect(); + let mut watch_dirs_to_register = Vec::new(); + let mut watch_dirs_to_forget = HashSet::new(); + let mut config_updates = Vec::new(); + + for (config_path, root_path, provider) in config_targets { + let parent_dir = config_path.parent().map(Path::to_path_buf); + let parent_was_added = parent_dir.as_ref().is_some_and(|parent| { + parent != &root_path + && (added_or_updated.contains(parent) || moved_to.contains(parent)) + }); + let parent_was_deleted = parent_dir.as_ref().is_some_and(|parent| { + parent != &root_path + && (event.deleted.contains(parent) || moved_from.contains(parent)) + }); + + if parent_was_added { + if let Some(parent) = parent_dir.clone() { + watch_dirs_to_register.push((root_path.clone(), parent)); + } } - }); + if parent_was_deleted { + if let Some(parent) = parent_dir.clone() { + watch_dirs_to_forget.insert(parent); + } + } + + let was_deleted = event.deleted.contains(&config_path) + || moved_from.contains(&config_path) + || parent_was_deleted; + let was_added = added_or_updated.contains(&config_path) + || moved_to.contains(&config_path) + || parent_was_added; + + if was_deleted || was_added { + config_updates.push((root_path, provider, config_path, was_deleted, was_added)); + } + } + + for watch_dir in watch_dirs_to_forget { + self.project_roots_by_watch_dir.remove(&watch_dir); + } + + for (root_path, watch_dir) in watch_dirs_to_register { + self.register_project_watch_dir(root_path, watch_dir, ctx); + } + + for (root_path, provider, config_path, was_deleted, was_added) in config_updates { + self.handle_single_config_update( + root_path, + provider, + config_path, + was_deleted, + was_added, + ctx, + ); + } } /// Register a home provider subdir (e.g. `~/.codex`) for watching via `DirectoryWatcher`, @@ -610,6 +722,26 @@ fn providers_in_scope( }) } +fn project_config_parent_dirs(root_path: PathBuf) -> HashSet { + providers_in_scope(root_path.clone(), root_path) + .filter_map(|(_, config_path)| config_path.parent().map(Path::to_path_buf)) + .collect() +} + +fn insert_project_config_target( + targets_by_path: &mut HashMap>, + config_path: PathBuf, + root_path: PathBuf, + provider: MCPProvider, +) { + let targets = targets_by_path.entry(config_path).or_default(); + if !targets.iter().any(|(existing_root, existing_provider)| { + existing_root == &root_path && existing_provider == &provider + }) { + targets.push((root_path, provider)); + } +} + /// Substitutes environment variables in the format ${VAR_NAME} in the given JSON string. /// Returns an error if any environment variable is not found, as the server cannot be started. fn substitute_env_vars(json_content: &str) -> Result { diff --git a/app/src/ai/mcp/file_mcp_watcher_tests.rs b/app/src/ai/mcp/file_mcp_watcher_tests.rs index 9d4d0d6fa..54a649f37 100644 --- a/app/src/ai/mcp/file_mcp_watcher_tests.rs +++ b/app/src/ai/mcp/file_mcp_watcher_tests.rs @@ -1,5 +1,9 @@ -use super::substitute_env_vars; -use std::env; +use super::{ + insert_project_config_target, project_config_parent_dirs, providers_in_scope, + substitute_env_vars, +}; +use crate::ai::mcp::MCPProvider; +use std::{collections::HashMap, env, path::PathBuf}; fn cleanup_env_vars(vars: &[&str]) { for var in vars { @@ -72,3 +76,61 @@ fn test_substitute_env_vars_missing_or_empty() { // Cleanup cleanup_env_vars(&["EMPTY_VAR"]); } + +#[test] +fn test_project_config_parent_dirs_only_tracks_known_config_locations() { + let root = PathBuf::from("/tmp/repo"); + let mut parent_dirs = project_config_parent_dirs(root) + .into_iter() + .map(|path| path.to_string_lossy().to_string()) + .collect::>(); + parent_dirs.sort(); + + assert_eq!( + parent_dirs, + vec![ + "/tmp/repo", + "/tmp/repo/.agents", + "/tmp/repo/.codex", + "/tmp/repo/.warp", + ] + ); +} + +#[test] +fn test_providers_in_scope_preserves_project_and_provider_config_paths() { + let root = PathBuf::from("/tmp/repo"); + let mut config_paths = providers_in_scope(root.clone(), root) + .map(|(provider, path)| (provider, path.to_string_lossy().to_string())) + .collect::>(); + config_paths.sort_by(|(_, left), (_, right)| left.cmp(right)); + + assert!(config_paths.contains(&(MCPProvider::Claude, "/tmp/repo/.claude.json".into()))); + assert!(config_paths.contains(&(MCPProvider::Claude, "/tmp/repo/.mcp.json".into()))); + assert!(config_paths.contains(&(MCPProvider::Codex, "/tmp/repo/.codex/config.toml".into()))); + assert!(config_paths.contains(&(MCPProvider::Warp, "/tmp/repo/.warp/.mcp.json".into()))); + assert!(config_paths.contains(&(MCPProvider::Agents, "/tmp/repo/.agents/.mcp.json".into()))); +} + +#[test] +fn test_insert_project_config_target_deduplicates_root_provider_pairs() { + let mut targets = HashMap::new(); + let config_path = PathBuf::from("/tmp/repo/.mcp.json"); + let root = PathBuf::from("/tmp/repo"); + + insert_project_config_target( + &mut targets, + config_path.clone(), + root.clone(), + MCPProvider::Claude, + ); + insert_project_config_target( + &mut targets, + config_path.clone(), + root.clone(), + MCPProvider::Claude, + ); + insert_project_config_target(&mut targets, config_path.clone(), root, MCPProvider::Agents); + + assert_eq!(targets.get(&config_path).unwrap().len(), 2); +} diff --git a/crates/repo_metadata/src/watcher.rs b/crates/repo_metadata/src/watcher.rs index 9b588e89e..cd350929b 100644 --- a/crates/repo_metadata/src/watcher.rs +++ b/crates/repo_metadata/src/watcher.rs @@ -293,12 +293,13 @@ impl DirectoryWatcher { let registration_future = if let Some(ref watcher) = self.watcher { if let Some(local_path) = local_path.clone() { watcher.update(ctx, |watcher, _ctx| { - use crate::entry::should_ignore_git_path; + use crate::entry::{gitignores_for_directory, path_passes_filters}; use notify_debouncer_full::notify::{RecursiveMode, WatchFilter}; use std::sync::Arc; + let gitignores = gitignores_for_directory(&local_path); let watch_filter = WatchFilter::with_filter(Arc::new(move |watch_path| { - !should_ignore_git_path(watch_path) + path_passes_filters(watch_path, &gitignores) })); Some(watcher.register_path(&local_path, watch_filter, RecursiveMode::Recursive))