Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 165 additions & 33 deletions app/src/ai/mcp/file_mcp_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ use std::io::ErrorKind;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::LazyLock;
use std::time::Duration;
use warp_core::safe_warn;
use warpui::{Entity, ModelContext, ModelHandle, SingletonEntity};
use watcher::HomeDirectoryWatcherEvent;
use watcher::{BulkFilesystemWatcher, BulkFilesystemWatcherEvent, HomeDirectoryWatcherEvent};

use crate::ai::mcp::{
home_config_file_path, parsing::normalize_codex_toml_to_json, MCPProvider,
Expand All @@ -32,6 +33,7 @@ static ENV_VAR_REGEX: LazyLock<Regex> =
/// `.warp/.mcp.json`), capturing the parent directory component.
static HOME_SUBDIR_REGEX: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"^([^/]+)/[^/]+$").expect("Regex is valid"));
const PROJECT_FILE_MCP_WATCHER_DEBOUNCE_MILLIS: u64 = 500;

/// Returns the subdirectory under the home directory that needs its own [`DirectoryWatcher`],
/// inferred from the provider's home config path. Matches paths that are exactly one directory
Expand Down Expand Up @@ -122,12 +124,18 @@ impl RepositorySubscriber for FileMCPSubscriber {
/// [`FileMCPWatcherEvent`]s.
pub struct FileMCPWatcher {
file_mcp_tx: Sender<FileMCPDetectionMessage>,
/// Non-recursive watcher for project-scoped MCP config parent directories.
project_watcher: ModelHandle<BulkFilesystemWatcher>,
/// Watcher handles for home provider subdirectories (e.g. `~/.codex`), keyed by subdir path.
/// Used to cleanup watchers when the subdir is deleted at runtime.
home_provider_watchers: HashMap<PathBuf, (ModelHandle<Repository>, SubscriberId)>,
/// Set of project repository root paths we are already watching for file-based MCP configs.
/// Used purely for deduplication — we never tear down project watchers during the session.
/// Used purely for deduplication — we never tear down project root watches during the session.
project_repo_watchers: HashSet<PathBuf>,
/// Project-scoped MCP config paths keyed to the logical roots and providers they affect.
project_config_targets_by_path: HashMap<PathBuf, Vec<(PathBuf, MCPProvider)>>,
/// Non-recursive project config parent directories currently registered with `project_watcher`.
project_roots_by_watch_dir: HashMap<PathBuf, HashSet<PathBuf>>,
/// Tracks how many provider config files remain to be parsed for each cloud environment repo.
/// When the count reaches zero, a `CloudEnvironmentScanComplete` event is emitted.
cloud_env_pending: HashMap<PathBuf, usize>,
Expand All @@ -136,6 +144,12 @@ pub struct FileMCPWatcher {
impl FileMCPWatcher {
pub fn new(ctx: &mut ModelContext<Self>) -> Self {
let (file_mcp_tx, file_mcp_rx) = async_channel::unbounded::<FileMCPDetectionMessage>();
let project_watcher = ctx.add_model(|ctx| {
BulkFilesystemWatcher::new(
Duration::from_millis(PROJECT_FILE_MCP_WATCHER_DEBOUNCE_MILLIS),
ctx,
)
});

ctx.spawn_stream_local(
file_mcp_rx,
Expand All @@ -144,10 +158,12 @@ impl FileMCPWatcher {
},
|_, _| {},
);
ctx.subscribe_to_model(&project_watcher, |me, event, ctx| {
me.handle_project_watcher_event(event, ctx);
});

// Subscribe to changes to detected repositories.
ctx.subscribe_to_model(&DetectedRepositories::handle(ctx), {
let file_mcp_tx = file_mcp_tx.clone();
move |me, event, ctx| {
let DetectedRepositoriesEvent::DetectedGitRepo { repository, source } = event;
// Register MCP servers for repos the user actively navigated to, and for
Expand All @@ -164,7 +180,7 @@ impl FileMCPWatcher {
providers_in_scope(repo_path.clone(), repo_path.clone()).count();
me.cloud_env_pending.insert(repo_path.clone(), count);
}
me.register_repo_for_file_mcp_watching(repo_path, ctx, file_mcp_tx.clone());
me.register_repo_for_file_mcp_watching(repo_path, ctx);
}
}
});
Expand Down Expand Up @@ -221,54 +237,150 @@ impl FileMCPWatcher {

Self {
file_mcp_tx,
project_watcher,
home_provider_watchers,
project_repo_watchers: HashSet::new(),
project_config_targets_by_path: HashMap::new(),
project_roots_by_watch_dir: HashMap::new(),
cloud_env_pending: HashMap::new(),
}
}

/// Register a project repo for file-based MCP watching via DirectoryWatcher.
/// Register a project repo for file-based MCP watching.
///
/// Project-scoped MCP config files live at a small set of known paths relative to the repo
/// root. Watch those config parent directories non-recursively instead of subscribing to
/// recursive repository updates, which can otherwise consume one inotify watch per subdirectory
/// in large ignored trees such as `node_modules`.
fn register_repo_for_file_mcp_watching(
&mut self,
repo_path: PathBuf,
ctx: &mut ModelContext<Self>,
file_mcp_tx: Sender<FileMCPDetectionMessage>,
) {
if self.project_repo_watchers.contains(&repo_path) {
return;
}
self.project_repo_watchers.insert(repo_path.clone());

let Some(repo_handle) =
DetectedRepositories::as_ref(ctx).get_watched_repo_for_path(&repo_path, ctx)
else {
for (provider, config_path) in providers_in_scope(repo_path.clone(), repo_path.clone()) {
insert_project_config_target(
&mut self.project_config_targets_by_path,
config_path.clone(),
repo_path.clone(),
provider,
);
Self::spawn_config_parse(config_path, repo_path.clone(), provider, ctx);
}

for watch_dir in project_config_parent_dirs(repo_path.clone()) {
self.register_project_watch_dir(repo_path.clone(), watch_dir, ctx);
}
}

fn register_project_watch_dir(
&mut self,
repo_path: PathBuf,
watch_dir: PathBuf,
ctx: &mut ModelContext<Self>,
) {
if !watch_dir.is_dir() {
return;
};
}

let start = repo_handle.update(ctx, |repo, ctx| {
repo.start_watching(
Box::new(FileMCPSubscriber {
stored_dir: repo_path.clone(),
message_tx: file_mcp_tx,
}),
ctx,
)
});
let subscriber_id = start.subscriber_id;
// Store optimistically; removed in the error callback below if registration fails.
self.project_repo_watchers.insert(repo_path.clone());
let roots = self
.project_roots_by_watch_dir
.entry(watch_dir.clone())
.or_default();
let should_register = roots.is_empty();
roots.insert(repo_path);

if should_register {
self.project_watcher.update(ctx, |watcher, _ctx| {
use notify_debouncer_full::notify::{RecursiveMode, WatchFilter};

std::mem::drop(watcher.register_path(
&watch_dir,
WatchFilter::accept_all(),
RecursiveMode::NonRecursive,
));
});
}
}

ctx.spawn(start.registration_future, move |me, res, ctx| {
if let Err(err) = res {
log::warn!(
"Failed to start watching {repo_path} for file-based MCP servers: {err}",
repo_path = repo_path.display(),
);
me.project_repo_watchers.remove(&repo_path);
repo_handle.update(ctx, |repo, ctx| {
repo.stop_watching(subscriber_id, ctx);
});
fn handle_project_watcher_event(
&mut self,
event: &BulkFilesystemWatcherEvent,
ctx: &mut ModelContext<Self>,
) {
let added_or_updated = event.added_or_updated_set();
let moved_to: HashSet<_> = event.moved.keys().cloned().collect();
let moved_from: HashSet<_> = event.moved.values().cloned().collect();

let config_targets: Vec<_> = self
.project_config_targets_by_path
.iter()
.flat_map(|(config_path, targets)| {
targets.iter().map(move |(root_path, provider)| {
(config_path.clone(), root_path.clone(), *provider)
})
})
.collect();
let mut watch_dirs_to_register = Vec::new();
let mut watch_dirs_to_forget = HashSet::new();
let mut config_updates = Vec::new();

for (config_path, root_path, provider) in config_targets {
let parent_dir = config_path.parent().map(Path::to_path_buf);
let parent_was_added = parent_dir.as_ref().is_some_and(|parent| {
parent != &root_path
&& (added_or_updated.contains(parent) || moved_to.contains(parent))
});
let parent_was_deleted = parent_dir.as_ref().is_some_and(|parent| {
parent != &root_path
&& (event.deleted.contains(parent) || moved_from.contains(parent))
});

if parent_was_added {
if let Some(parent) = parent_dir.clone() {
watch_dirs_to_register.push((root_path.clone(), parent));
}
}
});
if parent_was_deleted {
if let Some(parent) = parent_dir.clone() {
watch_dirs_to_forget.insert(parent);
}
}

let was_deleted = event.deleted.contains(&config_path)
|| moved_from.contains(&config_path)
|| parent_was_deleted;
let was_added = added_or_updated.contains(&config_path)
|| moved_to.contains(&config_path)
|| parent_was_added;

if was_deleted || was_added {
config_updates.push((root_path, provider, config_path, was_deleted, was_added));
}
}

for watch_dir in watch_dirs_to_forget {
self.project_roots_by_watch_dir.remove(&watch_dir);
}

for (root_path, watch_dir) in watch_dirs_to_register {
self.register_project_watch_dir(root_path, watch_dir, ctx);
}

for (root_path, provider, config_path, was_deleted, was_added) in config_updates {
self.handle_single_config_update(
root_path,
provider,
config_path,
was_deleted,
was_added,
ctx,
);
}
}

/// Register a home provider subdir (e.g. `~/.codex`) for watching via `DirectoryWatcher`,
Expand Down Expand Up @@ -610,6 +722,26 @@ fn providers_in_scope(
})
}

fn project_config_parent_dirs(root_path: PathBuf) -> HashSet<PathBuf> {
providers_in_scope(root_path.clone(), root_path)
.filter_map(|(_, config_path)| config_path.parent().map(Path::to_path_buf))
.collect()
}

fn insert_project_config_target(
targets_by_path: &mut HashMap<PathBuf, Vec<(PathBuf, MCPProvider)>>,
config_path: PathBuf,
root_path: PathBuf,
provider: MCPProvider,
) {
let targets = targets_by_path.entry(config_path).or_default();
if !targets.iter().any(|(existing_root, existing_provider)| {
existing_root == &root_path && existing_provider == &provider
}) {
targets.push((root_path, provider));
}
}

/// Substitutes environment variables in the format ${VAR_NAME} in the given JSON string.
/// Returns an error if any environment variable is not found, as the server cannot be started.
fn substitute_env_vars(json_content: &str) -> Result<String, anyhow::Error> {
Expand Down
66 changes: 64 additions & 2 deletions app/src/ai/mcp/file_mcp_watcher_tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use super::substitute_env_vars;
use std::env;
use super::{
insert_project_config_target, project_config_parent_dirs, providers_in_scope,
substitute_env_vars,
};
use crate::ai::mcp::MCPProvider;
use std::{collections::HashMap, env, path::PathBuf};

fn cleanup_env_vars(vars: &[&str]) {
for var in vars {
Expand Down Expand Up @@ -72,3 +76,61 @@ fn test_substitute_env_vars_missing_or_empty() {
// Cleanup
cleanup_env_vars(&["EMPTY_VAR"]);
}

#[test]
fn test_project_config_parent_dirs_only_tracks_known_config_locations() {
let root = PathBuf::from("/tmp/repo");
let mut parent_dirs = project_config_parent_dirs(root)
.into_iter()
.map(|path| path.to_string_lossy().to_string())
.collect::<Vec<_>>();
parent_dirs.sort();

assert_eq!(
parent_dirs,
vec![
"/tmp/repo",
"/tmp/repo/.agents",
"/tmp/repo/.codex",
"/tmp/repo/.warp",
]
);
}

#[test]
fn test_providers_in_scope_preserves_project_and_provider_config_paths() {
let root = PathBuf::from("/tmp/repo");
let mut config_paths = providers_in_scope(root.clone(), root)
.map(|(provider, path)| (provider, path.to_string_lossy().to_string()))
.collect::<Vec<_>>();
config_paths.sort_by(|(_, left), (_, right)| left.cmp(right));

assert!(config_paths.contains(&(MCPProvider::Claude, "/tmp/repo/.claude.json".into())));
assert!(config_paths.contains(&(MCPProvider::Claude, "/tmp/repo/.mcp.json".into())));
assert!(config_paths.contains(&(MCPProvider::Codex, "/tmp/repo/.codex/config.toml".into())));
assert!(config_paths.contains(&(MCPProvider::Warp, "/tmp/repo/.warp/.mcp.json".into())));
assert!(config_paths.contains(&(MCPProvider::Agents, "/tmp/repo/.agents/.mcp.json".into())));
}

#[test]
fn test_insert_project_config_target_deduplicates_root_provider_pairs() {
let mut targets = HashMap::new();
let config_path = PathBuf::from("/tmp/repo/.mcp.json");
let root = PathBuf::from("/tmp/repo");

insert_project_config_target(
&mut targets,
config_path.clone(),
root.clone(),
MCPProvider::Claude,
);
insert_project_config_target(
&mut targets,
config_path.clone(),
root.clone(),
MCPProvider::Claude,
);
insert_project_config_target(&mut targets, config_path.clone(), root, MCPProvider::Agents);

assert_eq!(targets.get(&config_path).unwrap().len(), 2);
}
5 changes: 3 additions & 2 deletions crates/repo_metadata/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,12 +293,13 @@ impl DirectoryWatcher {
let registration_future = if let Some(ref watcher) = self.watcher {
if let Some(local_path) = local_path.clone() {
watcher.update(ctx, |watcher, _ctx| {
use crate::entry::should_ignore_git_path;
use crate::entry::{gitignores_for_directory, path_passes_filters};
use notify_debouncer_full::notify::{RecursiveMode, WatchFilter};
use std::sync::Arc;
let gitignores = gitignores_for_directory(&local_path);

let watch_filter = WatchFilter::with_filter(Arc::new(move |watch_path| {
!should_ignore_git_path(watch_path)
path_passes_filters(watch_path, &gitignores)
}));

Some(watcher.register_path(&local_path, watch_filter, RecursiveMode::Recursive))
Expand Down
Loading