Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,374 changes: 0 additions & 2,374 deletions src/indexer/pipeline.rs

This file was deleted.

197 changes: 197 additions & 0 deletions src/indexer/pipeline/context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
//! Context-string assembly for a node + edge bundle, plus the two recovery
//! paths that re-run that assembly outside the main indexer:
//! - `regenerate_context_strings`: incremental dirty propagation (rebuilds
//! ctx for nodes whose cross-file edges flipped during a re-index).
//! - `repair_null_context_strings`: startup repair when a prior Phase 3
//! transaction failed and left rows with NULL context_string.
//!
//! `categorize_edges` and `format_route_from_metadata` are also used by the
//! main `index_files` Phase 3 builder, so they live here as `pub(super)`.

use anyhow::Result;
use std::collections::{HashMap, HashSet};

use crate::embedding::context::{build_context_string, NodeContext};
use crate::embedding::model::EmbeddingModel;
use crate::storage::db::Database;
use crate::storage::queries::{
get_edges_batch, get_nodes_missing_context, get_nodes_with_files_by_ids,
update_context_strings_batch, EdgeInfo, NodeResult,
};
use crate::domain::{REL_CALLS, REL_IMPORTS, REL_INHERITS, REL_ROUTES_TO, REL_IMPLEMENTS, REL_EXPORTS};

use super::embed::embed_and_store_batch;

/// Extract "METHOD path" from route edge metadata JSON, falling back to the edge name.
pub(super) fn format_route_from_metadata(metadata: Option<&str>, name: &str) -> String {
if let Some(meta) = metadata {
if let Ok(v) = serde_json::from_str::<serde_json::Value>(meta) {
let method = v["method"].as_str().unwrap_or("ALL");
if let Some(path) = v["path"].as_str() {
return format!("{} {}", method, path);
}
}
}
name.to_string()
}

pub(super) struct CategorizedEdges {
pub callees: Vec<String>,
pub callers: Vec<String>,
pub inherits: Vec<String>,
pub routes: Vec<String>,
pub imports: Vec<String>,
pub implements: Vec<String>,
pub exports: Vec<String>,
}

pub(super) fn categorize_edges(edges: Option<&Vec<EdgeInfo>>, format_route: impl Fn(Option<&str>, &str) -> String) -> CategorizedEdges {
let mut result = CategorizedEdges {
callees: Vec::new(),
callers: Vec::new(),
inherits: Vec::new(),
routes: Vec::new(),
imports: Vec::new(),
implements: Vec::new(),
exports: Vec::new(),
};
if let Some(edge_list) = edges {
for (relation, direction, name, metadata) in edge_list {
match (relation.as_str(), direction.as_str()) {
(rel, "out") if rel == REL_CALLS => result.callees.push(name.clone()),
(rel, "in") if rel == REL_CALLS => result.callers.push(name.clone()),
(rel, "out") if rel == REL_INHERITS => result.inherits.push(name.clone()),
(rel, "out") if rel == REL_ROUTES_TO => {
result.routes.push(format_route(metadata.as_deref(), name));
}
(rel, "out") if rel == REL_IMPORTS => result.imports.push(name.clone()),
(rel, "out") if rel == REL_IMPLEMENTS => result.implements.push(name.clone()),
(rel, "out") if rel == REL_EXPORTS => result.exports.push(name.clone()),
_ => {}
}
}
}
result
}

/// Regenerate context strings (and embeddings) for the given set of dirty nodes.
pub(super) fn regenerate_context_strings(db: &Database, dirty_ids: &HashSet<i64>, model: Option<&EmbeddingModel>) -> Result<()> {
let tx = db.conn().unchecked_transaction()?;
let id_vec: Vec<i64> = dirty_ids.iter().copied().collect();
let all_edges = get_edges_batch(db.conn(), &id_vec)?;
let all_nodes: HashMap<i64, (NodeResult, String, Option<String>)> = {
let nwfs = get_nodes_with_files_by_ids(db.conn(), &id_vec)?;
nwfs.into_iter().map(|nwf| (nwf.node.id, (nwf.node, nwf.file_path, nwf.language))).collect()
};

// Build all context strings first
let mut context_updates: Vec<(i64, String)> = Vec::with_capacity(dirty_ids.len());
for &node_id in dirty_ids {
if let Some((node, file_path, language)) = all_nodes.get(&node_id) {
let edges = all_edges.get(&node_id);
let cat = categorize_edges(edges, format_route_from_metadata);

let ctx = build_context_string(&NodeContext {
node_type: node.node_type.clone(),
name: node.name.clone(),
qualified_name: node.qualified_name.clone(),
file_path: file_path.clone(),
language: language.clone(),
signature: node.signature.clone(),
return_type: node.return_type.clone(),
param_types: node.param_types.clone(),
code_content: Some(node.code_content.clone()),
routes: cat.routes,
callees: cat.callees,
callers: cat.callers,
inherits: cat.inherits,
imports: cat.imports,
implements: cat.implements,
exports: cat.exports,
doc_comment: node.doc_comment.clone(),
});

context_updates.push((node_id, ctx));
}
}

// Batch update context strings
update_context_strings_batch(db.conn(), &context_updates)?;
tx.commit()?;

// Embed outside the committed tx — recoverable on failure
if let Some(m) = model {
if db.vec_enabled() {
embed_and_store_batch(db, m, &context_updates)?;
}
}
Ok(())
}

