diff --git a/eyetrackvr_backend/calibration.py b/eyetrackvr_backend/calibration.py
new file mode 100644
index 0000000..428a318
--- /dev/null
+++ b/eyetrackvr_backend/calibration.py
@@ -0,0 +1,192 @@
+import numpy as np
+import matplotlib.pyplot as plt
+
+
+class CalibrationEllipse:
+    def __init__(self, n_std_devs=2.5):
+        self.xs = []
+        self.ys = []
+        self.n_std_devs = float(n_std_devs)
+        self.fitted = False
+
+        self.scale_factor = 0.80
+
+        self.flip_y = False
+        self.flip_x = True
+
+        # Parameters
+        self.center = None
+        self.axes = None
+        self.evecs = None
+
+    def add_sample(self, x, y):
+        self.xs.append(float(x))
+        self.ys.append(float(y))
+        self.fitted = False
+
+    def set_inset_percent(self, percent_smaller=0.0):
+        clamped_percent = np.clip(percent_smaller, 0.0, 100.0)
+        self.scale_factor = 1.0 - (clamped_percent / 100.0)
+
+    def init_from_save(self, evecs, axes):
+        """
+        Initialize from save.
+        NOTE: We ignore the saved 'evecs' rotation to ensure strict axis alignment.
+        """
+        try:
+            axes_array = np.asarray(axes, dtype=float)
+
+            if axes_array.shape != (2,):
+                print(f"[ERROR] Invalid axes shape: {axes_array.shape}.")
+                return False
+
+            if np.all(axes_array == 0) or np.any(np.isnan(axes_array)):
+                print("[ERROR] Saved data contains zero or NaN values.")
+                return False
+
+            # Force Identity Matrix (No Rotation)
+            self.evecs = np.eye(2)
+            self.axes = axes_array
+
+            self.fitted = True
+            return True
+
+        except (ValueError, TypeError) as e:
+            print(f"[ERROR] Failed to load calibration data: {e}")
+            self.fitted = False
+            return False
+
+    def fit_ellipse(self):
+        """
+        Fits an axis-aligned ellipse (no rotation) using standard deviation.
+        """
+        N = len(self.xs)
+        if N < 2:
+            print("Warning: Need >= 2 samples to fit.")
+            self.fitted = False
+            return 0, 0
+
+        # 1. Calculate Center (Mean)
+        mean_x = np.mean(self.xs)
+        mean_y = np.mean(self.ys)
+        self.center = np.array([mean_x, mean_y])
+
+        # 2. Calculate Axis Lengths (Standard Deviation)
+        std_x = np.std(self.xs)
+        std_y = np.std(self.ys)
+
+        # Apply sigma multiplier
+        radius_x = std_x * self.n_std_devs
+        radius_y = std_y * self.n_std_devs
+
+        # Safety clamp
+        if radius_x < 1e-12:
+            radius_x = 1e-12
+        if radius_y < 1e-12:
+            radius_y = 1e-12
+
+        self.axes = np.array([radius_x, radius_y])
+
+        # 3. Force Identity Matrix (Strict Horizontal/Vertical alignment)
+        self.evecs = np.eye(2)
+
+        self.fitted = True
+        return self.evecs, self.axes
+
+    def normalize(self, pupil_pos, target_pos=None, clip=True):
+        if not self.fitted:
+            return 0.0, 0.0
+
+        x, y = float(pupil_pos[0]), float(pupil_pos[1])
+
+        if target_pos is None:
+            cx, cy = self.center
+        else:
+            cx, cy = target_pos
+
+        # Calculate deltas
+        dx = x - cx
+        dy = y - cy
+
+        # Get calibration radii
+        rx, ry = self.axes * self.scale_factor
+
+        # Normalize
+        norm_x = dx / rx
+        norm_y = dy / ry
+
+        # --- APPLY FLIPS ---
+        # If flip_x is True: Inverts the sign.
+        final_x = -norm_x if self.flip_x else norm_x
+
+        # If flip_y is False: Inverts Screen Y (so Up is Positive).
+        final_y = norm_y if self.flip_y else -norm_y
+
+        if clip:
+            final_x = np.clip(final_x, -1.0, 1.0)
+            final_y = np.clip(final_y, -1.0, 1.0)
+
+        return float(final_x), float(final_y)
+
+    def denormalize(self, norm_x, norm_y, target_pos=None):
+        if not self.fitted:
+            return 0.0, 0.0
+
+        # 1. Reverse the Output Mapping
+        nx = -norm_x if self.flip_x else norm_x
+        ny = norm_y if self.flip_y else -norm_y
+
+        # 2. Scale back up
+        rx, ry = self.axes * self.scale_factor
+        dx = nx * rx
+        dy = ny * ry
+
+        # 3. Add Center
+        if target_pos is None:
+            cx, cy = self.center
+        else:
+            cx, cy = target_pos
+
+        return float(cx + dx), float(cy + dy)
+
+    def fit_and_visualize(self):
+        plt.figure(figsize=(10, 8))
+
+        plt.plot(self.xs, self.ys, 'k.', label='Samples', alpha=0.5)
+        plt.axis('equal')
+        plt.grid(True, alpha=0.3)
+
+        # Invert plot Y axis to match screen coordinates
+        plt.gca().invert_yaxis()
+
+        if not self.fitted:
+            self.fit_ellipse()
+
+        if self.fitted:
+            scaled_axes = self.axes * self.scale_factor
+            t = np.linspace(0, 2 * np.pi, 200)
+
+            el_x = self.center[0] + scaled_axes[0] * np.cos(t)
+            el_y = self.center[1] + scaled_axes[1] * np.sin(t)
+
+            plt.plot(el_x, el_y, 'b-', linewidth=2, label='Axis-Aligned Fit')
+            plt.plot(self.center[0], self.center[1], 'r+', markersize=15, label='Center')
+
+            plt.hlines(self.center[1],
+                       self.center[0] - scaled_axes[0],
+                       self.center[0] + scaled_axes[0],
+                       colors='g', linestyles='-', label='Width (X)')
+
+            plt.vlines(self.center[0],
+                       self.center[1] - scaled_axes[1],
+                       self.center[1] + scaled_axes[1],
+                       colors='m', linestyles='-', label='Height (Y)')
+
+            plt.title(f'Axis-Aligned Calibration (FlipX={self.flip_x})')
+        else:
+            plt.title("Fit FAILED")
+
+        plt.legend()
+        plt.tight_layout()
+        plt.show()
+
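A rough usage sketch of the class above (illustrative only, not part of the patch). The sample values are invented; the behaviour follows directly from `fit_ellipse` and `normalize`:

```python
import numpy as np
from eyetrackvr_backend.calibration import CalibrationEllipse

# Invented pupil positions: wider spread on X than Y.
rng = np.random.default_rng(0)
calib = CalibrationEllipse(n_std_devs=2.5)
for x, y in zip(rng.normal(320, 40, 500), rng.normal(240, 25, 500)):
    calib.add_sample(x, y)

calib.fit_ellipse()          # center = per-axis mean, radii = n_std_devs * per-axis std
print(calib.center, calib.axes)

# A point one (scaled) radius to the right of center maps to roughly -1.0 on X,
# because flip_x is True by default; screen Y is inverted so "up" becomes positive.
x_edge = calib.center[0] + calib.axes[0] * calib.scale_factor
print(calib.normalize((x_edge, calib.center[1])))   # approx (-1.0, 0.0)
```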
diff --git a/eyetrackvr_backend/etvr.py b/eyetrackvr_backend/etvr.py
index 241fc6b..ed62592 100644
--- a/eyetrackvr_backend/etvr.py
+++ b/eyetrackvr_backend/etvr.py
@@ -5,7 +5,7 @@
 from .config import ConfigManager
 from multiprocessing import Manager
 from .logger import get_logger
-from fastapi import APIRouter
+from fastapi import APIRouter, HTTPException
 from .tracker import Tracker

 logger = get_logger()
@@ -89,6 +89,21 @@ def shutdown(self) -> None:
         self.config.stop()
         os.kill(os.getpid(), signal.SIGTERM)

+    def _get_tracker(self, uuid: str) -> Tracker:
+        for tracker in self.trackers:
+            if tracker.uuid == uuid:
+                return tracker
+        raise HTTPException(status_code=404, detail=f"No tracker found with UUID `{uuid}`")
+
+    def tracker_recenter(self, uuid: str) -> dict:
+        return self._get_tracker(uuid).recenter()
+
+    def tracker_calibrate(self, uuid: str) -> dict:
+        return self._get_tracker(uuid).calibrate()
+
+    def tracker_calibration_state(self, uuid: str) -> dict:
+        return self._get_tracker(uuid).check_state()
+
     def add_routes(self) -> None:
         logger.debug("Adding routes to ETVR")
         # region: Image streaming endpoints
@@ -159,6 +174,38 @@ def add_routes(self) -> None:
             """,
         )
         # endregion
+        # region: Tracker Calibration Endpoints
+        self.router.add_api_route(
+            name="Recenter tracker calibration",
+            path="/etvr/tracker/{uuid}/recenter",
+            endpoint=self.tracker_recenter,
+            methods=["GET"],
+            tags=["Tracker Calibration"],
+            description="""
+            Recenter the calibration for a tracker.
+            """,
+        )
+        self.router.add_api_route(
+            name="Toggle tracker calibration",
+            path="/etvr/tracker/{uuid}/calibrate",
+            endpoint=self.tracker_calibrate,
+            methods=["GET"],
+            tags=["Tracker Calibration"],
+            description="""
+            Toggle calibration mode for a tracker. Calling again stops calibration and fits the ellipse.
+            """,
+        )
+        self.router.add_api_route(
+            name="Get tracker calibration state",
+            path="/etvr/tracker/{uuid}/calibration/state",
+            endpoint=self.tracker_calibration_state,
+            methods=["GET"],
+            tags=["Tracker Calibration"],
+            description="""
+            Return the current calibration state for a tracker.
+            """,
+        )
+        # endregion
         # region: Config Endpoints
         self.router.add_api_route(
             name="Update Config",
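A sketch of how a client could drive the three new endpoints (illustrative only; the host, port, and UUID below are placeholders, not values from this patch):

```python
import time
import requests

BASE = "http://127.0.0.1:8000"   # placeholder: whatever address the backend is served on
UUID = "tracker-uuid-here"       # placeholder: a real tracker UUID from the config

# First call toggles calibration mode on.
print(requests.get(f"{BASE}/etvr/tracker/{UUID}/calibrate").json())

# Poll the calibration state while the user looks around and samples accumulate.
for _ in range(10):
    print(requests.get(f"{BASE}/etvr/tracker/{UUID}/calibration/state").json())
    time.sleep(1)

# Second call toggles calibration off, which fits the ellipse.
print(requests.get(f"{BASE}/etvr/tracker/{UUID}/calibrate").json())

# Optionally re-center on the current gaze position.
print(requests.get(f"{BASE}/etvr/tracker/{UUID}/recenter").json())
```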
+ """, + ) + # endregion # region: Config Endpoints self.router.add_api_route( name="Update Config", diff --git a/eyetrackvr_backend/processes/__init__.py b/eyetrackvr_backend/processes/__init__.py index e04011d..ae5cb06 100644 --- a/eyetrackvr_backend/processes/__init__.py +++ b/eyetrackvr_backend/processes/__init__.py @@ -1,3 +1,4 @@ from .camera import Camera from .eye_processor import EyeProcessor from .osc import VRChatOSC, VRChatOSCReceiver +from .calibration import CalibrationProcessor diff --git a/eyetrackvr_backend/processes/calibration.py b/eyetrackvr_backend/processes/calibration.py new file mode 100644 index 0000000..1e68037 --- /dev/null +++ b/eyetrackvr_backend/processes/calibration.py @@ -0,0 +1,74 @@ +from queue import Queue, Empty, Full +import numpy as np + +from ..types import EyeData, TRACKING_FAILED +from ..utils import WorkerProcess +from ..calibration import CalibrationEllipse + + +class CalibrationProcessor(WorkerProcess): + def __init__( + self, + input_queue: Queue[EyeData], + output_queue: Queue[EyeData], + state, + name: str, + uuid: str, + ): + super().__init__(name=f"Calibration {name}", uuid=uuid) + # Synced variables + self.input_queue = input_queue + self.output_queue = output_queue + self.state = state + # Unsynced variables + self.calibration = CalibrationEllipse() + self.recenter_reference: np.ndarray | None = None + self._prev_calibrating = False + + def startup(self) -> None: + pass + + def run(self) -> None: + try: + eye_data: EyeData = self.input_queue.get(block=True, timeout=0.5) + except Empty: + return + except Exception: + self.logger.exception("Failed to get eye data from queue") + return + + calibrating = bool(self.state.get("calibrating", False)) + if calibrating and not self._prev_calibrating: + self.calibration = CalibrationEllipse() + self.state["calibrated"] = False + self.state["samples"] = 0 + + if not calibrating and self._prev_calibrating: + self.calibration.fit_ellipse() + self.state["calibrated"] = bool(self.calibration.fitted) + self.state["samples"] = len(self.calibration.xs) + + self._prev_calibrating = calibrating + + if self.state.get("recenter_requested", False): + self.recenter_reference = np.array([eye_data.x, eye_data.y], dtype=float) + self.state["recenter_requested"] = False + self.state["recentered"] = True + + if calibrating and eye_data != TRACKING_FAILED: + self.calibration.add_sample(eye_data.x, eye_data.y) + self.state["samples"] = len(self.calibration.xs) + + if self.calibration.fitted: + norm_x, norm_y = self.calibration.normalize((eye_data.x, eye_data.y), target_pos=self.recenter_reference) + eye_data.x = (norm_x + 1.0) / 2.0 + eye_data.y = (norm_y + 1.0) / 2.0 + + try: + self.output_queue.put(eye_data, block=False) + except Full: + pass + + def shutdown(self) -> None: + pass + diff --git a/eyetrackvr_backend/tracker.py b/eyetrackvr_backend/tracker.py index 8288fc1..2beb658 100644 --- a/eyetrackvr_backend/tracker.py +++ b/eyetrackvr_backend/tracker.py @@ -6,7 +6,7 @@ from .config import EyeTrackConfig from .visualizer import Visualizer from multiprocessing.managers import SyncManager -from .processes import EyeProcessor, Camera, VRChatOSC +from .processes import EyeProcessor, Camera, VRChatOSC, CalibrationProcessor # TODO: when we start to integrate babble this should become a common interface that eye trackers and mouth trackers inherit from @@ -19,31 +19,51 @@ def __init__(self, config: EyeTrackConfig, uuid: str, manager: SyncManager, rout # IPC stuff self.manager = manager self.osc_queue: Queue[EyeData] = 
diff --git a/eyetrackvr_backend/tracker.py b/eyetrackvr_backend/tracker.py
index 8288fc1..2beb658 100644
--- a/eyetrackvr_backend/tracker.py
+++ b/eyetrackvr_backend/tracker.py
@@ -6,7 +6,7 @@
 from .config import EyeTrackConfig
 from .visualizer import Visualizer
 from multiprocessing.managers import SyncManager
-from .processes import EyeProcessor, Camera, VRChatOSC
+from .processes import EyeProcessor, Camera, VRChatOSC, CalibrationProcessor


 # TODO: when we start to integrate babble this should become a common interface that eye trackers and mouth trackers inherit from
@@ -19,31 +19,51 @@ def __init__(self, config: EyeTrackConfig, uuid: str, manager: SyncManager, rout
         # IPC stuff
         self.manager = manager
         self.osc_queue: Queue[EyeData] = self.manager.Queue(maxsize=60)
+        self.calibration_queue: Queue[EyeData] = self.manager.Queue(maxsize=60)
         self.image_queue: Queue[MatLike] = self.manager.Queue(maxsize=60)
         # Used purely for visualization in the frontend
         self.camera_queue: Queue[MatLike] = self.manager.Queue(maxsize=15)
         self.algo_frame_queue: Queue[MatLike] = self.manager.Queue(maxsize=15)
         # processes
-        self.processor = EyeProcessor(self.tracker_config, self.image_queue, self.osc_queue, self.algo_frame_queue)
+        self.processor = EyeProcessor(self.tracker_config, self.image_queue, self.calibration_queue, self.algo_frame_queue)
         self.camera = Camera(self.tracker_config, self.image_queue, self.camera_queue)
         self.osc_sender = VRChatOSC(self.osc_queue, self.tracker_config.name)
+        self.calibration_state = self.manager.dict(
+            {
+                "calibrating": False,
+                "calibrated": False,
+                "recenter_requested": False,
+                "recentered": False,
+                "samples": 0,
+            }
+        )
+        self.calibration = CalibrationProcessor(
+            self.calibration_queue,
+            self.osc_queue,
+            self.calibration_state,
+            self.tracker_config.name,
+            self.tracker_config.uuid,
+        )
         # Visualization
         self.camera_visualizer = Visualizer(self.camera_queue)
         self.algorithm_visualizer = Visualizer(self.algo_frame_queue)

     def start(self) -> None:
         self.osc_sender.start()
+        self.calibration.start()
         self.processor.start()
         self.camera.start()

     def stop(self) -> None:
         self.camera.stop()
         self.processor.stop()
+        self.calibration.stop()
         self.osc_sender.stop()
         self.camera_visualizer.stop()
         self.algorithm_visualizer.stop()
         # if we dont do this we memory leak :3
         clear_queue(self.osc_queue)
+        clear_queue(self.calibration_queue)
         clear_queue(self.image_queue)
         clear_queue(self.camera_queue)
         clear_queue(self.algo_frame_queue)
@@ -51,4 +71,21 @@ def stop(self) -> None:
     def restart(self) -> None:
         self.camera.restart()
         self.osc_sender.restart()
+        self.calibration.restart()
         self.processor.restart()
+
+    def recenter(self) -> dict:
+        self.calibration_state["recenter_requested"] = True
+        return self.check_state()
+
+    def calibrate(self) -> dict:
+        self.calibration_state["calibrating"] = not bool(self.calibration_state.get("calibrating", False))
+        return self.check_state()
+
+    def check_state(self) -> dict:
+        return {
+            "calibrating": bool(self.calibration_state.get("calibrating", False)),
+            "calibrated": bool(self.calibration_state.get("calibrated", False)),
+            "recentered": bool(self.calibration_state.get("recentered", False)),
+            "samples": int(self.calibration_state.get("samples", 0)),
+        }
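Finally, a small illustration of the recenter path (not part of the patch): `recenter()` sets `recenter_requested`, the processor stores the pupil position of the next frame, and that point is passed to `normalize()` as `target_pos`, shifting the effective center. Note that `init_from_save` restores only the radii, so a center has to be provided separately when using the class directly, as done here with invented values:

```python
from eyetrackvr_backend.calibration import CalibrationEllipse

calib = CalibrationEllipse()
calib.init_from_save(evecs=None, axes=[40.0, 25.0])  # radii only; the saved rotation is ignored
calib.center = [320.0, 240.0]                        # init_from_save does not set a center

print(calib.normalize((330.0, 240.0)))                              # offset from the fitted center
print(calib.normalize((330.0, 240.0), target_pos=(330.0, 240.0)))   # ~ (0.0, 0.0) after recentering
```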