Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ def _shutdown():
system_control = app.config.get("SYSTEM_CONTROL")
if system_control:
system_control.close()
if camera:
camera.release()


atexit.register(_shutdown)
Expand Down
43 changes: 29 additions & 14 deletions lib/camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ def stop(self):
if self._thread and self._thread.is_alive():
self._thread.join(timeout=2.0)
self._cleanup()
if self._thread and self._thread.is_alive():
self._thread.join(timeout=2.0)

def get_latest_jpeg(self):
with self._frame_lock:
Expand Down Expand Up @@ -275,29 +277,31 @@ def _run_gst_subprocess(self):

print(f"[RPi Camera] Listening on UDP port {self.port} …")
self.is_listening = True
self._gst_proc = subprocess.Popen(
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=0,
)
self._gst_proc = proc

self._stderr_thread = threading.Thread(target=self._drain_stderr, daemon=True)
self._stderr_thread = threading.Thread(target=self._drain_stderr, args=(proc,), daemon=True)
self._stderr_thread.start()

stream = self._gst_proc.stdout
stream = proc.stdout
if stream is None:
self.is_connected = False
self._cleanup_gst(proc)
return

had_frame = False
buffer = bytearray()
while not self._stop_event.is_set():
chunk = stream.read(8192)
if not chunk:
if self._gst_proc.poll() is not None:
if self._gst_proc.returncode not in (0, None):
self.last_error = f"gst-launch exited with code {self._gst_proc.returncode}"
if proc.poll() is not None:
if proc.returncode not in (0, None):
self.last_error = f"gst-launch exited with code {proc.returncode}"
break
time.sleep(0.01)
continue
Expand Down Expand Up @@ -329,15 +333,15 @@ def _run_gst_subprocess(self):
if self._is_stream_stale():
self.is_connected = False

self._cleanup_gst()
self._cleanup_gst(proc)

def _is_stream_stale(self, timeout_s=2.0):
return (time.monotonic() - self._last_frame_ts) > timeout_s

def _drain_stderr(self):
def _drain_stderr(self, proc):
try:
if self._gst_proc and self._gst_proc.stderr:
for raw in self._gst_proc.stderr:
if proc.stderr:
for raw in proc.stderr:
line = raw.decode(errors="ignore").strip()
if line and ("ERROR" in line.upper() or "not found" in line.lower()):
self.last_error = line
Expand All @@ -346,19 +350,30 @@ def _drain_stderr(self):
except Exception:
pass

def _cleanup_gst(self):
proc = self._gst_proc
self._gst_proc = None
def _cleanup_gst(self, proc=None):
proc = proc or self._gst_proc
if self._gst_proc is proc:
self._gst_proc = None
if not proc:
return
try:
proc.terminate()
if proc.poll() is None:
proc.terminate()
proc.wait(timeout=1.5)
except Exception:
try:
proc.kill()
except Exception:
pass
try:
proc.wait(timeout=1.5)
except Exception:
pass
stderr_thread = self._stderr_thread
if stderr_thread and stderr_thread.is_alive() and stderr_thread is not threading.current_thread():
stderr_thread.join(timeout=1.0)
if self._stderr_thread is stderr_thread:
self._stderr_thread = None

def _cleanup(self):
self.is_connected = False
Expand Down
81 changes: 81 additions & 0 deletions tests/test_camera_cleanup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import subprocess

from lib import camera


class TimeoutThenKillProcess:
def __init__(self):
self.terminate_called = False
self.kill_called = False
self.wait_calls = 0

def poll(self):
return None if not self.kill_called else -9

def terminate(self):
self.terminate_called = True

def wait(self, timeout=None):
self.wait_calls += 1
if not self.kill_called:
raise subprocess.TimeoutExpired("gst-launch-1.0", timeout)
return -9

def kill(self):
self.kill_called = True


class EmptyStream:
def __init__(self, receiver):
self.receiver = receiver

def read(self, _size):
self.receiver._gst_proc = None # pylint: disable=protected-access
return b""


class ExitedProcess:
def __init__(self, receiver):
self.stdout = EmptyStream(receiver)
self.stderr = []
self.returncode = 0
self.wait_calls = 0

def poll(self):
return self.returncode

def terminate(self):
raise AssertionError("already-exited process should not be terminated")

def wait(self, timeout=None):
self.wait_calls += 1
return self.returncode

def kill(self):
raise AssertionError("already-exited process should not be killed")


def test_gst_cleanup_reaps_process_after_kill():
receiver = camera.RPiCameraReceiver()
proc = TimeoutThenKillProcess()
receiver._gst_proc = proc # pylint: disable=protected-access

receiver._cleanup_gst() # pylint: disable=protected-access

assert proc.terminate_called is True
assert proc.kill_called is True
assert proc.wait_calls == 2
assert receiver._gst_proc is None # pylint: disable=protected-access


def test_gst_reader_uses_local_process_when_cleanup_clears_instance_ref(monkeypatch):
receiver = camera.RPiCameraReceiver()
proc = ExitedProcess(receiver)

monkeypatch.setattr(camera.shutil, "which", lambda _name: "/usr/bin/gst-launch-1.0")
monkeypatch.setattr(camera.subprocess, "Popen", lambda *args, **kwargs: proc)

receiver._run_gst_subprocess() # pylint: disable=protected-access

assert proc.wait_calls == 1
assert receiver._gst_proc is None # pylint: disable=protected-access
Loading