Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 192 additions & 0 deletions eyetrackvr_backend/calibration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
import numpy as np
import matplotlib.pyplot as plt


class CalibrationEllipse:
def __init__(self, n_std_devs=2.5):
self.xs = []
self.ys = []
self.n_std_devs = float(n_std_devs)
self.fitted = False

self.scale_factor = 0.80

self.flip_y = False
self.flip_x = True

# Parameters
self.center = None
self.axes = None
self.evecs = None

def add_sample(self, x, y):
self.xs.append(float(x))
self.ys.append(float(y))
self.fitted = False

def set_inset_percent(self, percent_smaller=0.0):
clamped_percent = np.clip(percent_smaller, 0.0, 100.0)
self.scale_factor = 1.0 - (clamped_percent / 100.0)

def init_from_save(self, evecs, axes):
"""
Initialize from save.
NOTE: We ignore the saved 'evecs' rotation to ensure strict axis alignment.
"""
try:
axes_array = np.asarray(axes, dtype=float)

if axes_array.shape != (2,):
print(f"[ERROR] Invalid axes shape: {axes_array.shape}.")
return False

if np.all(axes_array == 0) or np.any(np.isnan(axes_array)):
print("[ERROR] Saved data contains zero or NaN values.")
return False

# Force Identity Matrix (No Rotation)
self.evecs = np.eye(2)
self.axes = axes_array

self.fitted = True
return True

except (ValueError, TypeError) as e:
print(f"[ERROR] Failed to load calibration data: {e}")
self.fitted = False
return False

def fit_ellipse(self):
"""
Fits an axis-aligned ellipse (no rotation) using standard deviation.
"""
N = len(self.xs)
if N < 2:
print("Warning: Need >= 2 samples to fit.")
self.fitted = False
return 0, 0

# 1. Calculate Center (Mean)
mean_x = np.mean(self.xs)
mean_y = np.mean(self.ys)
self.center = np.array([mean_x, mean_y])

# 2. Calculate Axis Lengths (Standard Deviation)
std_x = np.std(self.xs)
std_y = np.std(self.ys)

# Apply sigma multiplier
radius_x = std_x * self.n_std_devs
radius_y = std_y * self.n_std_devs

# Safety clamp
if radius_x < 1e-12:
radius_x = 1e-12
if radius_y < 1e-12:
radius_y = 1e-12

self.axes = np.array([radius_x, radius_y])

# 3. Force Identity Matrix (Strict Horizontal/Vertical alignment)
self.evecs = np.eye(2)

self.fitted = True
return self.evecs, self.axes

def normalize(self, pupil_pos, target_pos=None, clip=True):
if not self.fitted:
return 0.0, 0.0

x, y = float(pupil_pos[0]), float(pupil_pos[1])

if target_pos is None:
cx, cy = self.center
else:
cx, cy = target_pos

# Calculate deltas
dx = x - cx
dy = y - cy

# Get calibration radii
rx, ry = self.axes * self.scale_factor

# Normalize
norm_x = dx / rx
norm_y = dy / ry

# --- APPLY FLIPS ---
# If flip_x is True: Inverts the sign.
final_x = -norm_x if self.flip_x else norm_x

# If flip_y is False: Inverts Screen Y (so Up is Positive).
final_y = norm_y if self.flip_y else -norm_y

if clip:
final_x = np.clip(final_x, -1.0, 1.0)
final_y = np.clip(final_y, -1.0, 1.0)

return float(final_x), float(final_y)

def denormalize(self, norm_x, norm_y, target_pos=None):
if not self.fitted:
return 0.0, 0.0

# 1. Reverse the Output Mapping
nx = -norm_x if self.flip_x else norm_x
ny = norm_y if self.flip_y else -norm_y

# 2. Scale back up
rx, ry = self.axes * self.scale_factor
dx = nx * rx
dy = ny * ry

# 3. Add Center
if target_pos is None:
cx, cy = self.center
else:
cx, cy = target_pos

return float(cx + dx), float(cy + dy)

def fit_and_visualize(self):
plt.figure(figsize=(10, 8))

plt.plot(self.xs, self.ys, 'k.', label='Samples', alpha=0.5)
plt.axis('equal')
plt.grid(True, alpha=0.3)

# Invert plot Y axis to match screen coordinates
plt.gca().invert_yaxis()

if not self.fitted:
self.fit_ellipse()

if self.fitted:
scaled_axes = self.axes * self.scale_factor
t = np.linspace(0, 2 * np.pi, 200)

el_x = self.center[0] + scaled_axes[0] * np.cos(t)
el_y = self.center[1] + scaled_axes[1] * np.sin(t)

plt.plot(el_x, el_y, 'b-', linewidth=2, label='Axis-Aligned Fit')
plt.plot(self.center[0], self.center[1], 'r+', markersize=15, label='Center')

plt.hlines(self.center[1],
self.center[0] - scaled_axes[0],
self.center[0] + scaled_axes[0],
colors='g', linestyles='-', label='Width (X)')