/// Repair nodes that have NULL context_string (likely from a failed Phase 3).
/// This is called at startup after index verification.
pub fn repair_null_context_strings(
db: &Database,
model: Option<&EmbeddingModel>,
) -> Result<usize> {
let missing_ids = get_nodes_missing_context(db.conn())?;
if missing_ids.is_empty() {
return Ok(0);
}

tracing::info!("[repair] Found {} nodes with NULL context_string, rebuilding...", missing_ids.len());

// Load node details with file paths
let nodes_with_files = get_nodes_with_files_by_ids(db.conn(), &missing_ids)?;

// Load edges for all affected nodes in one batch
let all_edges = get_edges_batch(db.conn(), &missing_ids)?;

// Build context strings
let mut context_updates: Vec<(i64, String)> = Vec::new();
for nwf in &nodes_with_files {
let node = &nwf.node;
let edges = all_edges.get(&node.id);
let cat = categorize_edges(edges, format_route_from_metadata);

let ctx = build_context_string(&NodeContext {
node_type: node.node_type.clone(),
name: node.name.clone(),
qualified_name: node.qualified_name.clone(),
file_path: nwf.file_path.clone(),
language: nwf.language.clone(),
signature: node.signature.clone(),
return_type: node.return_type.clone(),
param_types: node.param_types.clone(),
code_content: Some(node.code_content.clone()),
routes: cat.routes,
callees: cat.callees,
callers: cat.callers,
inherits: cat.inherits,
imports: cat.imports,
implements: cat.implements,
exports: cat.exports,
doc_comment: node.doc_comment.clone(),
});

context_updates.push((node.id, ctx));
}

// Update in DB within a transaction (avoids per-row fsync under autocommit)
if !context_updates.is_empty() {
let tx = db.conn().unchecked_transaction()?;
update_context_strings_batch(db.conn(), &context_updates)?;
tx.commit()?;

// Re-embed if model available
if let Some(m) = model {
if db.vec_enabled() {
embed_and_store_batch(db, m, &context_updates)?;
}
}
}

let count = context_updates.len();
tracing::info!("[repair] Repaired context strings for {} nodes", count);
Ok(count)
}
71 changes: 71 additions & 0 deletions src/indexer/pipeline/embed.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
//! Batch embedding + vector store. Wraps `EmbeddingModel::embed_batch` with
//! a per-batch DB transaction; on batch failure falls back to per-row embed
//! so a single malformed input doesn't tank the whole sweep.
//!
//! Public so `mcp::server` can call it from the background embedding thread
//! (separate from the indexer's foreground Phase 3 path).

use anyhow::Result;

use crate::embedding::model::EmbeddingModel;
use crate::storage::db::Database;
use crate::storage::queries::insert_node_vectors_batch;

/// Embed context strings using batched inference and batch-insert vectors.
/// Public so the background embedding thread in server.rs can call it.
/// Wraps vector inserts in a transaction for atomicity and performance.
pub fn embed_and_store_batch(db: &Database, model: &EmbeddingModel, context_updates: &[(i64, String)]) -> Result<()> {
if context_updates.is_empty() {
return Ok(());
}

let t0 = std::time::Instant::now();
let texts: Vec<&str> = context_updates.iter().map(|(_, ctx)| ctx.as_str()).collect();
let ids: Vec<i64> = context_updates.iter().map(|(id, _)| *id).collect();

let embeddings = match model.embed_batch(&texts) {
Ok(embs) => embs,
Err(e) => {
tracing::warn!("Batch embed failed, falling back to sequential: {}", e);
// Fallback: sequential embed
let mut embs = Vec::new();
for (i, text) in texts.iter().enumerate() {
match model.embed(text) {
Ok(emb) => embs.push(Some(emb)),
Err(e2) => {
tracing::warn!("Failed to embed node {}: {}", ids[i], e2);
embs.push(None);
}
}
}
let vectors: Vec<(i64, Vec<f32>)> = ids.iter().zip(embs)
.filter_map(|(&id, emb)| emb.map(|e| (id, e)))
.collect();
if !vectors.is_empty() {
let tx = db.conn().unchecked_transaction()?;
insert_node_vectors_batch(db.conn(), &vectors)?;
tx.commit()?;
}
tracing::info!("[embed] {} nodes (sequential fallback) in {:.1}s",
context_updates.len(), t0.elapsed().as_secs_f64());
return Ok(());
}
};

let vectors: Vec<(i64, Vec<f32>)> = ids.into_iter().zip(embeddings).collect();
let t_embed = t0.elapsed();

if !vectors.is_empty() {
let tx = db.conn().unchecked_transaction()?;
insert_node_vectors_batch(db.conn(), &vectors)?;
tx.commit()?;
}

tracing::info!("[embed] {} nodes in {:.1}s (embed {:.1}s, store {:.1}s)",
context_updates.len(),
t0.elapsed().as_secs_f64(),
t_embed.as_secs_f64(),
(t0.elapsed() - t_embed).as_secs_f64(),
);
Ok(())
}
Loading
Loading