Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions .githooks/pre-commit
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,23 @@ if command -v cargo-deny >/dev/null 2>&1 || command -v cargo deny >/dev/null 2>&
}
fi

# 5. Validate workflow YAML syntax
if command -v python3 >/dev/null 2>&1; then
for f in .github/workflows/*.yml; do
if [ -f "$f" ]; then
python3 -c "import yaml; yaml.safe_load(open('$f'))" 2>/dev/null || {
echo "FAIL: $f has invalid YAML syntax"
exit 1
}
fi
done
fi

# 6. Check for stale .perf files in src
stale=$(find . -name "*.perf" -not -path "./target/*" 2>/dev/null)
if [ -n "$stale" ]; then
echo "FAIL: stale .perf files committed: $stale"
exit 1
fi

echo "[guard] All checks passed."
13 changes: 8 additions & 5 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ jobs:
- name: Install tools
run: |
cargo install cargo-audit cargo-deny 2>/dev/null
- name: Validate workflow YAML syntax
run: |
for f in .github/workflows/*.yml; do
python3 -c "import yaml; yaml.safe_load(open('$f'))" || { echo "FAIL: $f has invalid YAML"; exit 1; }
echo " OK: $f"
done
- name: Build check
run: cargo check --release
- name: cargo test
run: cargo test --release
- name: clippy (deny warnings) — lib only
Expand All @@ -36,11 +44,6 @@ jobs:
- name: Init smoke test
run: |
timeout 10 ./target/release/reliary-agent init </dev/null 2>&1 || true
- name: Stale file detector
run: |
stale=$(find . -name "*.bak" -o -name "*.perf" -o -name "*.old" -not -path "./target/*" 2>/dev/null)
if [ -n "$stale" ]; then echo "FAIL: stale files: $stale"; exit 1; fi
test -f .gitleaks.toml || { echo "FAIL: .gitleaks.toml missing"; exit 1; }
- name: Dependency graph diff (against master)
run: |
cargo tree -e normal --prefix none 2>/dev/null | sort > /tmp/deps_head.txt
Expand Down
177 changes: 177 additions & 0 deletions crates/reliary-agent/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::{Arc, Mutex, LazyLock};
use serde_json::Value;
use std::time::Instant;

// ── Token counting (lightweight heuristic) ──

Expand Down Expand Up @@ -81,6 +82,153 @@ fn daemon_cmd_str(cmd: &str) -> String {
crate::daemon::daemon_handle_cmd_str(cmd, &get_state())
}

// ── History Compression Components ──

/// Adaptive compression policy — adjusts aggressiveness based on last output length.
#[derive(Clone)]
struct AdaptivePolicy {
last_output_len: usize,
aggressiveness: f32,
concise_turns: u32,
}

impl AdaptivePolicy {
fn new() -> Self {
Self { last_output_len: 0, aggressiveness: 0.4, concise_turns: 0 }
}

fn compute_aggressiveness(last_output_len: usize) -> f32 {
match last_output_len {
0..=500 => 0.2,
501..=1500 => 0.4,
1501..=3000 => 0.6,
_ => 0.8,
}
}

fn update(&mut self, output_len: usize) {
self.last_output_len = output_len;
let new = Self::compute_aggressiveness(output_len);
// Decay aggressiveness when LLM is concise
if output_len < 500 {
self.concise_turns += 1;
if self.concise_turns >= 2 {
self.aggressiveness = self.aggressiveness.max(0.1) - 0.1;
}
} else {
self.concise_turns = 0;
self.aggressiveness = new;
}
self.aggressiveness = self.aggressiveness.clamp(0.1, 0.9);
}
}

/// Per-auth-key state (policy + dedup cache).
struct PerKeyState {
policy: AdaptivePolicy,
dedup_cache: HashMap<u64, (String, Instant)>, // hash -> (file_path, last_seen)
}

impl PerKeyState {
fn new() -> Self {
Self { policy: AdaptivePolicy::new(), dedup_cache: HashMap::new() }
}

fn check_dedup(&mut self, content: &str, path: &str) -> Option<String> {
let mut h = std::collections::hash_map::DefaultHasher::new();
content.hash(&mut h);
let hash = h.finish();

// Evict entries older than 5 minutes
let now = Instant::now();
self.dedup_cache.retain(|_, (_, t)| now.duration_since(*t).as_secs() < 300);

if let Some((existing_path, _)) = self.dedup_cache.get(&hash) {
return Some(format!("[already seen: {} — {} chars unchanged]", existing_path, content.len()));
}

if self.dedup_cache.len() < 50 {
self.dedup_cache.insert(hash, (path.to_string(), now));
}
None
}
}

/// Global per-auth-key state store
static PER_KEY_STATE: LazyLock<Mutex<HashMap<String, PerKeyState>>> =
LazyLock::new(|| Mutex::new(HashMap::new()));

fn get_or_create_state(auth_key: &str) -> std::sync::MutexGuard<'static, HashMap<String, PerKeyState>> {
let mut guard = PER_KEY_STATE.lock().unwrap_or_else(|e| e.into_inner());
guard.entry(auth_key.to_string()).or_insert_with(PerKeyState::new);
guard
}

/// Compress old assistant reasoning — strip verbose explanations, keep structural intent.
fn compress_assistant_text(text: &str, dict: Option<&reliary_compress::CompressionDict>) -> Option<String> {
reliary_compress::compress_reasoning(text, dict)
}

/// Truncate old tool results — keep first 200 + last 50 chars.
fn truncate_tool_result(content: &str) -> String {
if content.len() <= 250 { return content.to_string(); }
let prefix = &content[..200];
let suffix = &content[content.len().saturating_sub(50)..];
format!("{} …[truncated {} chars]… {}", prefix, content.len() - 250, suffix)
}

/// Compress all messages in the conversation history.
fn compress_messages(messages: &mut Vec<Value>, state: &mut PerKeyState) -> (usize, usize) {
let total = messages.len();
let mut history_saved: usize = 0;
for i in (0..total).rev() {
let age = total - i; // 1 = most recent
let role = messages[i].get("role").and_then(|r| r.as_str()).unwrap_or("");

match role {
"assistant" if age > 2 && state.policy.aggressiveness >= 0.3 => {
// Compress old assistant reasoning
if let Some(content) = messages[i].get("content").and_then(|c| c.as_str()) {
if let Some(compressed) = compress_assistant_text(content, None) {
let saved = content.len().saturating_sub(compressed.len());
if saved > 10 {
history_saved += saved;
messages[i]["content"] = Value::String(compressed);
}
}
}
}
"tool" | "toolResult" if age > 4 => {
// Truncate old tool results
if let Some(content) = messages[i].get("content").and_then(|c| c.as_str()) {
let truncated = truncate_tool_result(content);
if truncated.len() < content.len() {
let saved = content.len().saturating_sub(truncated.len());
history_saved += saved;
messages[i]["content"] = Value::String(truncated);
}
}
}
"tool" | "toolResult" if age > 2 => {
// Dedup repeated file reads
if let Some(content) = messages[i].get("content").and_then(|c| c.as_str()) {
// Extract file path from content (heuristic: first line that looks like a path)
let path = content.lines().find(|l| l.contains(".rs") || l.contains(".py") || l.contains(".js") || l.contains(".ts"))
.unwrap_or("file");
if let Some(deduped) = state.check_dedup(content, path) {
let saved = content.len().saturating_sub(deduped.len());
history_saved += saved;
messages[i]["content"] = Value::String(deduped);
}
}
}
_ => {}
}
}

(history_saved, (state.policy.aggressiveness * 100.0) as usize)
}

// ── Health / Ping ──

async fn health() -> impl IntoResponse {
Expand Down Expand Up @@ -170,6 +318,17 @@ async fn proxy_post(
}
}

// History compression: compress old assistant reasoning + truncate old tool results
let (history_saved, aggressiveness) = {
let mut guard = get_or_create_state(&auth_key);
let state = guard.get_mut(&auth_key).unwrap();
if let Some(messages) = payload.get_mut("messages").and_then(|m| m.as_array_mut()) {
compress_messages(messages, state)
} else {
(0, 0)
}
};

// Response cache (non-streaming only)
if !is_streaming {
if let Some(messages) = payload.get("messages") {
Expand Down Expand Up @@ -220,6 +379,8 @@ async fn proxy_post(
let token_hdr_input = input_tokens.to_string();
let token_hdr_compressed = compressed_tokens.to_string();
let token_hdr_savings = token_savings.to_string();
let hdr_history_saved = history_saved.to_string();
let hdr_aggr = aggressiveness.to_string();

match req_builder.send().await {
Ok(upstream_resp) => {
Expand All @@ -238,16 +399,32 @@ async fn proxy_post(
resp.headers_mut().insert("x-reliaty-input-tokens", header::HeaderValue::from_str(&token_hdr_input).unwrap());
resp.headers_mut().insert("x-reliaty-compressed-tokens", header::HeaderValue::from_str(&token_hdr_compressed).unwrap());
resp.headers_mut().insert("x-reliaty-savings-pct", header::HeaderValue::from_str(&token_hdr_savings).unwrap());
resp.headers_mut().insert("x-reliaty-history-saved", header::HeaderValue::from_str(&hdr_history_saved).unwrap());
resp.headers_mut().insert("x-reliaty-aggressiveness", header::HeaderValue::from_str(&hdr_aggr).unwrap());
// Update adaptive policy
if let Ok(mut guard) = PER_KEY_STATE.lock() {
if let Some(st) = guard.get_mut(&auth_key) {
st.policy.update(body_bytes.len());
}
}
resp.into_response()
} else {
match upstream_resp.bytes().await {
Ok(bytes) => {
let body_str = String::from_utf8_lossy(&bytes).to_string();
store_response(&auth_key, &String::from_utf8_lossy(&body_bytes), &body_str);
// Update adaptive policy with output length
if let Ok(mut guard) = PER_KEY_STATE.lock() {
if let Some(st) = guard.get_mut(&auth_key) {
st.policy.update(body_str.len());
}
}
let mut resp = (StatusCode::OK, [("content-type", "application/json")], body_str).into_response();
resp.headers_mut().insert("x-reliaty-input-tokens", header::HeaderValue::from_str(&token_hdr_input).unwrap());
resp.headers_mut().insert("x-reliaty-compressed-tokens", header::HeaderValue::from_str(&token_hdr_compressed).unwrap());
resp.headers_mut().insert("x-reliaty-savings-pct", header::HeaderValue::from_str(&token_hdr_savings).unwrap());
resp.headers_mut().insert("x-reliaty-history-saved", header::HeaderValue::from_str(&hdr_history_saved).unwrap());
resp.headers_mut().insert("x-reliaty-aggressiveness", header::HeaderValue::from_str(&hdr_aggr).unwrap());
resp
}
Err(_) => (StatusCode::BAD_GATEWAY, "empty upstream response").into_response(),
Expand Down
Loading