plt.vlines(self.center[0],
self.center[1] - scaled_axes[1],
self.center[1] + scaled_axes[1],
colors='m', linestyles='-', label='Height (Y)')

plt.title(f'Axis-Aligned Calibration (FlipX={self.flip_x})')
else:
plt.title("Fit FAILED")

plt.legend()
plt.tight_layout()
plt.show()

49 changes: 48 additions & 1 deletion eyetrackvr_backend/etvr.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from .config import ConfigManager
from multiprocessing import Manager
from .logger import get_logger
from fastapi import APIRouter
from fastapi import APIRouter, HTTPException
from .tracker import Tracker

logger = get_logger()
Expand Down Expand Up @@ -89,6 +89,21 @@ def shutdown(self) -> None:
self.config.stop()
os.kill(os.getpid(), signal.SIGTERM)

def _get_tracker(self, uuid: str) -> Tracker:
for tracker in self.trackers:
if tracker.uuid == uuid:
return tracker
raise HTTPException(status_code=404, detail=f"No tracker found with UUID `{uuid}`")

def tracker_recenter(self, uuid: str) -> dict:
return self._get_tracker(uuid).recenter()

def tracker_calibrate(self, uuid: str) -> dict:
return self._get_tracker(uuid).calibrate()

def tracker_calibration_state(self, uuid: str) -> dict:
return self._get_tracker(uuid).check_state()

def add_routes(self) -> None:
logger.debug("Adding routes to ETVR")
# region: Image streaming endpoints
Expand Down Expand Up @@ -159,6 +174,38 @@ def add_routes(self) -> None:
""",
)
# endregion
# region: Tracker Calibration Endpoints
self.router.add_api_route(
name="Recenter tracker calibration",
path="/etvr/tracker/{uuid}/recenter",
endpoint=self.tracker_recenter,
methods=["GET"],
tags=["Tracker Calibration"],
description="""
Recenter the calibration for a tracker.
""",
)
self.router.add_api_route(
name="Toggle tracker calibration",
path="/etvr/tracker/{uuid}/calibrate",
endpoint=self.tracker_calibrate,
methods=["GET"],
tags=["Tracker Calibration"],
description="""
Toggle calibration mode for a tracker. Calling again stops calibration and fits the ellipse.
""",
)
self.router.add_api_route(
name="Get tracker calibration state",
path="/etvr/tracker/{uuid}/calibration/state",
endpoint=self.tracker_calibration_state,
methods=["GET"],
tags=["Tracker Calibration"],
description="""
Return the current calibration state for a tracker.
""",
)
# endregion
# region: Config Endpoints
self.router.add_api_route(
name="Update Config",
Expand Down
1 change: 1 addition & 0 deletions eyetrackvr_backend/processes/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .camera import Camera
from .eye_processor import EyeProcessor
from .osc import VRChatOSC, VRChatOSCReceiver
from .calibration import CalibrationProcessor
74 changes: 74 additions & 0 deletions eyetrackvr_backend/processes/calibration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
from queue import Queue, Empty, Full
import numpy as np

from ..types import EyeData, TRACKING_FAILED
from ..utils import WorkerProcess
from ..calibration import CalibrationEllipse


class CalibrationProcessor(WorkerProcess):
def __init__(
self,
input_queue: Queue[EyeData],
output_queue: Queue[EyeData],
state,
name: str,
uuid: str,
):
super().__init__(name=f"Calibration {name}", uuid=uuid)
# Synced variables
self.input_queue = input_queue
self.output_queue = output_queue
self.state = state
# Unsynced variables
self.calibration = CalibrationEllipse()
self.recenter_reference: np.ndarray | None = None
self._prev_calibrating = False

def startup(self) -> None:
pass

def run(self) -> None:
try:
eye_data: EyeData = self.input_queue.get(block=True, timeout=0.5)
except Empty:
return
except Exception:
self.logger.exception("Failed to get eye data from queue")
return

calibrating = bool(self.state.get("calibrating", False))
if calibrating and not self._prev_calibrating:
self.calibration = CalibrationEllipse()
self.state["calibrated"] = False
self.state["samples"] = 0

if not calibrating and self._prev_calibrating:
self.calibration.fit_ellipse()
self.state["calibrated"] = bool(self.calibration.fitted)
self.state["samples"] = len(self.calibration.xs)

self._prev_calibrating = calibrating

if self.state.get("recenter_requested", False):
self.recenter_reference = np.array([eye_data.x, eye_data.y], dtype=float)
self.state["recenter_requested"] = False
self.state["recentered"] = True

if calibrating and eye_data != TRACKING_FAILED:
self.calibration.add_sample(eye_data.x, eye_data.y)
self.state["samples"] = len(self.calibration.xs)

if self.calibration.fitted:
norm_x, norm_y = self.calibration.normalize((eye_data.x, eye_data.y), target_pos=self.recenter_reference)
eye_data.x = (norm_x + 1.0) / 2.0
eye_data.y = (norm_y + 1.0) / 2.0

try:
self.output_queue.put(eye_data, block=False)
except Full:
pass

def shutdown(self) -> None:
pass

Loading
Loading