Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
255 changes: 252 additions & 3 deletions crates/omnigraph-cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,141 @@ struct StateConfig {
struct GraphConfig {
schema: PathBuf,
#[serde(default)]
queries: BTreeMap<String, QueryConfig>,
queries: QueriesDecl,
}

/// How a graph declares its stored queries. Terraform-style: the `.gq`
/// files ARE the declaration — point at them (or a directory) and every
/// `query <name>` they contain is discovered. The explicit name->file map
/// remains for fine-grained control.
#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
enum QueriesDecl {
/// `queries: ./queries/` — a directory (top-level `*.gq`, sorted) or a
/// single `.gq` file; every declaration inside is registered.
Discover(PathBuf),
/// `queries: [./queries/, ./extra.gq]` — several directories/files.
DiscoverMany(Vec<PathBuf>),
/// `queries: { name: { file: ... } }` — explicit registry.
Explicit(BTreeMap<String, QueryConfig>),
}

impl Default for QueriesDecl {
fn default() -> Self {
QueriesDecl::Explicit(BTreeMap::new())
}
}

/// Expand a graph's query declaration into the canonical name->file map.
/// Discovery reads and parses each `.gq`; unreadable or unparseable files
/// and duplicate query names are loud validation errors — a declaration the
/// tool cannot enumerate is broken, not partially usable.
fn resolve_query_decls(
config_dir: &Path,
graph_id: &str,
decl: &QueriesDecl,
diagnostics: &mut Vec<Diagnostic>,
) -> (BTreeMap<String, QueryConfig>, BTreeMap<PathBuf, String>) {
let paths: Vec<PathBuf> = match decl {
QueriesDecl::Explicit(map) => {
return (
map.iter()
.map(|(name, config)| {
(name.clone(), QueryConfig { file: config.file.clone() })
})
.collect(),
BTreeMap::new(),
);
}
QueriesDecl::Discover(path) => vec![path.clone()],
QueriesDecl::DiscoverMany(paths) => paths.clone(),
};

let mut files: Vec<(PathBuf, PathBuf)> = Vec::new(); // (declared-relative, resolved)
for declared in &paths {
let resolved = resolve_config_path(config_dir, declared);
if resolved.is_dir() {
let mut entries: Vec<PathBuf> = match fs::read_dir(&resolved) {
Ok(read) => read
.flatten()
.map(|entry| entry.path())
.filter(|path| path.extension().is_some_and(|ext| ext == "gq"))
.collect(),
Err(err) => {
diagnostics.push(Diagnostic::error(
"query_dir_unreadable",
format!("graphs.{graph_id}.queries"),
format!("could not list query directory '{}': {err}", resolved.display()),
));
continue;
}
};
entries.sort();
if entries.is_empty() {
diagnostics.push(Diagnostic::warning(
"query_dir_empty",
format!("graphs.{graph_id}.queries"),
format!("query directory '{}' contains no .gq files", resolved.display()),
));
}
for path in entries {
let relative = declared.join(path.file_name().expect("dir entries have names"));
files.push((relative, path));
}
} else {
files.push((declared.clone(), resolved));
}
}

let mut registry: BTreeMap<String, QueryConfig> = BTreeMap::new();
let mut origin: BTreeMap<String, PathBuf> = BTreeMap::new();
// Content read once at discovery and handed to the caller — the per-query
// digest/typecheck pass reuses it instead of re-reading (no N+1 reads, no
// window for the file to change between enumeration and validation).
let mut contents: BTreeMap<PathBuf, String> = BTreeMap::new();
for (declared, resolved) in files {
let source = match fs::read_to_string(&resolved) {
Ok(source) => source,
Err(err) => {
diagnostics.push(Diagnostic::error(
"query_file_missing",
format!("graphs.{graph_id}.queries"),
format!("could not read query file '{}': {err}", resolved.display()),
));
continue;
}
};
let parsed = match parse_query(&source) {
Ok(parsed) => parsed,
Err(err) => {
diagnostics.push(Diagnostic::error(
"query_parse_error",
format!("graphs.{graph_id}.queries"),
format!("'{}' does not parse: {err}", resolved.display()),
));
continue;
}
};
for query_decl in &parsed.queries {
let name = query_decl.name.clone();
if let Some(previous) = origin.get(&name) {
diagnostics.push(Diagnostic::error(
"duplicate_query_name",
format!("graphs.{graph_id}.queries.{name}"),
format!(
"query '{name}' is declared in both '{}' and '{}'",
previous.display(),
declared.display()
),
));
continue;
}
origin.insert(name.clone(), declared.clone());
registry.insert(name, QueryConfig { file: declared.clone() });
}
contents.insert(declared, source);
}
(registry, contents)
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -3600,7 +3734,9 @@ fn load_desired(config_dir: &Path) -> LoadOutcome {
}
});

