diff --git a/dev-utils/rf-tracker/index.html b/dev-utils/rf-tracker/index.html new file mode 100644 index 0000000..cd905bc --- /dev/null +++ b/dev-utils/rf-tracker/index.html @@ -0,0 +1,466 @@ + + + + + + RF Tracker + + + + +

RF Tracker

+
Connecting…
+ + +
+
+
+
+
+
+
+
+
← LEFT
+
RIGHT →
+
FRONT
+
+ + +
+
+
Chain 0
+
+
+
+
Chain 1
+
+
+
+
Strength
+
+
+
+
Error
+
+
+
+ + +
+
+
Direction (← L | 0 | R →)
+
+
+
+
+ ← Strong Left + Centered + Strong Right → +
+
+
+ + +
+

Calibration

+
+ + + 0.0 +
+
+ + +
+

+ Point antenna at car, click "Set Baseline Here" to zero the offset. +

+
+ +

+ Rotate antenna until arrow points to FRONT and error is near zero.
+ Maximize signal strength while centering the arrow. +

+ + + + diff --git a/dev-utils/rf-tracker/main.py b/dev-utils/rf-tracker/main.py new file mode 100644 index 0000000..738d722 --- /dev/null +++ b/dev-utils/rf-tracker/main.py @@ -0,0 +1,55 @@ +import os +import json +from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware +from fastapi.staticfiles import StaticFiles +from fastapi.responses import FileResponse +from pathlib import Path + +from rocket_reader import RocketReader + +app = FastAPI() + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +rocket_host = os.getenv("ROCKET_HOST", "192.168.1.20") +rocket_user = os.getenv("ROCKET_USER", "wfr-daq") +rocket_pass = os.getenv("ROCKET_PASS", "westernformularacing") + +reader = RocketReader(host=rocket_host, user=rocket_user, password=rocket_pass) + +APP_DIR = Path(__file__).parent + + +@app.get("/signal") +def get_signal(): + data = reader.get_status() + if not data: + return {"error": "no data"} + + direction = reader.compute_direction(data["chain0"], data["chain1"]) + normalized = reader.normalize(direction["error"]) + + return { + "chain0": data["chain0"], + "chain1": data["chain1"], + "error": direction["error"], + "normalized": normalized, + "strength": direction["strength"], + } + + +@app.get("/") +def index(): + return FileResponse(str(APP_DIR / "index.html")) + + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=8060) diff --git a/dev-utils/rf-tracker/requirements.txt b/dev-utils/rf-tracker/requirements.txt new file mode 100644 index 0000000..bdd3e3b --- /dev/null +++ b/dev-utils/rf-tracker/requirements.txt @@ -0,0 +1,3 @@ +fastapi>=0.100.0 +uvicorn>=0.23.0 +paramiko>=3.0.0 diff --git a/dev-utils/rf-tracker/rocket_reader.py b/dev-utils/rf-tracker/rocket_reader.py new file mode 100644 index 0000000..4639de5 --- /dev/null +++ b/dev-utils/rf-tracker/rocket_reader.py @@ -0,0 +1,57 @@ +import paramiko +import re +import os + + +class RocketReader: + def __init__(self, host=None, user=None, password=None): + self.host = host or os.getenv("ROCKET_HOST", "192.168.1.20") + self.user = user or os.getenv("ROCKET_USER", "wfr-daq") + self.password = password or os.getenv("ROCKET_PASS", "westernformularacing") + + def get_status(self): + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh.connect(self.host, username=self.user, password=self.password) + + stdin, stdout, stderr = ssh.exec_command("mca-status") + output = stdout.read().decode() + ssh.close() + + return self.parse(output) + + def parse(self, raw): + match = re.search(r'"chain":\s*\[(-?\d+),\s*(-?\d+)\]', raw) + if not match: + return None + + c0 = int(match.group(1)) + c1 = int(match.group(2)) + + return { + "chain0": c0, + "chain1": c1, + } + + def compute_direction(self, c0, c1): + error = c0 - c1 + strength = (c0 + c1) / 2 + return { + "error": error, + "strength": strength, + } + + def normalize(self, error): + return max(min(error / 20.0, 1), -1) + + +if __name__ == "__main__": + reader = RocketReader() + data = reader.get_status() + if data: + direction = reader.compute_direction(data["chain0"], data["chain1"]) + print(f"chain0={data['chain0']}, chain1={data['chain1']}, " + f"error={direction['error']}, normalized={reader.normalize(direction['error']):.2f}, " + f"strength={direction['strength']:.1f}") + else: + print("Failed to parse mca-status output")