diff --git a/scripts/paperclip-hermes-bridge.py b/scripts/paperclip-hermes-bridge.py new file mode 100755 index 00000000000..e98744ee1a0 --- /dev/null +++ b/scripts/paperclip-hermes-bridge.py @@ -0,0 +1,641 @@ +#!/usr/bin/env python3 +""" +Hermes <-> Paperclip Seamless Bridge +Usage: python3 paperclip-hermes-bridge.py [args] + +Commands: + list-agents List all agents and their status + agent-status Get detailed status of a specific agent + assign Create and assign an issue to an agent + issues [--status STATUS] List issues (optionally filter by status) + issue-status Get detailed issue status + update-issue Update issue status (todo, in_progress, done, etc.) + message Send a message/comment to an agent's current issue + poll Poll an issue until it's done (blocks) + run Full cycle: create issue, assign, poll until done, return result + broadcast Create an unassigned issue for the whole team + team-status Quick health check of all agents + preview Detect project type in workspace and start dev server, return URL + serve Start a dev server for a local project and return the URL + share Generate a shareable preview URL for an issue's workspace +""" +import sys, json, urllib.request, urllib.error, time, argparse, os, subprocess, socket, re + +BASE = "http://localhost:3100/api" +COMPANY = "39909fad-4bf8-4302-8093-d3bf8235a881" + +# Project type detection patterns +PROJECT_TYPES = { + "vite": { + "files": ["vite.config.ts", "vite.config.js", "vite.config.mjs"], + "commands": ["npm run dev", "pnpm dev", "yarn dev"], + "default_port": 5173, + "url_path": "/", + }, + "nextjs": { + "files": ["next.config.ts", "next.config.js", "next.config.mjs"], + "commands": ["npm run dev", "pnpm dev", "yarn dev"], + "default_port": 3000, + "url_path": "/", + }, + "tauri": { + "files": ["src-tauri/tauri.conf.json", "tauri.conf.json"], + "commands": ["npm run tauri dev", "pnpm tauri dev", "yarn tauri dev"], + "default_port": 1420, + "url_path": "/", + }, + "react-scripts": { + "files": [], # Detected by package.json scripts + "commands": ["npm start", "pnpm start", "yarn start"], + "default_port": 3000, + "url_path": "/", + }, + "static": { + "files": ["index.html"], + "commands": ["python3 -m http.server", "npx serve .", "python -m http.server"], + "default_port": 8000, + "url_path": "/", + }, +} + +def api(method, path, data=None): + url = f"{BASE}{path}" + req = urllib.request.Request(url, method=method) + if data: + req.add_header("Content-Type", "application/json") + req.data = json.dumps(data).encode() + try: + with urllib.request.urlopen(req, timeout=30) as r: + return json.loads(r.read().decode()) + except urllib.error.HTTPError as e: + return {"error": e.read().decode(), "status": e.code} + except Exception as e: + return {"error": str(e)} + +def get_agents(): + result = api("GET", f"/companies/{COMPANY}/agents") + if isinstance(result, list): + return result + if isinstance(result, dict) and "error" in result: + print(f"Error fetching agents: {result['error']}") + return [] + return result if isinstance(result, list) else [] + +def get_agent_by_name(name): + agents = get_agents() + name_lower = name.lower() + for a in agents: + if isinstance(a, dict) and (a.get('name', '').lower() == name_lower or a.get('role', '').lower() == name_lower): + return a + # Fuzzy match + for a in agents: + if isinstance(a, dict) and (name_lower in a.get('name', '').lower() or name_lower in a.get('role', '').lower()): + return a + return None + +def list_agents(): + agents = get_agents() + print(f"{'Agent Name':<30} {'Role':<15} {'Status':<10} {'Adapter':<15} {'Last Heartbeat'}") + print("-" * 90) + for a in agents: + if not isinstance(a, dict): + continue + hb = a.get('lastHeartbeatAt', 'never')[:19] if a.get('lastHeartbeatAt') else 'never' + print(f"{a.get('name','?'):<30} {a.get('role','?'):<15} {a.get('status','?'):<10} {a.get('adapterType','?'):<15} {hb}") + +def agent_status(name): + agent = get_agent_by_name(name) + if not agent: + print(f"Agent '{name}' not found") + return + print(json.dumps(agent, indent=2)) + +def assign(agent_name, task, title=None): + agent = get_agent_by_name(agent_name) + if not agent: + print(f"Agent '{agent_name}' not found. Available agents:") + list_agents() + return None + issue_title = title or task[:80] + issue = api("POST", f"/companies/{COMPANY}/issues", { + "title": issue_title, + "description": task, + "status": "todo", + "assigneeAgentId": agent['id'] + }) + if 'error' in issue: + print(f"Error creating issue: {issue['error']}") + return None + print(f"CREATED | Issue: {issue['id']} | Assigned to: {agent['name']} | Status: {issue['status']}") + return issue + +def list_issues(status=None): + path = f"/companies/{COMPANY}/issues" + if status: + path += f"?status={status}" + items = api("GET", path) + if not isinstance(items, list): + print(f"Error: {items}") + return + print(f"{'Issue ID':<36} {'Status':<12} {'Assignee':<20} {'Title'}") + print("-" * 110) + for i in items: + if not isinstance(i, dict): + continue + assignee = i.get('assigneeAgentId', 'unassigned')[:18] if i.get('assigneeAgentId') else 'unassigned' + print(f"{i.get('id','?'):<36} {i.get('status','?'):<12} {assignee:<20} {i.get('title','')[:50]}") + +def issue_status(issue_id): + # Try /api/issues/{id} first (Paperclip shortcut route) + issue = api("GET", f"/issues/{issue_id}") + if 'error' in issue: + # Fallback to company-scoped route + issue = api("GET", f"/companies/{COMPANY}/issues/{issue_id}") + if 'error' in issue: + print(f"Error: {issue['error']}") + return + print(json.dumps(issue, indent=2)) + +def update_issue(issue_id, status): + valid = ['backlog', 'todo', 'in_progress', 'in_review', 'done', 'blocked', 'cancelled'] + if status not in valid: + print(f"Invalid status. Use: {', '.join(valid)}") + return + result = api("PUT", f"/issues/{issue_id}", {"status": status}) + if 'error' in result: + print(f"Error: {result['error']}") + else: + print(f"Updated {issue_id} -> {status}") + +def message_agent(agent_name, msg): + agent = get_agent_by_name(agent_name) + if not agent: + print(f"Agent '{agent_name}' not found") + return + # Add comment to agent's most recent issue + issues = api("GET", f"/companies/{COMPANY}/issues") + if not isinstance(issues, list): + print(f"Error fetching issues: {issues}") + return + agent_issues = [i for i in issues if isinstance(i, dict) and i.get('assigneeAgentId') == agent['id']] + if not agent_issues: + print(f"No active issues for {agent_name}. Creating one...") + issue = assign(agent_name, msg) + return + latest = sorted(agent_issues, key=lambda x: x.get('updatedAt', ''), reverse=True)[0] + comment = api("POST", f"/companies/{COMPANY}/issues/{latest['id']}/comments", { + "content": f"[Hermes] {msg}" + }) + print(f"Messaged {agent_name} on issue {latest['id']}") + +def poll_issue(issue_id, interval=10, timeout=3600): + start = time.time() + print(f"Polling issue {issue_id}...") + while time.time() - start < timeout: + issue = api("GET", f"/issues/{issue_id}") + if 'error' in issue: + issue = api("GET", f"/companies/{COMPANY}/issues/{issue_id}") + if 'error' in issue: + print(f"Error: {issue['error']}") + return None + status = issue.get('status', 'unknown') + print(f" [{time.strftime('%H:%M:%S')}] Status: {status}") + if status in ['done', 'cancelled', 'blocked']: + print(f"\nFinal result:\n{issue.get('description', 'No description')}") + return issue + time.sleep(interval) + print("Timeout reached") + return None + +def run_task(agent_name, task, title=None): + issue = assign(agent_name, task, title) + if not issue: + return + return poll_issue(issue['id']) + +def broadcast(task): + issue = api("POST", f"/companies/{COMPANY}/issues", { + "title": task[:80], + "description": task, + "status": "todo" + }) + if 'error' in issue: + print(f"Error: {issue['error']}") + else: + print(f"Broadcast issue {issue['id']} created. Team can self-assign.") + +def team_status(): + agents = get_agents() + issues = api("GET", f"/companies/{COMPANY}/issues") + if not isinstance(issues, list): + print(f"Error fetching issues: {issues}") + return + active_issues = [i for i in issues if isinstance(i, dict) and i.get('status') not in ['done', 'cancelled']] + print(f"Team: OpenScanAi") + print(f"Agents: {len(agents)} | Active Issues: {len(active_issues)}") + print(f"\nAgent Health:") + for a in agents: + if not isinstance(a, dict): + continue + status_icon = "āœ…" if a.get('status') == 'idle' else "🟔" if a.get('status') == 'active' else "šŸ”“" + agent_issues = [i for i in active_issues if isinstance(i, dict) and i.get('assigneeAgentId') == a.get('id')] + print(f" {status_icon} {a.get('name','?'):<25} {a.get('status','?'):<10} ({len(agent_issues)} active issues)") + +# --------------------------------------------------------------------------- +# Preview / Dev Server functionality +# --------------------------------------------------------------------------- + +def find_free_port(start_port=8000, max_port=9000): + """Find a free port starting from start_port.""" + for port in range(start_port, max_port + 1): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + try: + s.bind(('127.0.0.1', port)) + return port + except OSError: + continue + return None + +def detect_project_type(project_path): + """Detect the type of project based on files present.""" + if not os.path.isdir(project_path): + return None + + # Check for package.json first + package_json_path = os.path.join(project_path, "package.json") + has_package_json = os.path.exists(package_json_path) + + # Check each project type + for proj_type, config in PROJECT_TYPES.items(): + # Check specific config files + for file in config["files"]: + if os.path.exists(os.path.join(project_path, file)): + return proj_type + + # For react-scripts, check package.json scripts + if proj_type == "react-scripts" and has_package_json: + try: + with open(package_json_path, 'r') as f: + pkg = json.load(f) + scripts = pkg.get("scripts", {}) + deps = {**pkg.get("dependencies", {}), **pkg.get("devDependencies", {})} + if "react-scripts" in deps or "start" in scripts: + # Distinguish from Next.js/Vite which also have react + if "next" not in deps and "vite" not in deps: + return "react-scripts" + except: + pass + + # Check for static HTML + if os.path.exists(os.path.join(project_path, "index.html")): + return "static" + + # Check for package.json with dev script as fallback + if has_package_json: + try: + with open(package_json_path, 'r') as f: + pkg = json.load(f) + scripts = pkg.get("scripts", {}) + if "dev" in scripts: + # Try to infer from dependencies + deps = {**pkg.get("dependencies", {}), **pkg.get("devDependencies", {})} + if "vite" in deps: + return "vite" + if "next" in deps: + return "nextjs" + return "vite" # Generic fallback + if "start" in scripts: + return "react-scripts" + except: + pass + + return None + +def get_dev_command(project_type, project_path, preferred_port=None): + """Get the appropriate dev command for the project type.""" + config = PROJECT_TYPES.get(project_type) + if not config: + return None, None + + # Check which package manager is available + package_json_path = os.path.join(project_path, "package.json") + has_npm = os.path.exists(os.path.join(project_path, "package-lock.json")) + has_pnpm = os.path.exists(os.path.join(project_path, "pnpm-lock.yaml")) + has_yarn = os.path.exists(os.path.join(project_path, "yarn.lock")) + + # Determine package manager + if has_pnpm: + pm = "pnpm" + elif has_yarn: + pm = "yarn" + else: + pm = "npm" + + # Build command + if project_type == "vite": + port = preferred_port or config["default_port"] + if pm == "npm": + return f"npm run dev -- --port {port}", port + else: + return f"{pm} dev --port {port}", port + elif project_type == "nextjs": + port = preferred_port or config["default_port"] + if pm == "npm": + return f"npm run dev -- --port {port}", port + else: + return f"{pm} dev --port {port}", port + elif project_type == "tauri": + port = preferred_port or config["default_port"] + # Tauri uses its own port config, but we can try + return f"{pm} tauri dev", port + elif project_type == "react-scripts": + port = preferred_port or config["default_port"] + if pm == "npm": + return f"PORT={port} npm start", port + else: + return f"PORT={port} {pm} start", port + elif project_type == "static": + port = preferred_port or config["default_port"] + return f"python3 -m http.server {port}", port + + return None, None + +def start_dev_server(project_path, project_type=None, preferred_port=None): + """Start a dev server for the project and return the URL.""" + if not project_type: + project_type = detect_project_type(project_path) + + if not project_type: + print(f"Could not detect project type in {project_path}") + return None + + print(f"Detected project type: {project_type}") + + # Find a free port + port = preferred_port or find_free_port(PROJECT_TYPES[project_type]["default_port"]) + if not port: + print("Could not find a free port") + return None + + # Check if port is already in use + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + if s.connect_ex(('127.0.0.1', port)) == 0: + print(f"Port {port} is already in use. Trying to find another...") + port = find_free_port(port + 1) + if not port: + print("Could not find a free port") + return None + + command, actual_port = get_dev_command(project_type, project_path, port) + if not command: + print(f"Could not determine dev command for {project_type}") + return None + + print(f"Starting dev server: {command}") + print(f"Working directory: {project_path}") + + # Start the server in background + try: + process = subprocess.Popen( + command, + shell=True, + cwd=project_path, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True + ) + + # Wait a bit for server to start + time.sleep(3) + + # Check if process is still running + if process.poll() is not None: + stdout, stderr = process.communicate() + print(f"Server failed to start:") + if stderr: + print(stderr) + if stdout: + print(stdout) + return None + + # Try to connect to verify it's running + max_retries = 10 + for i in range(max_retries): + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.settimeout(2) + s.connect(('127.0.0.1', actual_port)) + url = f"http://localhost:{actual_port}" + print(f"āœ… Server is running at {url}") + return { + "url": url, + "port": actual_port, + "process": process, + "project_type": project_type, + "project_path": project_path + } + except: + time.sleep(1) + + print("Server started but could not verify it's responding") + url = f"http://localhost:{actual_port}" + return { + "url": url, + "port": actual_port, + "process": process, + "project_type": project_type, + "project_path": project_path + } + + except Exception as e: + print(f"Error starting server: {e}") + return None + +def preview_issue(issue_id): + """Get the workspace for an issue, detect project type, and start dev server.""" + # Get issue details + issue = api("GET", f"/issues/{issue_id}") + if 'error' in issue: + issue = api("GET", f"/companies/{COMPANY}/issues/{issue_id}") + if 'error' in issue: + print(f"Error fetching issue: {issue['error']}") + return None + + # Get execution workspace + workspace_id = issue.get('executionWorkspaceId') + if not workspace_id: + print(f"Issue {issue_id} has no execution workspace") + return None + + workspace = api("GET", f"/execution-workspaces/{workspace_id}") + if 'error' in workspace: + print(f"Error fetching workspace: {workspace['error']}") + return None + + cwd = workspace.get('cwd') + if not cwd or not os.path.isdir(cwd): + print(f"Workspace CWD not found: {cwd}") + return None + + print(f"Workspace path: {cwd}") + + # Detect and start + result = start_dev_server(cwd) + if result: + # Add a comment to the issue with the preview URL + comment_text = f"šŸš€ **Preview URL**: {result['url']}\n\nProject type: {result['project_type']}\nStarted at: {time.strftime('%Y-%m-%d %H:%M:%S')}" + comment = api("POST", f"/companies/{COMPANY}/issues/{issue_id}/comments", { + "content": comment_text + }) + if 'error' not in comment: + print(f"Added preview URL to issue {issue_id}") + + return result + +def serve_project(project_path): + """Start a dev server for a local project path.""" + if not os.path.isdir(project_path): + print(f"Path not found: {project_path}") + return None + + result = start_dev_server(project_path) + if result: + print(f"\nPreview URL: {result['url']}") + print(f"Project: {result['project_path']}") + print(f"Type: {result['project_type']}") + print(f"PID: {result['process'].pid}") + print("\nPress Ctrl+C to stop the server") + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + print("\nStopping server...") + result['process'].terminate() + result['process'].wait(timeout=5) + print("Server stopped") + return result + +def generate_shareable_url(issue_id): + """Generate a shareable preview URL for an issue's workspace.""" + # Get issue details + issue = api("GET", f"/issues/{issue_id}") + if 'error' in issue: + issue = api("GET", f"/companies/{COMPANY}/issues/{issue_id}") + if 'error' in issue: + print(f"Error fetching issue: {issue['error']}") + return None + + # Get execution workspace + workspace_id = issue.get('executionWorkspaceId') + if not workspace_id: + print(f"Issue {issue_id} has no execution workspace") + return None + + workspace = api("GET", f"/execution-workspaces/{workspace_id}") + if 'error' in workspace: + print(f"Error fetching workspace: {workspace['error']}") + return None + + cwd = workspace.get('cwd') + if not cwd or not os.path.isdir(cwd): + print(f"Workspace CWD not found: {cwd}") + return None + + # Detect project type + project_type = detect_project_type(cwd) + if not project_type: + print(f"Could not detect project type in {cwd}") + return None + + # Find a free port + port = find_free_port(PROJECT_TYPES[project_type]["default_port"]) + if not port: + print("Could not find a free port") + return None + + # Generate URLs + local_url = f"http://localhost:{port}" + + # Try to get the public URL from environment or config + public_url = os.environ.get('PAPERCLIP_PUBLIC_URL', '') + if not public_url: + # Try to get from instance settings + settings = api("GET", "/instance-settings") + if isinstance(settings, dict): + public_url = settings.get('publicBaseUrl', '') + + # Generate shareable link + share_url = f"{public_url}/preview/{issue_id}" if public_url else local_url + + # Build result + result = { + "issue_id": issue_id, + "project_type": project_type, + "workspace_path": cwd, + "local_url": local_url, + "share_url": share_url, + "port": port, + "agent_name": issue.get('assigneeAgentId', 'unassigned'), + "issue_title": issue.get('title', 'Untitled'), + } + + # Print to chat/terminal + print("\n" + "="*60) + print("šŸ”— SHAREABLE PREVIEW LINK") + print("="*60) + print(f"Issue: {result['issue_title']}") + print(f"Agent: {result['agent_name']}") + print(f"Project: {result['project_type']}") + print(f"\nšŸ“Ž Shareable URL: {result['share_url']}") + print(f"šŸ–„ļø Local URL: {result['local_url']}") + print("="*60) + + # Also post as comment to the issue + comment_text = f"šŸ”— **Shareable Preview Link**\n\n- **URL**: {result['share_url']}\n- **Local**: {result['local_url']}\n- **Project Type**: {result['project_type']}\n\n_Generated by Hermes Bridge_" + comment = api("POST", f"/companies/{COMPANY}/issues/{issue_id}/comments", { + "content": comment_text + }) + if 'error' not in comment: + print(f"\nāœ… Posted preview link to issue {issue_id}") + + return result + +if __name__ == "__main__": + if len(sys.argv) < 2: + print(__doc__) + sys.exit(1) + + cmd = sys.argv[1] + + if cmd == "list-agents": + list_agents() + elif cmd == "agent-status" and len(sys.argv) >= 3: + agent_status(sys.argv[2]) + elif cmd == "assign" and len(sys.argv) >= 4: + assign(sys.argv[2], " ".join(sys.argv[3:])) + elif cmd == "issues": + status = sys.argv[3] if len(sys.argv) > 3 and sys.argv[2] == "--status" else None + list_issues(status) + elif cmd == "issue-status" and len(sys.argv) >= 3: + issue_status(sys.argv[2]) + elif cmd == "update-issue" and len(sys.argv) >= 4: + update_issue(sys.argv[2], sys.argv[3]) + elif cmd == "message" and len(sys.argv) >= 4: + message_agent(sys.argv[2], " ".join(sys.argv[3:])) + elif cmd == "poll" and len(sys.argv) >= 3: + poll_issue(sys.argv[2]) + elif cmd == "run" and len(sys.argv) >= 4: + run_task(sys.argv[2], " ".join(sys.argv[3:])) + elif cmd == "broadcast" and len(sys.argv) >= 3: + broadcast(" ".join(sys.argv[2:])) + elif cmd == "team-status": + team_status() + elif cmd == "preview" and len(sys.argv) >= 3: + preview_issue(sys.argv[2]) + elif cmd == "serve" and len(sys.argv) >= 3: + serve_project(sys.argv[2]) + elif cmd == "share" and len(sys.argv) >= 3: + generate_shareable_url(sys.argv[2]) + else: + print(f"Unknown command: {cmd}") + print(__doc__) diff --git a/scripts/tests/.gitignore b/scripts/tests/.gitignore new file mode 100644 index 00000000000..c18dd8d83ce --- /dev/null +++ b/scripts/tests/.gitignore @@ -0,0 +1 @@ +__pycache__/ diff --git a/scripts/tests/__pycache__/test_paperclip_hermes_bridge.cpython-314-pytest-9.0.3.pyc b/scripts/tests/__pycache__/test_paperclip_hermes_bridge.cpython-314-pytest-9.0.3.pyc new file mode 100644 index 00000000000..6c9b7a2c054 Binary files /dev/null and b/scripts/tests/__pycache__/test_paperclip_hermes_bridge.cpython-314-pytest-9.0.3.pyc differ diff --git a/scripts/tests/__pycache__/test_paperclip_hermes_bridge.cpython-314.pyc b/scripts/tests/__pycache__/test_paperclip_hermes_bridge.cpython-314.pyc new file mode 100644 index 00000000000..6ec99eaac4d Binary files /dev/null and b/scripts/tests/__pycache__/test_paperclip_hermes_bridge.cpython-314.pyc differ diff --git a/scripts/tests/test_paperclip_hermes_bridge.py b/scripts/tests/test_paperclip_hermes_bridge.py new file mode 100644 index 00000000000..3552a08d16c --- /dev/null +++ b/scripts/tests/test_paperclip_hermes_bridge.py @@ -0,0 +1,377 @@ +#!/usr/bin/env python3 +""" +Tests for paperclip-hermes-bridge.py +Run with: python3 -m pytest scripts/tests/test_paperclip_hermes_bridge.py -v +""" +import sys +import os +import json +import tempfile +import socket +from pathlib import Path +from unittest.mock import patch, MagicMock, mock_open + +# Add parent directory to path +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +# Import with proper module name +import importlib.util +spec = importlib.util.spec_from_file_location( + "paperclip_hermes_bridge", + os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "paperclip-hermes-bridge.py") +) +bridge = importlib.util.module_from_spec(spec) +spec.loader.exec_module(bridge) + + +class TestFindFreePort: + """Tests for find_free_port function.""" + + def test_finds_free_port(self): + """Should find a free port in the given range.""" + port = bridge.find_free_port(start_port=18000, max_port=18010) + assert port is not None + assert 18000 <= port <= 18010 + + def test_port_is_actually_free(self): + """Found port should be bindable.""" + port = bridge.find_free_port(start_port=18000, max_port=18010) + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(('127.0.0.1', port)) + + def test_returns_none_when_no_ports_available(self): + """Should return None when all ports in range are taken.""" + # Bind all ports in a small range + sockets = [] + try: + for p in range(19000, 19005): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.bind(('127.0.0.1', p)) + sockets.append(s) + + result = bridge.find_free_port(start_port=19000, max_port=19004) + assert result is None + finally: + for s in sockets: + s.close() + + +class TestDetectProjectType: + """Tests for detect_project_type function.""" + + def test_detects_vite(self, tmp_path): + """Should detect Vite project from vite.config.ts.""" + (tmp_path / "vite.config.ts").write_text("export default {}") + assert bridge.detect_project_type(str(tmp_path)) == "vite" + + def test_detects_vite_js(self, tmp_path): + """Should detect Vite project from vite.config.js.""" + (tmp_path / "vite.config.js").write_text("export default {}") + assert bridge.detect_project_type(str(tmp_path)) == "vite" + + def test_detects_nextjs(self, tmp_path): + """Should detect Next.js project from next.config.js.""" + (tmp_path / "next.config.js").write_text("module.exports = {}") + assert bridge.detect_project_type(str(tmp_path)) == "nextjs" + + def test_detects_tauri(self, tmp_path): + """Should detect Tauri project from tauri.conf.json.""" + (tmp_path / "src-tauri").mkdir() + (tmp_path / "src-tauri" / "tauri.conf.json").write_text("{}") + assert bridge.detect_project_type(str(tmp_path)) == "tauri" + + def test_detects_static_html(self, tmp_path): + """Should detect static project from index.html.""" + (tmp_path / "index.html").write_text("") + assert bridge.detect_project_type(str(tmp_path)) == "static" + + def test_detects_react_scripts(self, tmp_path): + """Should detect Create React App from package.json with react-scripts.""" + pkg = { + "dependencies": {"react-scripts": "5.0.1"}, + "scripts": {"start": "react-scripts start"} + } + (tmp_path / "package.json").write_text(json.dumps(pkg)) + assert bridge.detect_project_type(str(tmp_path)) == "react-scripts" + + def test_detects_vite_from_deps(self, tmp_path): + """Should detect Vite from package.json devDependencies.""" + pkg = { + "devDependencies": {"vite": "^4.0.0"}, + "scripts": {"dev": "vite"} + } + (tmp_path / "package.json").write_text(json.dumps(pkg)) + assert bridge.detect_project_type(str(tmp_path)) == "vite" + + def test_detects_nextjs_from_deps(self, tmp_path): + """Should detect Next.js from package.json dependencies.""" + pkg = { + "dependencies": {"next": "^13.0.0"}, + "scripts": {"dev": "next dev"} + } + (tmp_path / "package.json").write_text(json.dumps(pkg)) + assert bridge.detect_project_type(str(tmp_path)) == "nextjs" + + def test_returns_none_for_unknown(self, tmp_path): + """Should return None for unknown project types.""" + (tmp_path / "random.txt").write_text("hello") + assert bridge.detect_project_type(str(tmp_path)) is None + + def test_returns_none_for_nonexistent_path(self): + """Should return None for non-existent path.""" + assert bridge.detect_project_type("/nonexistent/path/12345") is None + + +class TestGetDevCommand: + """Tests for get_dev_command function.""" + + def test_vite_with_npm(self, tmp_path): + """Should generate correct Vite command with npm.""" + (tmp_path / "package-lock.json").write_text("{}") + cmd, port = bridge.get_dev_command("vite", str(tmp_path)) + assert "npm run dev" in cmd + assert "--port" in cmd + assert port == 5173 + + def test_vite_with_pnpm(self, tmp_path): + """Should generate correct Vite command with pnpm.""" + (tmp_path / "pnpm-lock.yaml").write_text("") + cmd, port = bridge.get_dev_command("vite", str(tmp_path)) + assert "pnpm dev" in cmd + assert port == 5173 + + def test_nextjs_with_yarn(self, tmp_path): + """Should generate correct Next.js command with yarn.""" + (tmp_path / "yarn.lock").write_text("") + cmd, port = bridge.get_dev_command("nextjs", str(tmp_path)) + assert "yarn dev" in cmd + assert port == 3000 + + def test_react_scripts_port_override(self, tmp_path): + """Should use custom port for React scripts.""" + (tmp_path / "package-lock.json").write_text("{}") + cmd, port = bridge.get_dev_command("react-scripts", str(tmp_path), preferred_port=4000) + assert "PORT=4000" in cmd + assert port == 4000 + + def test_static_server(self, tmp_path): + """Should generate Python http.server command for static.""" + cmd, port = bridge.get_dev_command("static", str(tmp_path)) + assert "python3 -m http.server" in cmd + assert port == 8000 + + def test_tauri_command(self, tmp_path): + """Should generate Tauri dev command.""" + (tmp_path / "package-lock.json").write_text("{}") + cmd, port = bridge.get_dev_command("tauri", str(tmp_path)) + assert "tauri dev" in cmd + + +class TestApi: + """Tests for api function.""" + + @patch('urllib.request.urlopen') + def test_get_request(self, mock_urlopen): + """Should make GET request and parse JSON response.""" + mock_response = MagicMock() + mock_response.read.return_value = b'{"id": "123", "name": "test"}' + mock_urlopen.return_value.__enter__.return_value = mock_response + + result = bridge.api("GET", "/test") + assert result == {"id": "123", "name": "test"} + + @patch('urllib.request.urlopen') + def test_post_request(self, mock_urlopen): + """Should make POST request with JSON body.""" + mock_response = MagicMock() + mock_response.read.return_value = b'{"success": true}' + mock_urlopen.return_value.__enter__.return_value = mock_response + + result = bridge.api("POST", "/test", {"key": "value"}) + assert result == {"success": True} + + @patch('urllib.request.urlopen') + def test_http_error(self, mock_urlopen): + """Should handle HTTP errors gracefully.""" + from urllib.error import HTTPError + mock_urlopen.side_effect = HTTPError( + "http://test", 404, "Not Found", {}, None + ) + + result = bridge.api("GET", "/test") + assert "error" in result + assert result["status"] == 404 + + +class TestGetAgents: + """Tests for get_agents function.""" + + @patch.object(bridge, 'api') + def test_returns_list(self, mock_api): + """Should return list of agents.""" + mock_api.return_value = [ + {"id": "1", "name": "Agent1", "status": "idle"}, + {"id": "2", "name": "Agent2", "status": "active"} + ] + + result = bridge.get_agents() + assert len(result) == 2 + assert result[0]["name"] == "Agent1" + + @patch.object(bridge, 'api') + def test_handles_error(self, mock_api): + """Should return empty list on error.""" + mock_api.return_value = {"error": "Connection failed"} + + result = bridge.get_agents() + assert result == [] + + @patch.object(bridge, 'api') + def test_handles_non_list(self, mock_api): + """Should return empty list for non-list response.""" + mock_api.return_value = {"count": 5} + + result = bridge.get_agents() + assert result == [] + + +class TestGetAgentByName: + """Tests for get_agent_by_name function.""" + + @patch.object(bridge, 'get_agents') + def test_exact_match(self, mock_get_agents): + """Should find agent by exact name match.""" + mock_get_agents.return_value = [ + {"id": "1", "name": "Frontend Developer", "role": "dev"}, + {"id": "2", "name": "Backend Architect", "role": "arch"} + ] + + result = bridge.get_agent_by_name("Frontend Developer") + assert result["id"] == "1" + + @patch.object(bridge, 'get_agents') + def test_fuzzy_match(self, mock_get_agents): + """Should find agent by fuzzy match.""" + mock_get_agents.return_value = [ + {"id": "1", "name": "Frontend Developer", "role": "dev"} + ] + + result = bridge.get_agent_by_name("frontend") + assert result["id"] == "1" + + @patch.object(bridge, 'get_agents') + def test_no_match(self, mock_get_agents): + """Should return None when no match found.""" + mock_get_agents.return_value = [ + {"id": "1", "name": "Agent1", "role": "dev"} + ] + + result = bridge.get_agent_by_name("nonexistent") + assert result is None + + +class TestAssign: + """Tests for assign function.""" + + @patch.object(bridge, 'get_agent_by_name') + @patch.object(bridge, 'api') + def test_creates_issue(self, mock_api, mock_get_agent): + """Should create issue and assign to agent.""" + mock_get_agent.return_value = {"id": "agent1", "name": "Test Agent"} + mock_api.return_value = {"id": "issue1", "status": "todo"} + + result = bridge.assign("Test Agent", "Build landing page") + assert result["id"] == "issue1" + + @patch.object(bridge, 'get_agent_by_name') + def test_agent_not_found(self, mock_get_agent): + """Should return None when agent not found.""" + mock_get_agent.return_value = None + + result = bridge.assign("Nonexistent", "Task") + assert result is None + + +class TestGenerateShareableUrl: + """Tests for generate_shareable_url function.""" + + @patch.object(bridge, 'api') + @patch.object(bridge, 'detect_project_type') + @patch.object(bridge, 'find_free_port') + def test_generates_urls(self, mock_find_port, mock_detect, mock_api, tmp_path): + """Should generate both local and shareable URLs.""" + test_cwd = str(tmp_path) + mock_api.side_effect = [ + {"id": "issue1", "executionWorkspaceId": "ws1", "title": "Test Issue", "assigneeAgentId": "agent1"}, + {"cwd": test_cwd, "id": "ws1"}, + {"id": "comment1"} + ] + mock_detect.return_value = "vite" + mock_find_port.return_value = 5173 + + with patch.dict(os.environ, {"PAPERCLIP_PUBLIC_URL": "https://test.com"}): + result = bridge.generate_shareable_url("issue1") + + assert result is not None + assert result["local_url"] == "http://localhost:5173" + assert result["share_url"] == "https://test.com/preview/issue1" + assert result["project_type"] == "vite" + + @patch.object(bridge, 'api') + def test_no_workspace(self, mock_api): + """Should return None when issue has no workspace.""" + mock_api.return_value = {"id": "issue1", "executionWorkspaceId": None} + + result = bridge.generate_shareable_url("issue1") + assert result is None + + @patch.object(bridge, 'api') + def test_issue_not_found(self, mock_api): + """Should return None when issue not found.""" + mock_api.return_value = {"error": "Not found"} + + result = bridge.generate_shareable_url("nonexistent") + assert result is None + + +class TestProjectTypeConfig: + """Tests for PROJECT_TYPES configuration.""" + + def test_all_types_have_required_keys(self): + """All project types should have required configuration keys.""" + required_keys = ["files", "commands", "default_port", "url_path"] + for proj_type, config in bridge.PROJECT_TYPES.items(): + for key in required_keys: + assert key in config, f"{proj_type} missing {key}" + + def test_ports_are_valid(self): + """All default ports should be in valid range.""" + for proj_type, config in bridge.PROJECT_TYPES.items(): + port = config["default_port"] + assert 1 <= port <= 65535, f"{proj_type} has invalid port {port}" + + +class TestIntegration: + """Integration-style tests.""" + + def test_full_workflow_mocked(self, tmp_path): + """Test complete workflow with mocked API.""" + # Create a fake Vite project + (tmp_path / "vite.config.ts").write_text("export default {}") + (tmp_path / "package.json").write_text(json.dumps({ + "scripts": {"dev": "vite"} + })) + + # Detect project type + proj_type = bridge.detect_project_type(str(tmp_path)) + assert proj_type == "vite" + + # Get dev command + cmd, port = bridge.get_dev_command(proj_type, str(tmp_path)) + assert cmd is not None + assert port == 5173 + + +if __name__ == "__main__": + import pytest + pytest.main([__file__, "-v"])