From b4c567a5a73fabb65c361c195d96f286a3b56017 Mon Sep 17 00:00:00 2001 From: Daniel Lindestad Date: Wed, 6 May 2026 19:03:26 +0200 Subject: [PATCH] fix camera cleanup --- app.py | 2 + lib/camera.py | 43 ++++++++++++------- tests/test_camera_cleanup.py | 81 ++++++++++++++++++++++++++++++++++++ 3 files changed, 112 insertions(+), 14 deletions(-) create mode 100644 tests/test_camera_cleanup.py diff --git a/app.py b/app.py index d557ac3..c7ed525 100644 --- a/app.py +++ b/app.py @@ -133,6 +133,8 @@ def _shutdown(): system_control = app.config.get("SYSTEM_CONTROL") if system_control: system_control.close() + if camera: + camera.release() atexit.register(_shutdown) diff --git a/lib/camera.py b/lib/camera.py index e2aa075..2cc463e 100644 --- a/lib/camera.py +++ b/lib/camera.py @@ -91,6 +91,8 @@ def stop(self): if self._thread and self._thread.is_alive(): self._thread.join(timeout=2.0) self._cleanup() + if self._thread and self._thread.is_alive(): + self._thread.join(timeout=2.0) def get_latest_jpeg(self): with self._frame_lock: @@ -275,19 +277,21 @@ def _run_gst_subprocess(self): print(f"[RPi Camera] Listening on UDP port {self.port} …") self.is_listening = True - self._gst_proc = subprocess.Popen( + proc = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=0, ) + self._gst_proc = proc - self._stderr_thread = threading.Thread(target=self._drain_stderr, daemon=True) + self._stderr_thread = threading.Thread(target=self._drain_stderr, args=(proc,), daemon=True) self._stderr_thread.start() - stream = self._gst_proc.stdout + stream = proc.stdout if stream is None: self.is_connected = False + self._cleanup_gst(proc) return had_frame = False @@ -295,9 +299,9 @@ def _run_gst_subprocess(self): while not self._stop_event.is_set(): chunk = stream.read(8192) if not chunk: - if self._gst_proc.poll() is not None: - if self._gst_proc.returncode not in (0, None): - self.last_error = f"gst-launch exited with code {self._gst_proc.returncode}" + if proc.poll() is not None: + if proc.returncode not in (0, None): + self.last_error = f"gst-launch exited with code {proc.returncode}" break time.sleep(0.01) continue @@ -329,15 +333,15 @@ def _run_gst_subprocess(self): if self._is_stream_stale(): self.is_connected = False - self._cleanup_gst() + self._cleanup_gst(proc) def _is_stream_stale(self, timeout_s=2.0): return (time.monotonic() - self._last_frame_ts) > timeout_s - def _drain_stderr(self): + def _drain_stderr(self, proc): try: - if self._gst_proc and self._gst_proc.stderr: - for raw in self._gst_proc.stderr: + if proc.stderr: + for raw in proc.stderr: line = raw.decode(errors="ignore").strip() if line and ("ERROR" in line.upper() or "not found" in line.lower()): self.last_error = line @@ -346,19 +350,30 @@ def _drain_stderr(self): except Exception: pass - def _cleanup_gst(self): - proc = self._gst_proc - self._gst_proc = None + def _cleanup_gst(self, proc=None): + proc = proc or self._gst_proc + if self._gst_proc is proc: + self._gst_proc = None if not proc: return try: - proc.terminate() + if proc.poll() is None: + proc.terminate() proc.wait(timeout=1.5) except Exception: try: proc.kill() except Exception: pass + try: + proc.wait(timeout=1.5) + except Exception: + pass + stderr_thread = self._stderr_thread + if stderr_thread and stderr_thread.is_alive() and stderr_thread is not threading.current_thread(): + stderr_thread.join(timeout=1.0) + if self._stderr_thread is stderr_thread: + self._stderr_thread = None def _cleanup(self): self.is_connected = False diff --git a/tests/test_camera_cleanup.py b/tests/test_camera_cleanup.py new file mode 100644 index 0000000..24fef88 --- /dev/null +++ b/tests/test_camera_cleanup.py @@ -0,0 +1,81 @@ +import subprocess + +from lib import camera + + +class TimeoutThenKillProcess: + def __init__(self): + self.terminate_called = False + self.kill_called = False + self.wait_calls = 0 + + def poll(self): + return None if not self.kill_called else -9 + + def terminate(self): + self.terminate_called = True + + def wait(self, timeout=None): + self.wait_calls += 1 + if not self.kill_called: + raise subprocess.TimeoutExpired("gst-launch-1.0", timeout) + return -9 + + def kill(self): + self.kill_called = True + + +class EmptyStream: + def __init__(self, receiver): + self.receiver = receiver + + def read(self, _size): + self.receiver._gst_proc = None # pylint: disable=protected-access + return b"" + + +class ExitedProcess: + def __init__(self, receiver): + self.stdout = EmptyStream(receiver) + self.stderr = [] + self.returncode = 0 + self.wait_calls = 0 + + def poll(self): + return self.returncode + + def terminate(self): + raise AssertionError("already-exited process should not be terminated") + + def wait(self, timeout=None): + self.wait_calls += 1 + return self.returncode + + def kill(self): + raise AssertionError("already-exited process should not be killed") + + +def test_gst_cleanup_reaps_process_after_kill(): + receiver = camera.RPiCameraReceiver() + proc = TimeoutThenKillProcess() + receiver._gst_proc = proc # pylint: disable=protected-access + + receiver._cleanup_gst() # pylint: disable=protected-access + + assert proc.terminate_called is True + assert proc.kill_called is True + assert proc.wait_calls == 2 + assert receiver._gst_proc is None # pylint: disable=protected-access + + +def test_gst_reader_uses_local_process_when_cleanup_clears_instance_ref(monkeypatch): + receiver = camera.RPiCameraReceiver() + proc = ExitedProcess(receiver) + + monkeypatch.setattr(camera.shutil, "which", lambda _name: "/usr/bin/gst-launch-1.0") + monkeypatch.setattr(camera.subprocess, "Popen", lambda *args, **kwargs: proc) + + receiver._run_gst_subprocess() # pylint: disable=protected-access + + assert proc.wait_calls == 1 + assert receiver._gst_proc is None # pylint: disable=protected-access