for (query_name, query) in &graph.queries {
let (graph_queries, query_contents) =
resolve_query_decls(&config_dir, graph_id, &graph.queries, &mut diagnostics);
for (query_name, query) in &graph_queries {
validate_id(
"query name",
&format!("graphs.{graph_id}.queries.{query_name}"),
Expand All @@ -3618,7 +3754,11 @@ fn load_desired(config_dir: &Path) -> LoadOutcome {
});

let query_path = resolve_config_path(&config_dir, &query.file);
match fs::read_to_string(&query_path) {
let source = match query_contents.get(&query.file) {
Some(cached) => Ok(cached.clone()),
None => fs::read_to_string(&query_path),
};
match source {
Ok(source) => {
let digest = sha256_hex(source.as_bytes());
graph_query_digests
Expand Down Expand Up @@ -7560,6 +7700,115 @@ policies:
);
}

// ---- query discovery (Terraform-style declaration) ----

#[test]
fn queries_directory_discovers_every_declaration() {
let dir = tempfile::tempdir().unwrap();
fs::write(dir.path().join("people.pg"), "\nnode Person {\n name: String @key\n}\n").unwrap();
fs::create_dir(dir.path().join("queries")).unwrap();
fs::write(
dir.path().join("queries/people.gq"),
"\nquery find_person($name: String) {\n match { $p: Person { name: $name } }\n return { $p.name }\n}\n\nquery all_people() {\n match { $p: Person }\n return { $p.name }\n}\n",
)
.unwrap();
fs::write(
dir.path().join("queries/extra.gq"),
"\nquery count_people() {\n match { $p: Person }\n return { count($p) }\n}\n",
)
.unwrap();
fs::write(dir.path().join("queries/notes.txt"), "ignored").unwrap();
fs::write(
dir.path().join("cluster.yaml"),
"version: 1\ngraphs:\n knowledge:\n schema: ./people.pg\n queries: ./queries/\n",
)
.unwrap();

let out = validate_config_dir(dir.path());
assert!(out.ok, "{:?}", out.diagnostics);
let names: Vec<&str> = out
.resource_digests
.keys()
.filter_map(|address| address.strip_prefix("query.knowledge."))
.collect();
assert_eq!(names, vec!["all_people", "count_people", "find_person"]);
}

#[test]
fn queries_list_and_single_file_forms_discover() {
let dir = tempfile::tempdir().unwrap();
fs::write(dir.path().join("people.pg"), "\nnode Person {\n name: String @key\n}\n").unwrap();
fs::write(
dir.path().join("a.gq"),
"\nquery find_person($name: String) {\n match { $p: Person { name: $name } }\n return { $p.name }\n}\n",
)
.unwrap();
fs::write(
dir.path().join("b.gq"),
"\nquery all_people() {\n match { $p: Person }\n return { $p.name }\n}\n",
)
.unwrap();
fs::write(
dir.path().join("cluster.yaml"),
"version: 1\ngraphs:\n knowledge:\n schema: ./people.pg\n queries: [./a.gq, ./b.gq]\n",
)
.unwrap();
let out = validate_config_dir(dir.path());
assert!(out.ok, "{:?}", out.diagnostics);
assert!(out.resource_digests.contains_key("query.knowledge.find_person"));
assert!(out.resource_digests.contains_key("query.knowledge.all_people"));

// Single-file string form
fs::write(
dir.path().join("cluster.yaml"),
"version: 1\ngraphs:\n knowledge:\n schema: ./people.pg\n queries: ./a.gq\n",
)
.unwrap();
let out = validate_config_dir(dir.path());
assert!(out.ok, "{:?}", out.diagnostics);
assert!(out.resource_digests.contains_key("query.knowledge.find_person"));
assert!(!out.resource_digests.contains_key("query.knowledge.all_people"));
}

#[test]
fn query_discovery_rejects_duplicates_and_parse_errors() {
let dir = tempfile::tempdir().unwrap();
fs::write(dir.path().join("people.pg"), "\nnode Person {\n name: String @key\n}\n").unwrap();
let decl = "\nquery find_person($name: String) {\n match { $p: Person { name: $name } }\n return { $p.name }\n}\n";
fs::write(dir.path().join("a.gq"), decl).unwrap();
fs::write(dir.path().join("b.gq"), decl).unwrap();
fs::write(
dir.path().join("cluster.yaml"),
"version: 1\ngraphs:\n knowledge:\n schema: ./people.pg\n queries: [./a.gq, ./b.gq]\n",
)
.unwrap();
let out = validate_config_dir(dir.path());
assert!(!out.ok);
assert!(
out.diagnostics
.iter()
.any(|diagnostic| diagnostic.code == "duplicate_query_name"),
"{:?}",
out.diagnostics
);

fs::write(dir.path().join("broken.gq"), "query {{{ nope").unwrap();
fs::write(
dir.path().join("cluster.yaml"),
"version: 1\ngraphs:\n knowledge:\n schema: ./people.pg\n queries: ./broken.gq\n",
)
.unwrap();
let out = validate_config_dir(dir.path());
assert!(!out.ok);
assert!(
out.diagnostics
.iter()
.any(|diagnostic| diagnostic.code == "query_parse_error"),
"{:?}",
out.diagnostics
);
}

#[test]
fn status_warns_on_pending_recovery_sidecar() {
let dir = fixture();
Expand Down
20 changes: 10 additions & 10 deletions docs/user/cli-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ cli:
query:
roots: [<dir>, …] # search path for .gq files
auth:
env_file: ./.env.omni
env_file: .env.omni
aliases:
<alias>:
# accepted values: `read` / `query` (read alias), `change` / `mutate`
Expand All @@ -70,20 +70,20 @@ aliases:
queries: # top-level registry — applies only to a bare-URI (anonymous) graph; a graph served by name uses its `graphs.<id>.queries`. Mirrors top-level `policy`.
<query-name>: { file: <path-to-.gq> } # mcp.expose defaults to true
policy:
file: ./policy.yaml
file: policy.yaml
```

## Cluster config preview

```bash
omnigraph cluster validate --config ./company-brain
omnigraph cluster plan --config ./company-brain --json
omnigraph cluster apply --config ./company-brain --json
omnigraph cluster approve graph.<id> --config ./company-brain --as <actor>
omnigraph cluster status --config ./company-brain --json
omnigraph cluster refresh --config ./company-brain --json
omnigraph cluster import --config ./company-brain --json
omnigraph cluster force-unlock <LOCK_ID> --config ./company-brain --json
omnigraph cluster validate --config company-brain
omnigraph cluster plan --config company-brain --json
omnigraph cluster apply --config company-brain --json
omnigraph cluster approve graph.<id> --config company-brain --as <actor>
omnigraph cluster status --config company-brain --json
omnigraph cluster refresh --config company-brain --json
omnigraph cluster import --config company-brain --json
omnigraph cluster force-unlock <LOCK_ID> --config company-brain --json
```

`--config` is a directory containing `cluster.yaml`; it defaults to `.`.
Expand Down
Loading
Loading